public void update()

in function-apps/Update-Product-Inventory/src/main/java/org/inventory/hub/UpdateProductInventory.java [36:167]


    public void update(
        @EventHubTrigger(name = "data", eventHubName = "%TRANSACTIONS_EVENT_HUB_NAME%",
            connection = "TRANSACTIONS_EVENT_HUB_CONNECTION_STRING",
            cardinality = Cardinality.ONE,
            consumerGroup = "%TRANSACTIONS_EVENT_HUB_CONSUMER_GROUP_NAME%") String data,
        @CosmosDBOutput(name = "document", databaseName = "%PRODUCT_INVENTORY_COSMOSDB_DBNAME%",
            collectionName = "%PRODUCT_INVENTORY_COSMOSDB_COLLECTION_NAME%",
            connectionStringSetting = "PRODUCT_INVENTORY_COSMOSDB_CONNECTION_STRING",
            createIfNotExists = true)
            OutputBinding<String> document,
        @CosmosDBInput(name = "documents", databaseName = "%PRODUCT_INVENTORY_COSMOSDB_DBNAME%",
        collectionName = "%PRODUCT_INVENTORY_COSMOSDB_COLLECTION_NAME%",
        connectionStringSetting = "PRODUCT_INVENTORY_COSMOSDB_CONNECTION_STRING",
        sqlQuery = "SELECT * FROM root r")
            String inputDoc,
        final ExecutionContext context) {

        context.getLogger().info("Java Event Hub transaction trigger from "
            + System.getenv("UPDATE_PRODUCT_INVENTORY_FUNCTION_APP_NAME")
            + "(" + System.getenv("UPDATE_PRODUCT_INVENTORY_FUNCTION_APP_NAME")
            + ") processed a request: " + data);
        final Gson gson = new GsonBuilder().create();
        JSONObject eventHubMessage = new JSONObject(data);
        eventHubMessage.put("id", UUID.randomUUID().toString());
        context.getLogger().info("\tFound eventGridMessage: " + eventHubMessage.toString());
        // context.getLogger().info("\tFound CosmosDB: " + inputDoc);
        Map<String, Map<String, ProductInventory>> currentProductInventoryByLocation = new HashMap<>();
        if (inputDoc != null) {
            JSONArray currentProductInventory = new JSONArray(inputDoc);
            for (Object item : currentProductInventory) {
                context.getLogger().info("\tFound currentProductInventory item: " + item.toString());

                ProductInventory productInventory = gson.fromJson(item.toString(), ProductInventory.class);
                if (productInventory.location != null && productInventory.productId != null) {
                    if (currentProductInventoryByLocation.get(productInventory.location) == null) {
                        Map<String, ProductInventory> productById = new HashMap<>();
                        productById.put(productInventory.productId, productInventory);
                        currentProductInventoryByLocation.put(productInventory.location, productById);
                    } else {
                        Map<String, ProductInventory> productById = currentProductInventoryByLocation.get(productInventory.location);
                        productById.putIfAbsent(productInventory.productId, productInventory);
                    }
                }
            }
            // context.getLogger().info("\tBuilt Map of product inventory: " + gson.toJson(currentProductInventoryByLocation));

            eventHubMessage.put("id", "1");
        }

        String pointOfTransactionData = (String) eventHubMessage.get("pointOfTransaction").toString();
        context.getLogger().info("\tFound pointOfTransactionData: " + pointOfTransactionData);
        JSONObject pointOfTransaction = new JSONObject(pointOfTransactionData);
        context.getLogger().info("\tFound pointOfTransaction: " + pointOfTransaction);
        context.getLogger().info("\tFound pointOfTransaction location: " + pointOfTransaction.get("location"));

        String productInformationData = (String) eventHubMessage.get("productInformation").toString();
        context.getLogger().info("\tFound productInformationData: " + productInformationData);
        JSONObject productInformation = new JSONObject(productInformationData);
        context.getLogger().info("\tFound pointOfTransaction: " + productInformation);
        context.getLogger().info("\tFound pointOfTransaction productId: " + productInformation.get("productId"));
        context.getLogger().info("\tFound pointOfTransaction productName: " + productInformation.get("productName"));
        context.getLogger().info("\tFound pointOfTransaction description: " + productInformation.get("description"));
        context.getLogger().info("\tFound pointOfTransaction count: " + productInformation.get("count"));

        ProductInventory productInventoryOutput = new ProductInventory();
        productInventoryOutput.id = UUID.randomUUID().toString();
        productInventoryOutput.location = pointOfTransaction.get("location").toString();
        productInventoryOutput.productId = productInformation.get("productId").toString();
        productInventoryOutput.productName = productInformation.get("productName").toString();
        productInventoryOutput.description = productInformation.get("description").toString();

        long currentCount = Long.valueOf(productInformation.get("count").toString());
        Map<String, ProductInventory> productFromMap = currentProductInventoryByLocation.get(productInventoryOutput.location);
        if (productFromMap != null && productFromMap.get(productInventoryOutput.productId) != null) {
            productInventoryOutput.id = productFromMap.get(productInventoryOutput.productId).id;
            String totalCount = productFromMap.get(productInventoryOutput.productId).totalCount;
            long currentProductTotal = Long.valueOf(totalCount != null ? totalCount : "0");
            if (eventHubMessage.get("type").toString().equals("intake")) {
                currentCount += currentProductTotal; // add to the inventory
            } else if (eventHubMessage.get("type").toString().equals("sell")) {
                currentCount = currentProductTotal - currentCount; // subtract from the inventory
            } else {
                currentCount = 0;
            }
        } else {
            if (eventHubMessage.get("type").toString().equals("intake")) {
                currentCount = currentCount; // intake
            } else if (eventHubMessage.get("type").toString().equals("sell")) {
                currentCount = -currentCount; // sell
            } else {
                currentCount = 0;
            }
        }
        productInventoryOutput.totalCount = Long.toString(currentCount);

        context.getLogger().info("\tSaving: " + gson.toJson(productInventoryOutput));

        String cosmosDBUri = System.getenv("PRODUCT_INVENTORY_COSMOSDB_URI");
        String cosmosDBKey = System.getenv("PRODUCT_INVENTORY_COSMOSDB_KEY");

        // Extract CosmosDB URI and key
        String cosmosDbConnectionString = System.getenv("PRODUCT_INVENTORY_COSMOSDB_CONNECTION_STRING");
        String pattern = "AccountEndpoint=(.*);AccountKey=(.*);";
        Pattern r = Pattern.compile(pattern);
        Matcher m = r.matcher(cosmosDbConnectionString);
        if (m.find()) {
            cosmosDBUri = m.group(1);
            cosmosDBKey = m.group(2);
        }

        DocumentClient client = new DocumentClient(cosmosDBUri, cosmosDBKey, null, null);

        final String storedProcedureLink = String.format("/dbs/%s/colls/%s/sprocs/update-product-inventory",
            System.getenv("PRODUCT_INVENTORY_COSMOSDB_DBNAME"),
            System.getenv("PRODUCT_INVENTORY_COSMOSDB_COLLECTION_NAME"));
        Object[] procedureParams = {
            eventHubMessage.get("type"),
            productInformation.get("count").toString(),
            productInformation.get("productId").toString(),
            productInformation.get("productName"),
            productInformation.get("description"),
            pointOfTransaction.get("location")
        };
        try {
            context.getLogger().info("\t Stored Procedure call: " + gson.toJson(procedureParams));
            RequestOptions requestOptions = new RequestOptions();
            requestOptions.setPartitionKey(new PartitionKey(pointOfTransaction.get("location")));
            client.executeStoredProcedure(storedProcedureLink, requestOptions, procedureParams);
        } catch (Exception e) {
            context.getLogger().info("ERROR Stored Procedure call failed: " + gson.toJson(e));
        }
    }