in cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java [138:194]
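/**
 * Serializes a CDC event, fans it out into one or more Kafka producer records,
 * and publishes them asynchronously. Publish failures are counted in kafkaStats;
 * depending on the failOnRecordTooLargeError/failOnKafkaError flags, the first
 * failure is also captured in the failure reference for the caller to surface.
 */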
public void processEvent(CdcEvent event)
{
    String topic = topicSupplier.topic(event);
    cdcLogMode.info(logger(), "Processing CDC event", event, topic);
    long time = System.currentTimeMillis();
    byte[] recordPayload;
    try
    {
        recordPayload = getPayload(topic, event);
    }
    catch (Exception e)
    {
        cdcLogMode.warn(logger(), "Skip publishing the event because it cannot be serialized",
                        event, topic, e);
        throw e; // rethrow for the caller to handle
    }
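    // Key each record by a stable Kafka prefix plus a hash of the event;
    // Kafka's default partitioner maps identical keys to the same partition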
    String publishKey = getOrBuildKafkaPrefix(event) + eventHasher.hashEvent(event);
    List<ProducerRecord<String, byte[]>> records =
            recordProducer().buildRecords(event, topic, publishKey, recordPayload);
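    // Send asynchronously; the callback runs later on the producer's I/O thread,
    // so errors are recorded in shared state rather than thrown from this method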
    for (ProducerRecord<String, byte[]> record : records)
    {
        producer.send(record, (metadata, throwable) -> {
            long elapsedTime = System.currentTimeMillis() - time;
            if (throwable != null)
            {
                kafkaStats.reportKafkaPublishError();
                if (throwable instanceof RecordTooLargeException)
                {
                    kafkaStats.reportKafkaRecordTooLarge();
                    cdcLogMode.error(logger(), "Kafka record too large exception", event, topic,
                                     throwable);
                    if (failOnRecordTooLargeError)
                    {
                        failure.compareAndSet(null, throwable);
                    }
                }
                else
                {
                    cdcLogMode.error(logger(), "Error publishing record to Kafka", event, topic,
                                     throwable);
                    if (failOnKafkaError)
                    {
                        failure.compareAndSet(null, throwable);
                    }
                }
            }
            else
            {
                // Success: count the published change and log the send metadata
                kafkaStats.changePublished(event);
                logger().debug("Sent record(topic={}) meta(partition={}, offset={}) time={}",
                               topic, metadata.partition(), metadata.offset(), elapsedTime);
            }
        });
    }
}
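
// A minimal sketch of how a caller-facing helper could surface the error that
// the send callback captured asynchronously. This is an illustration only:
// awaitPublish() is an assumed name, not part of this class. It relies on
// KafkaProducer.flush(), which blocks until in-flight sends have completed.
void awaitPublish()
{
    producer.flush();            // wait for pending send callbacks to run
    Throwable t = failure.get(); // first error recorded by a callback, if any
    if (t != null)
    {
        throw new RuntimeException("Failed to publish CDC event(s) to Kafka", t);
    }
}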