in statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java [119:145]
/**
 * Assembles the configured fields into a Kinesis egress record and wraps it in an {@link
 * EgressMessage} addressed to the target egress.
 *
 * <p>The destination stream, the partition key and the value are mandatory; the explicit hash
 * key is only written to the record when it is non-null.
 *
 * @return an egress message whose payload is the serialized Kinesis egress record.
 * @throws IllegalStateException if the destination stream, the partition key or the value is
 *     {@code null} (checked in that order).
 */
public EgressMessage build() {
  // Guard clauses: reject an incomplete builder before any record is assembled.
  if (targetStreamBytes == null) {
    throw new IllegalStateException("Missing destination Kinesis stream");
  }
  if (partitionKeyBytes == null) {
    throw new IllegalStateException("Missing partition key");
  }
  if (valueBytes == null) {
    throw new IllegalStateException("Missing value");
  }

  // All mandatory fields are present at this point, so they can be set in one chain.
  KinesisEgressRecord.Builder record =
      KinesisEgressRecord.newBuilder()
          .setStreamBytes(targetStreamBytes)
          .setPartitionKeyBytes(partitionKeyBytes)
          .setValueBytes(valueBytes);

  // The explicit hash key is optional; leave the field unset when none was provided.
  if (explicitHashKey != null) {
    record.setExplicitHashKeyBytes(explicitHashKey);
  }

  // Wrap the serialized record in a TypedValue tagged with the Kinesis producer record typename.
  return new EgressMessageWrapper(
      targetEgressId,
      TypedValue.newBuilder()
          .setTypenameBytes(ApiExtension.typeNameByteString(KINESIS_PRODUCER_RECORD_TYPENAME))
          .setValue(record.build().toByteString())
          .setHasValue(true)
          .build());
}