public void invoke()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java [303:345]


    public void invoke(OUT value, Context context) throws Exception {
        if (this.producer == null) {
            throw new RuntimeException("Kinesis producer has been closed");
        }

        checkAndPropagateAsyncError();
        boolean didWaitForFlush = enforceQueueLimit();

        if (didWaitForFlush) {
            checkAndPropagateAsyncError();
        }

        String stream = defaultStream;
        String partition = defaultPartition;

        ByteBuffer serialized = schema.serialize(value);

        // maybe set custom stream
        String customStream = schema.getTargetStream(value);
        if (customStream != null) {
            stream = customStream;
        }

        String explicitHashkey = null;
        // maybe set custom partition
        if (customPartitioner != null) {
            partition = customPartitioner.getPartitionId(value);
            explicitHashkey = customPartitioner.getExplicitHashKey(value);
        }

        if (stream == null) {
            if (failOnError) {
                throw new RuntimeException("No target stream set");
            } else {
                LOG.warn("No target stream set. Skipping record");
                return;
            }
        }

        ListenableFuture<UserRecordResult> cb =
                producer.addUserRecord(stream, partition, explicitHashkey, serialized);
        Futures.addCallback(cb, callback, MoreExecutors.directExecutor());
    }