public void start()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [183:259]


  /**
   * Initializes this sink task from the supplied configuration.
   *
   * <p>In order, this method: reads the connector settings into the task's fields; creates a
   * fixed-size thread pool and switches on multi-write mode when the configured pool size is
   * greater than 1; creates the ODPS client and sets its endpoint; prepares the record converter
   * builder with the configured format, mode and the target table's schema; parses the partition
   * window type and time zone; and, when both the runtime error topic name and its bootstrap
   * servers are configured, creates a Kafka writer for runtime errors.
   *
   * @param map raw configuration entries used to build the {@link MaxComputeSinkConnectorConfig}
   */
  public void start(Map<String, String> map) {
    // The whole method runs on one thread, so the id is looked up once and reused for logging.
    final long threadId = Thread.currentThread().getId();
    LOGGER.info("Thread(" + threadId + ") Enter START");

    startTimestamp = System.currentTimeMillis();

    // ---- Plain configuration values ----
    config = new MaxComputeSinkConnectorConfig(map);
    accountType = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.ACCOUNT_TYPE.getName());
    timeout = config.getLong(
        MaxComputeSinkConnectorConfig.BaseParameter.CLIENT_TIMEOUT_MS.getName());
    bufferSizeKB = config.getInt(
        MaxComputeSinkConnectorConfig.BaseParameter.BUFFER_SIZE_KB.getName());
    retryTimes = config.getInt(
        MaxComputeSinkConnectorConfig.BaseParameter.FAIL_RETRY_TIMES.getName());

    String odpsEndpoint = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_ENDPOINT.getName());
    project = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_PROJECT.getName());
    table = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.MAXCOMPUTE_TABLE.getName());
    tunnelEndpoint = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.TUNNEL_ENDPOINT.getName());
    batchSize = config.getInt(
        MaxComputeSinkConnectorConfig.BaseParameter.RECORD_BATCH_SIZE.getName());
    Integer workerCount = config.getInt(
        MaxComputeSinkConnectorConfig.BaseParameter.POOL_SIZE.getName());
    skipErrorRecords = config.getBoolean(
        MaxComputeSinkConnectorConfig.BaseParameter.SKIP_ERROR.getName());

    // A pool size above 1 creates a fixed thread pool and enables multi-write mode.
    if (workerCount > 1) {
      executor = Executors.newFixedThreadPool(workerCount);
      multiWriteMode = true;
    }

    // ---- ODPS client ----
    odps = OdpsUtils.getOdps(config);
    odpsCreateLastTime = System.currentTimeMillis();
    odps.setEndpoint(odpsEndpoint);

    // ---- Record converter builder ----
    String formatName = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.FORMAT.getName());
    RecordConverterBuilder.Format format = RecordConverterBuilder.Format.valueOf(formatName);
    String modeName = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.MODE.getName());
    RecordConverterBuilder.Mode mode = RecordConverterBuilder.Mode.valueOf(modeName);
    converterBuilder = new RecordConverterBuilder();
    converterBuilder.format(format).mode(mode);
    // The schema is fetched from the configured target table through the ODPS client.
    converterBuilder.schema(odps.tables().get(table).getSchema());

    // ---- Partition window and time zone ----
    String windowTypeName = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.PARTITION_WINDOW_TYPE.getName());
    partitionWindowType = PartitionWindowType.valueOf(windowTypeName);
    String timeZoneId = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.TIME_ZONE.getName());
    tz = TimeZone.getTimeZone(timeZoneId);

    // ---- Streaming tunnel switch ----
    useStreamTunnel = config.getBoolean(
        MaxComputeSinkConnectorConfig.BaseParameter.USE_STREAM_TUNNEL.getName());
    if (useStreamTunnel) {
      LOGGER.info("MAXCOMPUTE STREAMING TUNNEL ENABLED.");
    }

    // ---- Optional runtime error writer ----
    // Created only when both the error topic name and its bootstrap servers are non-empty;
    // the bootstrap servers setting is read only after the topic name passes the check.
    String errorTopicName = config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_NAME.getName());
    if (!StringUtils.isNullOrEmpty(errorTopicName)) {
      String errorBootstrapServers = config.getString(
          MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_BOOTSTRAP_SERVERS
              .getName());
      if (!StringUtils.isNullOrEmpty(errorBootstrapServers)) {
        runtimeErrorWriter = new KafkaWriter(config);
        LOGGER.info("Thread(" + threadId + ") new runtime error kafka writer done");
      }
    }

    LOGGER.info("Thread(" + threadId + ") Start MaxCompute sink task done");
  }