in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java [303:345]
public void invoke(OUT value, Context context) throws Exception {
    // Submits one record to the Kinesis producer and registers the shared
    // callback on the returned future. The `context` argument is not used.
    //
    // Throws RuntimeException when the producer reference is null, or when no
    // target stream can be determined and failOnError is set. With failOnError
    // unset, a record without a target stream is logged and dropped.
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }

    // Check for pending asynchronous errors before doing any work, and check
    // again only when enforceQueueLimit() reports that it waited for a flush
    // (presumably the flush may have recorded new errors; confirm in the helper).
    checkAndPropagateAsyncError();
    if (enforceQueueLimit()) {
        checkAndPropagateAsyncError();
    }

    final ByteBuffer payload = schema.serialize(value);

    // A non-null stream supplied by the schema takes precedence over the default.
    final String streamOverride = schema.getTargetStream(value);
    final String targetStream = (streamOverride != null) ? streamOverride : defaultStream;

    // Without a custom partitioner the default partition is used and no
    // explicit hash key is passed along.
    final String partitionKey;
    final String hashKey;
    if (customPartitioner == null) {
        partitionKey = defaultPartition;
        hashKey = null;
    } else {
        partitionKey = customPartitioner.getPartitionId(value);
        hashKey = customPartitioner.getExplicitHashKey(value);
    }

    if (targetStream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        }
        LOG.warn("No target stream set. Skipping record");
        return;
    }

    // The callback is executed via MoreExecutors.directExecutor().
    Futures.addCallback(
            producer.addUserRecord(targetStream, partitionKey, hashKey, payload),
            callback,
            MoreExecutors.directExecutor());
}