in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java [133:159]
/**
 * Starts the connector with the supplied configuration.
 *
 * <p>The properties are parsed against {@code config()} first, then the project, subscription
 * and credential settings are read from them. Credentials are loaded from the configured file
 * path when one is present, otherwise from the configured JSON string when one is present; if
 * neither is set, the credentials provider is used exactly as constructed. The project,
 * subscription and credentials provider are then handed to {@code verifySubscription}, and only
 * after that call returns are the properties stored on this connector.
 *
 * @param props the connector configuration properties
 * @throws RuntimeException wrapping the {@link IOException} raised while loading credentials
 */
public void start(Map<String, String> props) {
  // Parse (and thereby validate) the configuration up front so that null values are
  // not handed to verifySubscription().
  config().parse(props);

  final String project = props.get(ConnectorUtils.CPS_PROJECT_CONFIG);
  final String subscription = props.get(CPS_SUBSCRIPTION_CONFIG);
  final String keyFilePath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
  final String keyJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);

  final ConnectorCredentialsProvider credentials = new ConnectorCredentialsProvider();
  try {
    // A credentials file path takes precedence over inline JSON credentials.
    if (keyFilePath != null) {
      credentials.loadFromFile(keyFilePath);
    } else if (keyJson != null) {
      credentials.loadJson(keyJson);
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  verifySubscription(project, subscription, credentials);

  // Keep the properties only once verification has completed without throwing.
  this.props = props;
  log.info("Started the CloudPubSubSourceConnector");
}