in holo-e2e-performance-tool/src/main/java/com/alibaba/hologres/performace/client/FixedCopyTest.java [68:156]
public void run() {
HoloConfig poolConf = new HoloConfig();
HoloConfig clientConf = new HoloConfig();
try {
ConfLoader.load(confName, "holoClient.", poolConf);
ConfLoader.load(confName, "holoClient.", clientConf);
ConfLoader.load(confName, "pool.", poolConf);
Meter meter = Metrics.registry().meter(Metrics.METRICS_WRITE_RPS);
Meter bpsMeter = Metrics.registry().meter(Metrics.METRICS_WRITE_BPS);
Properties props = new Properties();
PGProperty.USER.set(props, poolConf.getUsername());
PGProperty.PASSWORD.set(props, poolConf.getPassword());
PGProperty.APPLICATION_NAME.set(props, poolConf.getAppName());
LOG.info("props : {}", props);
String jdbcUrl = poolConf.getJdbcUrl();
if (fixedCopyConf.enableClientLoadBalance && feNumber > 0) {
int curFeId = (this.id + randomOffset) % feNumber + 1;
if (jdbcUrl.indexOf("options") != -1) {
throw new RuntimeException("holoClient.jdbcUrl not allowed to contain options when fixedCopy.enableClientLoadBalance is true");
}
jdbcUrl += ("?options=fe=" + curFeId);
LOG.info("will connect to fe id {} with url {}", curFeId, jdbcUrl);
}
try (Connection conn = DriverManager.getConnection(jdbcUrl, props)) {
Random rand = new Random();
TableSchema schema =
ConnectionUtil.getTableSchema(conn, TableName.valueOf(conf.tableName));
SqlUtil.disableAffectedRows(conn);
SqlUtil.setOnSessionThreadSize(conn, fixedCopyConf.onSessionThreadPoolSize);
String copySql = null;
List<String> columns = Util.getWriteColumnsName(conf, schema);
if (conf.writeColumnCount > 0) {
copySql = CopyUtil.buildCopyInSql(schema.getTableName(), columns, true,
schema.getPrimaryKeys().length > 0, poolConf.getWriteMode(), true);
} else {
copySql = CopyUtil.buildCopyInSql(schema, true, poolConf.getWriteMode());
}
int i = 0;
BaseConnection baseConn = conn.unwrap(BaseConnection.class);
CopyManager copyManager = baseConn.getCopyAPI();
LOG.info("start copy: {}", copySql);
CopyInOutputStream cos = new CopyInOutputStream(copyManager.copyIn(copySql));
try (RecordBinaryOutputStream os = new RecordBinaryOutputStream(cos, schema, baseConn,
fixedCopyConf.maxRowBufferSize)) {
while (true) {
long pk = tic.incrementAndGet();
++i;
if(conf.testByTime) {
if (i % 1000 == 0) {
if (System.currentTimeMillis() > targetTime) {
totalCount.addAndGet(i-1);
LOG.info("test time reached");
break;
}
}
} else {
if (pk > conf.rowNumber) {
totalCount.addAndGet(i-1);
break;
}
}
Record record = new Record(schema);
fillRecord(record, pk, schema, rand, columns);
if (fixedCopyConf.checkRecordBeforeInsert) {
RecordChecker.check(record);
}
os.putRecord(record);
meter.mark();
bpsMeter.mark(record.getByteSize());
}
}
LOG.info("copy write : {}", cos.getResult());
}
} catch (Exception e) {
LOG.error("", e);
} finally {
if (conf.dumpMemoryStat) {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}