public ListenableFuture addUserRecord()

in java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java [539:628]


    /**
     * Validates the arguments, optionally encodes the payload with the Glue Schema Registry
     * serializer, and queues a {@code PutRecord} message for the given stream on the child.
     *
     * @param stream stream name; must not be null or blank (it is trimmed before use)
     * @param partitionKey partition key; must not be null, must be 1 to 256 characters long and
     *        must be encodable as UTF-8 (no unpaired surrogates)
     * @param explicitHashKey optional decimal integer in the range [0, 2^128 - 1]; surrounding
     *        whitespace is trimmed; {@code null} means no explicit hash key is set on the record
     * @param data record payload; only the bytes between the buffer's position and limit are
     *        used; {@code null} is sent as an empty payload; the size limit of 1MB is checked
     *        after any schema encoding
     * @param schema optional schema; when both {@code schema} and {@code data} are non-null the
     *        payload is replaced by the bytes returned from the serializer
     * @return the future that was registered for this record under its message id
     * @throws IllegalArgumentException if any argument violates the constraints above
     */
    public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) {
        if (stream == null) {
            throw new IllegalArgumentException("Stream name cannot be null");
        }

        stream = stream.trim();

        if (stream.length() == 0) {
            throw new IllegalArgumentException("Stream name cannot be empty");
        }

        if (partitionKey == null) {
            throw new IllegalArgumentException("partitionKey cannot be null");
        }

        if (partitionKey.length() < 1 || partitionKey.length() > 256) {
            throw new IllegalArgumentException(
                    "Invalid partition key. Length must be at least 1 and at most 256, got " + partitionKey.length());
        }

        // String.getBytes never fails on malformed input (it substitutes '?'), so an encoder,
        // which reports unpaired surrogates, is used to actually enforce the UTF-8 requirement.
        if (!java.nio.charset.StandardCharsets.UTF_8.newEncoder().canEncode(partitionKey)) {
            throw new IllegalArgumentException("Partition key must be valid UTF-8");
        }

        BigInteger b = null;
        if (explicitHashKey != null) {
            explicitHashKey = explicitHashKey.trim();
            try {
                b = new BigInteger(explicitHashKey);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        "Invalid explicitHashKey, must be an integer, got " + explicitHashKey, e);
            }
            if (b.compareTo(UINT_128_MAX) > 0 || b.compareTo(BigInteger.ZERO) < 0) {
                throw new IllegalArgumentException(
                        "Invalid explicitHashKey, must be greater or equal to zero and less than or equal to (2^128 - 1), got " +
                                explicitHashKey);
            }
        }

        if (schema != null && data != null) {
            if (schema.getSchemaDefinition() == null || schema.getDataFormat() == null) {
                throw new IllegalArgumentException(
                    String.format(
                        "Schema specification is not valid. SchemaDefinition or DataFormat cannot be null. SchemaDefinition: %s, DataFormat: %s",
                        schema.getSchemaDefinition(),
                        schema.getDataFormat()
                    )
                );
            }
            GlueSchemaRegistrySerializer serializer = glueSchemaRegistrySerializerInstance.get(config);
            // Copy exactly the readable bytes. data.array() would hand over the whole backing
            // array (ignoring position, limit and array offset) and throws for direct or
            // read-only buffers. Reading through a duplicate leaves the caller's position as is.
            byte[] payload = new byte[data.remaining()];
            data.duplicate().get(payload);
            byte[] encodedBytes = serializer.encode(stream, schema, payload);
            data = ByteBuffer.wrap(encodedBytes);
        }

        if (data != null && data.remaining() > 1024 * 1024) {
            throw new IllegalArgumentException(
                    "Data must be less than or equal to 1MB in size, got " + data.remaining() + " bytes");
        }

        long id = messageNumber.getAndIncrement();
        SettableFuture<UserRecordResult> f = SettableFuture.create();
        // A timeout task is only scheduled when a positive user record timeout is configured;
        // otherwise the tracker carries an empty Optional.
        FutureTask<String> task = null;
        if (config.getUserRecordTimeoutInMillis() > 0) {
            task = new FutureTask<>(new FutureTimeoutRunnableTask(id), "TimedOut");
            futureTimeoutExecutor.schedule(task, config.getUserRecordTimeoutInMillis(), TimeUnit.MILLISECONDS);
        }
        // Register the future under its id before the message is handed to the child.
        SettableFutureTracker futuresTracking = new SettableFutureTracker(f, Instant.now(), Optional.ofNullable(task));
        futures.put(id, futuresTracking);
        oldestFutureTrackerHeap.add(futuresTracking);

        PutRecord.Builder pr = PutRecord.newBuilder()
                .setStreamName(stream)
                .setPartitionKey(partitionKey)
                .setData(data != null ? ByteString.copyFrom(data) : ByteString.EMPTY);
        if (b != null) {
            // Send the canonical base-10 form of the parsed value rather than the raw input.
            pr.setExplicitHashKey(b.toString(10));
        }

        Message m = Message.newBuilder()
                .setId(id)
                .setPutRecord(pr.build())
                .build();
        child.add(m);

        return f;
    }