public void processEvent()

in cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java [138:194]


    public void processEvent(CdcEvent event)
    {
        String topic = topicSupplier.topic(event);
        cdcLogMode.info(logger(), "Processing CDC event", event, topic);
        long time = System.currentTimeMillis();
        byte[] recordPayload;
        try
        {
            recordPayload = getPayload(topic, event);
        }
        catch (Exception e)
        {
            cdcLogMode.warn(logger(), "Skip publishing the event because it cannot be serialized",
                            event, topic, e);
            throw e; // rethrow for user to handle
        }
        String publishKey = getOrBuildKafkaPrefix(event) + eventHasher.hashEvent(event);
        List<ProducerRecord<String, byte[]>> records = recordProducer()
                                                       .buildRecords(event, topic, publishKey,
                                                                     recordPayload);
        for (ProducerRecord<String, byte[]> record : records)
        {
            producer.send(record, (metadata, throwable) -> {
                long elapsedTime = System.currentTimeMillis() - time;
                if (throwable != null)
                {
                    kafkaStats.reportKafkaPublishError();
                    if (throwable instanceof RecordTooLargeException)
                    {
                        kafkaStats.reportKafkaRecordTooLarge();
                        cdcLogMode.error(logger(), "Kafka record too large exception", event, topic,
                                         throwable);
                        if (failOnRecordTooLargeError)
                        {
                            failure.compareAndSet(null, throwable);
                        }
                    }
                    else
                    {
                        cdcLogMode.error(logger(), "Error publishing record to Kafka", event, topic,
                                         throwable);
                        if (failOnKafkaError)
                        {
                            failure.compareAndSet(null, throwable);
                        }
                    }
                }
                else
                {
                    kafkaStats.changePublished(event);
                    logger().debug(
                    "Sent record(topic={}) meta(partition={}, offset={}) time={} topic={}",
                    topic, metadata.partition(), metadata.offset(), elapsedTime, topic);
                }
            });
        }
    }