in aws/main-endpointdsl-aws-kcl-kinesis/src/main/java/org/apache/camel/example/MyRouteBuilder.java [34:61]
public void configure() throws Exception {
    // Consumer route: reads records from the Kinesis stream named by the {{streamName}}
    // property using KCL consumers and the async client, logs each record together with
    // its shard id header, converts the ByteBuffer payload to a UTF-8 String and writes
    // it to a file under ./target/kcl-files/. Started first (startupOrder 1).
    from(aws2Kinesis("{{streamName}}").useDefaultCredentialsProvider(true).advanced().useKclConsumers(true).asyncClient(true))
            .log("The content is ${body} from ${headers.CamelAwsKinesisShardId}")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    ByteBuffer buffer = exchange.getMessage().getBody(ByteBuffer.class);
                    // rewind() instead of flip(): flip() sets the limit to the current
                    // position, which truncates the payload to nothing when the position
                    // is still 0. rewind() only resets the position and keeps the limit,
                    // so the whole payload is decoded whether or not something has
                    // already read the buffer to its end.
                    // NOTE(review): presumably the log step above reads the buffer while
                    // rendering ${body}, which is why flip() appeared to work - confirm.
                    buffer.rewind();
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);
                    exchange.getMessage().setBody(charBuffer.toString());
                }
            })
            .to(file("./target/kcl-files/"))
            .startupOrder(1);

    // Producer routes: three timers (kcl-ingest-1..3), each firing 5 times and sending a
    // constant message to the same stream under its own partition key (partition-1..3).
    // They start after the consumer, with startup orders 2, 3 and 4 respectively.
    for (int partition = 1; partition <= 3; partition++) {
        from(timer("kcl-ingest-" + partition).repeatCount(5))
                .setBody(constant("Camel KCL Test Partition " + partition))
                .setHeader(Kinesis2Constants.PARTITION_KEY, constant("partition-" + partition))
                .to(aws2Kinesis("{{streamName}}").useDefaultCredentialsProvider(true))
                .startupOrder(partition + 1);
    }
}