in sample_apps/javaV2/src/main/java/com/amazonaws/services/timestream/CrudAndSimpleIngestionExample.java [255:374]
/**
 * Demonstrates upsert (last writer wins) behaviour driven by the record {@code version}.
 *
 * <p>Four {@code writeRecords} calls are issued against {@code DATABASE_NAME}/{@code TABLE_NAME},
 * all sharing the same dimensions and timestamp:
 * <ol>
 *   <li>an initial write of {@code cpu_utilization} and {@code memory_utilization};</li>
 *   <li>a retry of the very same request (same records, same version);</li>
 *   <li>a write of changed measure values with a version one lower than the initial one;</li>
 *   <li>a write of the changed measure values with a version strictly higher than the initial one.</li>
 * </ol>
 * The outcome of every call is printed to standard output; no exception is propagated to the caller.
 */
public void writeRecordsWithUpsert() {
    System.out.println("Writing records with upsert");

    final long time = System.currentTimeMillis();
    // To achieve upsert (last writer wins) semantic, one example is to use current time as the
    // version if you are writing directly from the data source
    final long initialVersion = System.currentTimeMillis();

    // Dimensions are shared by every record of every request in this example.
    final List<Dimension> dimensions = new ArrayList<>();
    dimensions.add(Dimension.builder().name("region").value("us-east-1").build());
    dimensions.add(Dimension.builder().name("az").value("az1").build());
    dimensions.add(Dimension.builder().name("hostname").value("host1").build());

    final List<Record> records = new ArrayList<>();
    records.add(Record.builder().measureName("cpu_utilization").measureValue("13.5").build());
    records.add(Record.builder().measureName("memory_utilization").measureValue("40").build());

    final WriteRecordsRequest writeRecordsRequest =
            buildUpsertExampleRequest(dimensions, time, initialVersion, records);

    // write records for first time
    submitUpsertExampleRequest(writeRecordsRequest, "first time", false);

    // Successfully retry same writeRecordsRequest with same records and versions,
    // because writeRecords API is idempotent.
    submitUpsertExampleRequest(writeRecordsRequest, "retry", false);

    // Same series and timestamp, but different measure values.
    final List<Record> upsertedRecords = new ArrayList<>();
    upsertedRecords.add(Record.builder().measureName("cpu_utilization").measureValue("14.5").build());
    upsertedRecords.add(Record.builder().measureName("memory_utilization").measureValue("50").build());

    // upsert with lower version, this would fail because a higher version is required
    // to update the measure value.
    submitUpsertExampleRequest(
            buildUpsertExampleRequest(dimensions, time, initialVersion - 1, upsertedRecords),
            "upsert with lower version",
            true);

    // upsert with higher version as new data is generated.
    // System.currentTimeMillis() is wall-clock time and is not guaranteed to advance between
    // two reads, so clamp the new version to be strictly greater than the initial one.
    final long higherVersion = Math.max(System.currentTimeMillis(), initialVersion + 1);
    submitUpsertExampleRequest(
            buildUpsertExampleRequest(dimensions, time, higherVersion, upsertedRecords),
            "upsert with higher version",
            false);
}

/**
 * Builds a write request whose common attributes carry the values repeated for all records:
 * the dimensions, the DOUBLE measure value type, the timestamp and the version.
 *
 * @param dimensions dimensions applied to every record
 * @param time       record timestamp, as returned by {@code System.currentTimeMillis()}
 * @param version    record version to attach to every record
 * @param records    records holding only the measure name and measure value
 * @return the request targeting {@code DATABASE_NAME} and {@code TABLE_NAME}
 */
private WriteRecordsRequest buildUpsertExampleRequest(
        List<Dimension> dimensions, long time, long version, List<Record> records) {
    final Record commonAttributes = Record.builder()
            .dimensions(dimensions)
            .measureValueType(MeasureValueType.DOUBLE)
            .time(String.valueOf(time))
            .version(version)
            .build();
    return WriteRecordsRequest.builder()
            .databaseName(DATABASE_NAME)
            .tableName(TABLE_NAME)
            .commonAttributes(commonAttributes)
            .records(records)
            .build();
}

/**
 * Sends the request and prints the outcome; failures are reported, never rethrown.
 *
 * @param request                 the request to send
 * @param attemptLabel            text inserted into the {@code "WriteRecords Status for ...: "} line
 * @param printHeaderOnRejection  when {@code true}, the status header is also printed before the
 *                                rejected-records details
 */
private void submitUpsertExampleRequest(
        WriteRecordsRequest request, String attemptLabel, boolean printHeaderOnRejection) {
    final String header = "WriteRecords Status for " + attemptLabel + ": ";
    try {
        WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(request);
        System.out.println(header + writeRecordsResponse.sdkHttpResponse().statusCode());
    } catch (RejectedRecordsException e) {
        if (printHeaderOnRejection) {
            System.out.println(header);
        }
        printRejectedRecordsException(e);
    } catch (Exception e) {
        // Sample code: report and continue so the remaining steps of the demo still run.
        System.out.println("Error: " + e);
    }
}