public long pushData()

in src/main/java/com/aws/iot/edgeconnectorforkvs/dataaccessor/StreamManager.java [103:156]


    public long pushData(@NonNull String assetId, @NonNull String propertyId, @NonNull Object val,
                         Optional<Date> updateTimeStamp) {
        try {
            if (streamManagerClient == null || msgStreamName == null) {
                final String errorMsg = String.format("Error pushing data to Stream. Property Id: %s." +
                        "Please create a message stream first.", propertyId);
                log.error(errorMsg);
                throw new EdgeConnectorForKVSException(errorMsg);
            }
            Variant variant = null;
            if (val instanceof Integer) {
                variant = new Variant().withIntegerValue(Long.valueOf((Integer) val));
            } else if (val instanceof String) {
                variant = new Variant().withStringValue((String) val);
            } else if (val instanceof Double) {
                variant = new Variant().withDoubleValue((Double) val);
            } else if (val instanceof Boolean) {
                variant = new Variant().withBooleanValue((Boolean) val);
            } else {
                final String errorMsg = String.format("Trying to push invalid val type",
                        msgStreamName, propertyId);
                log.error(errorMsg);
                throw new EdgeConnectorForKVSException(errorMsg);
            }
            List<AssetPropertyValue> entries = new ArrayList<>();
            long epochSecond = Instant.now().getEpochSecond();
            if (updateTimeStamp.isPresent()) {
                epochSecond = updateTimeStamp.get().toInstant().getEpochSecond();
            }
            TimeInNanos timestamp = new TimeInNanos().withTimeInSeconds(epochSecond);

            AssetPropertyValue entry = new AssetPropertyValue()
                    .withValue(variant)
                    .withQuality(Quality.GOOD)
                    .withTimestamp(timestamp);
            entries.add(entry);
            PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
                    .withEntryId(UUID.randomUUID().toString())
                    .withAssetId(assetId)
                    .withPropertyId(propertyId)
                    .withPropertyValues(entries);
            log.debug("Pushing data to Message Stream: " + msgStreamName);
            log.trace("For SiteWise Asset ID: " + assetId);
            log.trace("For SiteWise Property ID: " + propertyId);
            log.trace("Value: " + entries.get(0).toString());
            return streamManagerClient.appendMessage(msgStreamName,
                    ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
        } catch (StreamManagerException | JsonProcessingException ex) {
            final String errorMsg = String.format("Error pushing data to Stream: %s, Property Id: %s",
                    msgStreamName, propertyId);
            log.error(errorMsg);
            return 0L;
        }
    }