in src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java [50:91]
/**
 * Writes a batch of sink records to their target Cosmos DB containers.
 *
 * <p>Records are grouped by the container mapped to their topic (via the
 * configured topic-container map) and then written one at a time. A
 * {@code Struct} value is converted with {@code StructToJsonMap.toJsonMap}
 * before being written; any other value is passed through unchanged.
 *
 * @param records the records to write; a null or empty collection is a no-op
 * @throws IllegalStateException if a record's topic has no container mapping
 * @throws CosmosDBWriteException if writing a record raises a
 *         {@code BadRequestException}; the offending record and the cause
 *         are attached
 */
public void put(Collection<SinkRecord> records) {
    if (CollectionUtils.isEmpty(records)) {
        logger.info("No records to be written");
        return;
    }
    logger.info("Sending {} records to be written", records.size());

    Map<String, List<SinkRecord>> recordsByContainer = records.stream()
        // Find target container for each record
        .collect(Collectors.groupingBy(record -> config.getTopicContainerMap()
            .getContainerForTopic(record.topic())
            .orElseThrow(() -> new IllegalStateException(
                String.format("No container defined for topic %s .", record.topic())))));

    for (Map.Entry<String, List<SinkRecord>> entry : recordsByContainer.entrySet()) {
        String containerName = entry.getKey();
        CosmosContainer container = client.getDatabase(config.getDatabaseName()).getContainer(containerName);

        for (SinkRecord record : entry.getValue()) {
            // Read the value once. It may be null (e.g. a tombstone record), and log
            // arguments are evaluated eagerly, so dereferencing it directly inside the
            // debug call would throw a NullPointerException regardless of log level.
            Object rawValue = record.value();
            logger.debug("Writing record, value type: {}",
                rawValue == null ? null : rawValue.getClass().getName());
            logger.debug("Key Schema: {}", record.keySchema());
            logger.debug("Value schema: {}", record.valueSchema());
            logger.trace("Value.toString(): {}", rawValue);

            Object recordValue;
            if (rawValue instanceof Struct) {
                recordValue = StructToJsonMap.toJsonMap((Struct) rawValue);
            } else {
                // Includes the null case: the value is handed on as-is.
                // NOTE(review): how maybeInsertId/addItemToContainer treat a null
                // value is not visible from this method - confirm.
                recordValue = rawValue;
            }

            maybeInsertId(recordValue, record);
            logger.trace("Value after inserting ID: {}", recordValue);

            try {
                addItemToContainer(container, recordValue);
            } catch (BadRequestException bre) {
                // Surface the failing record together with the original cause.
                throw new CosmosDBWriteException(record, bre);
            }
        }
    }
}