public CompletableFuture apply()

in java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/StockFn.java [44:88]


  /**
   * Handles a single message addressed to this function instance.
   *
   * <p>The current stock is read from the {@code STOCK} value in the function's storage and is
   * treated as {@code 0} when no value is stored. Two message types are handled:
   *
   * <ul>
   *   <li>{@code RESTOCK_ITEM_TYPE}: adds the restocked quantity to the stored stock.
   *   <li>{@code REQUEST_ITEM_TYPE}: if the stored stock covers the requested quantity, subtracts
   *       it and replies to the caller with {@code Status.INSTOCK}; otherwise leaves the stock
   *       untouched and replies with {@code Status.OUTOFSTOCK}. The reply always carries the
   *       requested quantity.
   * </ul>
   *
   * <p>Messages of any other type are ignored.
   *
   * @param context invocation context giving access to storage, the own and caller addresses, and
   *     message sending
   * @param message the incoming message
   * @return the value of {@code context.done()}
   * @throws IllegalStateException if an item request arrives without a caller address
   */
  public CompletableFuture<Void> apply(Context context, Message message) {
    final AddressScopedStorage storage = context.storage();
    final int quantity = storage.get(STOCK).orElse(0);

    if (message.is(RESTOCK_ITEM_TYPE)) {
      final RestockItem restock = message.as(RESTOCK_ITEM_TYPE);

      LOG.info("{}", restock);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", context.caller());

      storage.set(STOCK, quantity + restock.getQuantity());
      LOG.info("---");
    } else if (message.is(REQUEST_ITEM_TYPE)) {
      final RequestItem request = message.as(REQUEST_ITEM_TYPE);
      final Optional<Address> caller = context.caller();

      LOG.info("{}", request);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", caller);

      final int requestQuantity = request.getQuantity();
      LOG.info("Available quantity: {}", quantity);
      LOG.info("Requested quantity: {}", requestQuantity);

      // Resolve the reply address before touching the stock, so that a request without a caller
      // fails without having decremented anything.
      final Address replyTo =
          caller.orElseThrow(
              () -> new IllegalStateException("There should always be a caller in this example"));

      // NOTE(review): a negative requested quantity passes this check and would increase the
      // stock; confirm that requests are validated upstream.
      final boolean inStock = quantity >= requestQuantity;
      if (inStock) {
        storage.set(STOCK, quantity - requestQuantity);
      }
      final ItemAvailability itemAvailability =
          new ItemAvailability(inStock ? Status.INSTOCK : Status.OUTOFSTOCK, requestQuantity);

      context.send(
          MessageBuilder.forAddress(replyTo)
              .withCustomType(ITEM_AVAILABILITY_TYPE, itemAvailability)
              .build());
      LOG.info("---");
    }
    return context.done();
  }