in core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java [74:161]
public void start(Map<String, String> props) {
try {
LOG.info("Starting CamelSinkTask connector task");
Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);
if (context != null) {
try {
reporter = context.errantRecordReporter();
} catch (NoSuchMethodError | NoClassDefFoundError e) {
LOG.warn("Unable to instantiate ErrantRecordReporter. Method 'SinkTaskContext.errantRecordReporter' does not exist.");
reporter = null;
}
}
String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
try {
loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
} catch (Exception e) {
LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
}
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String componentSchema = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
final Boolean idempotencyEnabled = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF);
final String expressionType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF);
final String expressionHeader = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF);
final int memoryDimension = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_MEMORY_DIMENSION_CONF);
final String idempotentRepositoryType = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_REPOSITORY_TYPE_CONF);
final String idempotentRepositoryKafkaTopic = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_TOPIC_CONF);
final String idempotentRepositoryBootstrapServers = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF);
final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF);
final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF);
final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
CamelContext camelContext = new DefaultCamelContext();
// componentSchema can legitimately be null in case of kamelet connectors, in that case KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl" property is ignored
if (remoteUrl == null && componentSchema != null) {
remoteUrl = TaskHelper.buildUrl(camelContext,
actualProps,
componentSchema,
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
}
if (remoteUrl != null) {
actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);
}
cms = CamelKafkaConnectMain.builder(LOCAL_URL, getSinkKamelet())
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
.withAggregationSize(size)
.withAggregationTimeout(timeout)
.withErrorHandler(errorHandler)
.withMaxRedeliveries(maxRedeliveries)
.withRedeliveryDelay(redeliveryDelay)
.withIdempotencyEnabled(idempotencyEnabled)
.withExpressionType(expressionType)
.withExpressionHeader(expressionHeader)
.withMemoryDimension(memoryDimension)
.withIdempotentRepositoryType(idempotentRepositoryType)
.withIdempotentRepositoryTopicName(idempotentRepositoryKafkaTopic)
.withIdempotentRepositoryKafkaServers(idempotentRepositoryBootstrapServers)
.withIdempotentRepositoryKafkaMaxCacheSize(idempotentRepositoryKafkaMaxCacheSize)
.withIdempotentRepositoryKafkaPollDuration(idempotentRepositoryKafkaPollDuration)
.withHeadersExcludePattern(headersRemovePattern)
.build(camelContext);
cms.start();
producer = cms.getProducerTemplate();
localEndpoint = cms.getCamelContext().getEndpoint(LOCAL_URL);
LOG.info("CamelSinkTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel context", e);
}
}