in sample_apps_reinvent2021/dotnet/CrudAndSimpleIngestionExample.cs [363:534]
/// <summary>
/// Demonstrates upsert (last writer wins) semantics using the record <c>Version</c> field.
/// Performs four writes against <c>Constants.DATABASE_NAME</c> / <c>Constants.TABLE_NAME</c>:
/// an initial write, an identical retry, an update carrying a lower version, and the same
/// update carrying a higher version. The outcome of each write is printed to the console;
/// failures are reported but never propagated to the caller.
/// </summary>
/// <returns>A task that completes once all four write attempts have finished.</returns>
public async Task WriteRecordsWithUpsert()
{
    Console.WriteLine("Writing records with upsert");

    // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source
    long initialVersion = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    long version = initialVersion;

    // The record time is the same instant as the initial version; format it culture-independently
    // because it is machine-readable data sent to the service.
    string currentTimeString = initialVersion.ToString(System.Globalization.CultureInfo.InvariantCulture);

    // Attributes shared by every record in each request below. Only Version is changed between writes.
    var commonAttributes = new Record
    {
        Dimensions = new List<Dimension>
        {
            new Dimension { Name = "region", Value = "us-east-1" },
            new Dimension { Name = "az", Value = "az1" },
            new Dimension { Name = "hostname", Value = "host1" }
        },
        MeasureValueType = MeasureValueType.MULTI,
        Time = currentTimeString,
        Version = version
    };

    var records = new List<Record>
    {
        new Record
        {
            MeasureName = "cpu_utilization",
            MeasureValues = new List<MeasureValue>
            {
                new MeasureValue { Name = "cpu1", Value = "13.5", Type = "DOUBLE" }
            }
        },
        new Record
        {
            MeasureName = "memory_utilization",
            MeasureValues = new List<MeasureValue>
            {
                new MeasureValue { Name = "memory1", Value = "40", Type = "DOUBLE" }
            }
        }
    };

    // write records for first time
    await WriteUpsertSampleRecordsAsync(records, commonAttributes, "first time");

    // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent.
    await WriteUpsertSampleRecordsAsync(records, commonAttributes, "retry");

    // Same time, dimensions and measure names as above, but with changed measure values.
    // Per the WriteRecords documentation, a write with an equal or lower version is only
    // rejected when its measure values differ from the stored ones, so the values must
    // change for the next step to actually demonstrate the rejection.
    var upsertedRecords = new List<Record>
    {
        new Record
        {
            MeasureName = "cpu_utilization",
            MeasureValues = new List<MeasureValue>
            {
                new MeasureValue { Name = "cpu1", Value = "14.5", Type = "DOUBLE" }
            }
        },
        new Record
        {
            MeasureName = "memory_utilization",
            MeasureValues = new List<MeasureValue>
            {
                new MeasureValue { Name = "memory1", Value = "50", Type = "DOUBLE" }
            }
        }
    };

    // upsert with lower version, this would fail because a higher version is required to update the measure value.
    version--;
    commonAttributes.Version = version;
    await WriteUpsertSampleRecordsAsync(upsertedRecords, commonAttributes, "upsert with lower version");

    // upsert with higher version as new data is generated.
    // Clamp against the initial version so the new version is strictly higher even if the
    // clock has not advanced (or has moved backwards) since the first write.
    version = Math.Max(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), initialVersion + 1);
    commonAttributes.Version = version;
    await WriteUpsertSampleRecordsAsync(upsertedRecords, commonAttributes, "upsert with higher version");
}

/// <summary>
/// Sends one WriteRecords request for the upsert sample and prints its outcome.
/// </summary>
/// <param name="records">Records to write.</param>
/// <param name="commonAttributes">Attributes applied to every record in the request.</param>
/// <param name="attemptLabel">Text identifying this attempt in the printed status line.</param>
/// <returns>A task that completes when the request has succeeded or its failure has been printed.</returns>
private async Task WriteUpsertSampleRecordsAsync(List<Record> records, Record commonAttributes, string attemptLabel)
{
    try
    {
        var writeRecordsRequest = new WriteRecordsRequest
        {
            DatabaseName = Constants.DATABASE_NAME,
            TableName = Constants.TABLE_NAME,
            Records = records,
            CommonAttributes = commonAttributes
        };
        WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest);
        Console.WriteLine($"WriteRecords Status for {attemptLabel}: {response.HttpStatusCode}");
    }
    catch (RejectedRecordsException e)
    {
        // Rejections are an expected outcome in this sample (e.g. the lower-version upsert).
        PrintRejectedRecordsException(e);
    }
    catch (Exception e)
    {
        // Deliberately best-effort: report the failure and let the remaining steps run.
        Console.WriteLine($"Write records failure:{e}");
    }
}