public void run()

in holo-e2e-performance-tool/src/main/java/com/alibaba/hologres/performace/client/FixedCopyTest.java [68:156]


    /**
     * Writes generated records into {@code conf.tableName} through one binary COPY stream.
     *
     * <p>The method loads the {@code holoClient.} and {@code pool.} settings from
     * {@code confName}, opens a JDBC connection (optionally pinned to one frontend when
     * {@code fixedCopyConf.enableClientLoadBalance} is set), builds the COPY statement and
     * then streams records until either the target time is reached
     * ({@code conf.testByTime}) or the shared key counter exceeds {@code conf.rowNumber}.
     * The number of rows written by this invocation is added to {@code totalCount} when the
     * loop ends normally.
     *
     * <p>Any exception is logged and swallowed so that the {@code finally} block can still
     * join the barrier when {@code conf.dumpMemoryStat} is enabled.
     */
    public void run() {
      HoloConfig poolConf = new HoloConfig();
      try {
        // "pool." is loaded second so its keys are applied on top of the "holoClient." ones.
        ConfLoader.load(confName, "holoClient.", poolConf);
        ConfLoader.load(confName, "pool.", poolConf);

        Meter meter = Metrics.registry().meter(Metrics.METRICS_WRITE_RPS);
        Meter bpsMeter = Metrics.registry().meter(Metrics.METRICS_WRITE_BPS);
        Properties props = new Properties();
        PGProperty.USER.set(props, poolConf.getUsername());
        PGProperty.PASSWORD.set(props, poolConf.getPassword());
        PGProperty.APPLICATION_NAME.set(props, poolConf.getAppName());
        // Log only the non-secret properties; dumping props would print the password.
        LOG.info("props : user={}, applicationName={}", poolConf.getUsername(),
            poolConf.getAppName());
        String jdbcUrl = poolConf.getJdbcUrl();
        if (fixedCopyConf.enableClientLoadBalance && feNumber > 0) {
          // Frontend ids are 1-based; the offset spreads workers across frontends.
          int curFeId = (this.id + randomOffset) % feNumber + 1;
          if (jdbcUrl.contains("options")) {
            throw new RuntimeException("holoClient.jdbcUrl not allowed to contain options when fixedCopy.enableClientLoadBalance is true");
          }
          // Use '&' when the URL already carries a query string, otherwise start one with '?'.
          String separator = jdbcUrl.contains("?") ? "&" : "?";
          jdbcUrl += (separator + "options=fe=" + curFeId);
          LOG.info("will connect to fe id {} with url {}", curFeId, jdbcUrl);
        }

        try (Connection conn = DriverManager.getConnection(jdbcUrl, props)) {

          Random rand = new Random();
          TableSchema schema =
              ConnectionUtil.getTableSchema(conn, TableName.valueOf(conf.tableName));
          SqlUtil.disableAffectedRows(conn);
          SqlUtil.setOnSessionThreadSize(conn, fixedCopyConf.onSessionThreadPoolSize);
          List<String> columns = Util.getWriteColumnsName(conf, schema);
          String copySql;
          if (conf.writeColumnCount > 0) {
            // Partial-column write: build the COPY statement from the selected columns.
            copySql = CopyUtil.buildCopyInSql(schema.getTableName(), columns, true,
                schema.getPrimaryKeys().length > 0, poolConf.getWriteMode(), true);
          } else {
            copySql = CopyUtil.buildCopyInSql(schema, true, poolConf.getWriteMode());
          }
          // Rows handled by this invocation, including the one that triggers the exit.
          int i = 0;
          BaseConnection baseConn = conn.unwrap(BaseConnection.class);
          CopyManager copyManager = baseConn.getCopyAPI();
          LOG.info("start copy: {}", copySql);
          CopyInOutputStream cos = new CopyInOutputStream(copyManager.copyIn(copySql));

          try (RecordBinaryOutputStream os = new RecordBinaryOutputStream(cos, schema, baseConn,
              fixedCopyConf.maxRowBufferSize)) {
            while (true) {
              long pk = tic.incrementAndGet();
              ++i;
              if (conf.testByTime) {
                // The clock is only consulted every 1000 rows to keep the hot loop cheap.
                if (i % 1000 == 0) {
                  if (System.currentTimeMillis() > targetTime) {
                    // The current row is not written, hence i - 1.
                    totalCount.addAndGet(i - 1);
                    LOG.info("test time reached");
                    break;
                  }
                }
              } else {
                if (pk > conf.rowNumber) {
                  // The current row is not written, hence i - 1.
                  totalCount.addAndGet(i - 1);
                  break;
                }
              }
              Record record = new Record(schema);
              fillRecord(record, pk, schema, rand, columns);
              if (fixedCopyConf.checkRecordBeforeInsert) {
                RecordChecker.check(record);
              }
              os.putRecord(record);
              meter.mark();
              bpsMeter.mark(record.getByteSize());
            }
          }
          // Read the result only after the record stream has been closed.
          LOG.info("copy write : {}", cos.getResult());
        }
      } catch (Exception e) {
        LOG.error("fixed copy write job failed", e);
      } finally {
        if (conf.dumpMemoryStat) {
          try {
            barrier.await();
          } catch (Exception e) {
            if (e instanceof InterruptedException) {
              // Restore the interrupt flag so the owner of this thread can observe it.
              Thread.currentThread().interrupt();
            }
            LOG.error("waiting on the memory stat barrier failed", e);
          }
        }
      }
    }