in java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java [539:628]
/**
 * Validates the arguments, optionally encodes the payload against a schema, and hands a
 * {@code PutRecord} message to the child process, returning a future registered for the record.
 *
 * <p>When both {@code schema} and {@code data} are non-null, the remaining bytes of
 * {@code data} (position to limit) are encoded with the Glue Schema Registry serializer and
 * the encoded bytes become the payload. The 1MB size check is applied after that encoding.
 *
 * <p>If {@code config.getUserRecordTimeoutInMillis()} is positive, a timeout task is scheduled
 * for the record on {@code futureTimeoutExecutor}.
 *
 * @param stream          stream name; trimmed before use, must not be null or empty
 * @param partitionKey    partition key; must not be null and must be 1 to 256 characters long
 * @param explicitHashKey optional hash key; when non-null it is trimmed and must parse as a
 *                        decimal integer between 0 and {@code UINT_128_MAX} inclusive
 * @param data            payload; may be null, in which case an empty payload is sent
 * @param schema          optional schema; only used when {@code data} is also non-null
 * @return the future tracked under this record's message id
 * @throws IllegalArgumentException if any of the validations above fails, if the schema lacks
 *                                  a definition or data format, or if the (possibly encoded)
 *                                  payload exceeds 1MB
 */
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) {
    if (stream == null) {
        throw new IllegalArgumentException("Stream name cannot be null");
    }
    stream = stream.trim();
    if (stream.length() == 0) {
        throw new IllegalArgumentException("Stream name cannot be empty");
    }

    if (partitionKey == null) {
        throw new IllegalArgumentException("partitionKey cannot be null");
    }
    if (partitionKey.length() < 1 || partitionKey.length() > 256) {
        throw new IllegalArgumentException(
                "Invalid partition key. Length must be at least 1 and at most 256, got " + partitionKey.length());
    }
    // NOTE(review): String.getBytes typically substitutes unencodable characters instead of
    // throwing, so this check may never fire - confirm whether stricter validation is wanted.
    try {
        partitionKey.getBytes("UTF-8");
    } catch (Exception e) {
        // Keep the underlying failure attached so it is not lost to callers.
        throw new IllegalArgumentException("Partition key must be valid UTF-8", e);
    }

    BigInteger b = null;
    if (explicitHashKey != null) {
        explicitHashKey = explicitHashKey.trim();
        try {
            b = new BigInteger(explicitHashKey);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid explicitHashKey, must be an integer, got " + explicitHashKey);
        }
        // b is always non-null here: the constructor either returned a value or threw.
        if (b.compareTo(UINT_128_MAX) > 0 || b.compareTo(BigInteger.ZERO) < 0) {
            throw new IllegalArgumentException(
                    "Invalid explicitHashKey, must be greater or equal to zero and less than or equal to (2^128 - 1), got " +
                            explicitHashKey);
        }
    }

    if (schema != null && data != null) {
        if (schema.getSchemaDefinition() == null || schema.getDataFormat() == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "Schema specification is not valid. SchemaDefinition or DataFormat cannot be null. SchemaDefinition: %s, DataFormat: %s",
                            schema.getSchemaDefinition(),
                            schema.getDataFormat()
                    )
            );
        }
        GlueSchemaRegistrySerializer serializer = glueSchemaRegistrySerializerInstance.get(config);
        // Copy only the remaining bytes instead of calling data.array(): array() exposes the
        // whole backing array regardless of position/limit/arrayOffset and throws for direct
        // or read-only buffers. Reading through a duplicate leaves the caller's position alone.
        byte[] payload = new byte[data.remaining()];
        data.duplicate().get(payload);
        byte[] encodedBytes = serializer.encode(stream, schema, payload);
        data = ByteBuffer.wrap(encodedBytes);
    }

    // Checked after schema encoding, so the limit applies to the bytes actually sent.
    if (data != null && data.remaining() > 1024 * 1024) {
        throw new IllegalArgumentException(
                "Data must be less than or equal to 1MB in size, got " + data.remaining() + " bytes");
    }

    long id = messageNumber.getAndIncrement();
    SettableFuture<UserRecordResult> f = SettableFuture.create();

    FutureTask<String> task = null;
    if (config.getUserRecordTimeoutInMillis() > 0) {
        task = new FutureTask<>(new FutureTimeoutRunnableTask(id), "TimedOut");
        futureTimeoutExecutor.schedule(task, config.getUserRecordTimeoutInMillis(), TimeUnit.MILLISECONDS);
    }

    // Register the future under its message id before the message is handed to the child.
    SettableFutureTracker futuresTracking = new SettableFutureTracker(f, Instant.now(), Optional.ofNullable(task));
    futures.put(id, futuresTracking);
    oldestFutureTrackerHeap.add(futuresTracking);

    PutRecord.Builder pr = PutRecord.newBuilder()
            .setStreamName(stream)
            .setPartitionKey(partitionKey)
            .setData(data != null ? ByteString.copyFrom(data) : ByteString.EMPTY);
    if (b != null) {
        pr.setExplicitHashKey(b.toString(10));
    }

    Message m = Message.newBuilder()
            .setId(id)
            .setPutRecord(pr.build())
            .build();
    child.add(m);

    return f;
}