in sample_apps/java/src/main/java/com/amazonaws/services/timestream/CrudAndSimpleIngestionExample.java [265:369]
/**
 * Demonstrates versioned writes (upserts) against {@code DATABASE_NAME}.{@code TABLE_NAME}.
 *
 * <p>Two measures ({@code cpu_utilization} and {@code memory_utilization}) that share the same
 * dimensions and timestamp are written four times in sequence:
 * <ol>
 *   <li>an initial write at version {@code V};</li>
 *   <li>a retry of the identical request;</li>
 *   <li>a write of changed measure values at version {@code V - 1};</li>
 *   <li>a write of the changed measure values at a version strictly greater than {@code V}.</li>
 * </ol>
 * The outcome of every step is printed to standard output; no exception is propagated to the caller.
 */
public void writeRecordsWithUpsert() {
    System.out.println("Writing records with upsert");

    final long time = System.currentTimeMillis();
    // To achieve upsert (last writer wins) semantic, one example is to use current time as the
    // version if you are writing directly from the data source
    final long initialVersion = System.currentTimeMillis();

    List<Dimension> dimensions = new ArrayList<>();
    dimensions.add(new Dimension().withName("region").withValue("us-east-1"));
    dimensions.add(new Dimension().withName("az").withValue("az1"));
    dimensions.add(new Dimension().withName("hostname").withValue("host1"));

    // Specify repeated values for all records
    Record commonAttributes = new Record()
        .withDimensions(dimensions)
        .withMeasureValueType(MeasureValueType.DOUBLE)
        .withTime(String.valueOf(time))
        .withVersion(initialVersion);

    Record cpuUtilization = new Record()
        .withMeasureName("cpu_utilization")
        .withMeasureValue("13.5");
    Record memoryUtilization = new Record()
        .withMeasureName("memory_utilization")
        .withMeasureValue("40");

    List<Record> records = new ArrayList<>();
    records.add(cpuUtilization);
    records.add(memoryUtilization);

    // write records for first time
    WriteRecordsRequest writeRecordsRequest = buildUpsertDemoRequest(commonAttributes, records);
    submitUpsertDemoWrite(writeRecordsRequest, "first time", false);

    // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent.
    submitUpsertDemoWrite(writeRecordsRequest, "retry", false);

    // upsert with lower version, this would fail because a higher version is required to update the measure value.
    commonAttributes.setVersion(initialVersion - 1);
    cpuUtilization.setMeasureValue("14.5");
    memoryUtilization.setMeasureValue("50");
    submitUpsertDemoWrite(
        buildUpsertDemoRequest(commonAttributes, records), "upsert with lower version", true);

    // upsert with higher version as new data is generated.
    // The wall clock is not guaranteed to have advanced past the first version (e.g. after a
    // clock adjustment), so clamp the new version to be strictly greater than the one already written.
    final long higherVersion = Math.max(System.currentTimeMillis(), initialVersion + 1);
    commonAttributes.setVersion(higherVersion);
    submitUpsertDemoWrite(
        buildUpsertDemoRequest(commonAttributes, records), "upsert with higher version", false);
}

/** Builds a WriteRecords request for the sample table from the given common attributes and records. */
private WriteRecordsRequest buildUpsertDemoRequest(Record commonAttributes, List<Record> records) {
    WriteRecordsRequest request = new WriteRecordsRequest()
        .withDatabaseName(DATABASE_NAME)
        .withTableName(TABLE_NAME)
        .withCommonAttributes(commonAttributes);
    request.setRecords(records);
    return request;
}

/**
 * Sends one write request and prints its outcome, labelled with {@code attemptLabel}.
 *
 * @param request           the request to send
 * @param attemptLabel      text inserted into the {@code "WriteRecords Status for ...: "} line
 * @param announceRejection when {@code true}, the status line is also printed (without a status
 *                          code) before the rejected-record details if the write is rejected
 */
private void submitUpsertDemoWrite(WriteRecordsRequest request, String attemptLabel, boolean announceRejection) {
    final String statusPrefix = "WriteRecords Status for " + attemptLabel + ": ";
    try {
        WriteRecordsResult result = amazonTimestreamWrite.writeRecords(request);
        System.out.println(statusPrefix + result.getSdkHttpMetadata().getHttpStatusCode());
    } catch (RejectedRecordsException e) {
        if (announceRejection) {
            System.out.println(statusPrefix);
        }
        printRejectedRecordsException(e);
    } catch (Exception e) {
        // Sample code: report and continue so the remaining steps of the demo still run.
        System.out.println("Error: " + e);
    }
}