in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [119:165]
public void start(Map<String, String> props) {
Map<String, Object> validatedProps = new CloudPubSubSinkConnector().config().parse(props);
cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
maxOutstandingRequestBytes =
(Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_REQUEST_BYTES);
maxOutstandingMessages =
(Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_MESSAGES);
maxDelayThresholdMs =
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_DELAY_THRESHOLD_MS);
maxRequestTimeoutMs =
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_REQUEST_TIMEOUT_MS);
maxTotalTimeoutMs =
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_TOTAL_TIMEOUT_MS);
maxShutdownTimeoutMs =
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_SHUTDOWN_TIMEOUT_MS);
messageBodyName = (String) validatedProps.get(CloudPubSubSinkConnector.CPS_MESSAGE_BODY_NAME);
includeMetadata = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_METADATA);
includeHeaders = (Boolean) validatedProps.get(CloudPubSubSinkConnector.PUBLISH_KAFKA_HEADERS);
orderingKeySource =
OrderingKeySource.getEnum(
(String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String credentialsPath = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
if (credentialsPath != null) {
try {
gcpCredentialsProvider.loadFromFile(credentialsPath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (credentialsJson != null) {
try {
gcpCredentialsProvider.loadJson(credentialsJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (publisher == null) {
// Only do this if we did not use the constructor.
createPublisher();
}
log.info("Start CloudPubSubSinkTask");
}