in src/main/java/com/aws/iot/edgeconnectorforkvs/dataaccessor/StreamManager.java [103:156]
/**
 * Wraps a single value in a {@code PutAssetPropertyValueEntry} and appends it to the
 * configured message stream.
 *
 * <p>Supported value types are {@code Integer}, {@code String}, {@code Double} and
 * {@code Boolean}. The entry is stamped with second-level precision and quality
 * {@code Quality.GOOD}.
 *
 * @param assetId         asset id set on the entry
 * @param propertyId      property id set on the entry
 * @param val             value to push; must be one of the supported types listed above
 * @param updateTimeStamp timestamp to attach to the value; when empty the current time is used
 * @return the value returned by {@code streamManagerClient.appendMessage} (presumably the
 *         sequence number of the appended message - confirm against the SDK), or {@code 0L}
 *         when appending or serialization fails
 * @throws EdgeConnectorForKVSException if no stream manager client / message stream has been
 *                                      set up, or if {@code val} has an unsupported type
 */
public long pushData(@NonNull String assetId, @NonNull String propertyId, @NonNull Object val,
                     Optional<Date> updateTimeStamp) {
    try {
        if (streamManagerClient == null || msgStreamName == null) {
            final String errorMsg = String.format("Error pushing data to Stream. Property Id: %s. "
                    + "Please create a message stream first.", propertyId);
            log.error(errorMsg);
            throw new EdgeConnectorForKVSException(errorMsg);
        }

        // Map the Java value onto the matching Variant field.
        final Variant variant;
        if (val instanceof Integer) {
            // Variant stores integers as Long, so widen the Integer.
            variant = new Variant().withIntegerValue(Long.valueOf((Integer) val));
        } else if (val instanceof String) {
            variant = new Variant().withStringValue((String) val);
        } else if (val instanceof Double) {
            variant = new Variant().withDoubleValue((Double) val);
        } else if (val instanceof Boolean) {
            variant = new Variant().withBooleanValue((Boolean) val);
        } else {
            final String errorMsg = String.format(
                    "Trying to push invalid val type: %s. Stream: %s, Property Id: %s",
                    val.getClass().getName(), msgStreamName, propertyId);
            log.error(errorMsg);
            throw new EdgeConnectorForKVSException(errorMsg);
        }

        // Use the caller-supplied timestamp when present, otherwise "now"; seconds only.
        final long epochSecond = updateTimeStamp
                .map(date -> date.toInstant().getEpochSecond())
                .orElseGet(() -> Instant.now().getEpochSecond());
        final TimeInNanos timestamp = new TimeInNanos().withTimeInSeconds(epochSecond);

        final AssetPropertyValue entry = new AssetPropertyValue()
                .withValue(variant)
                .withQuality(Quality.GOOD)
                .withTimestamp(timestamp);
        final List<AssetPropertyValue> entries = new ArrayList<>();
        entries.add(entry);

        final PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
                .withEntryId(UUID.randomUUID().toString())
                .withAssetId(assetId)
                .withPropertyId(propertyId)
                .withPropertyValues(entries);

        // Parameterized logging avoids building the strings when the level is disabled.
        log.debug("Pushing data to Message Stream: {}", msgStreamName);
        log.trace("For SiteWise Asset ID: {}", assetId);
        log.trace("For SiteWise Property ID: {}", propertyId);
        log.trace("Value: {}", entry);

        return streamManagerClient.appendMessage(msgStreamName,
                ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (StreamManagerException | JsonProcessingException ex) {
        final String errorMsg = String.format("Error pushing data to Stream: %s, Property Id: %s",
                msgStreamName, propertyId);
        // Pass the exception along so the cause and stack trace are not lost.
        log.error(errorMsg, ex);
        return 0L;
    }
}