public CompletableFuture apply()

in java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java [53:159]


  /**
   * Handles one message addressed to a user's shopping cart.
   *
   * <p>Four message types are recognised; any other message is ignored:
   *
   * <ul>
   *   <li>{@code Messages.ADD_TO_CART}: sends a {@code RequestItem} carrying the requested
   *       quantity to the {@code StockFn} instance addressed by the item id.
   *   <li>{@code Messages.ITEM_AVAILABILITY_TYPE}: when the status is {@code INSTOCK}, adds the
   *       reported quantity to the basket under the caller's id and writes the basket back to
   *       storage. Any other status leaves the basket untouched.
   *   <li>{@code Messages.CLEAR_CART_TYPE}: sends a {@code RestockItem} to {@code StockFn} for
   *       every basket entry and then removes the basket from storage.
   *   <li>{@code Messages.CHECKOUT_TYPE}: if a basket is stored, renders its entries as
   *       {@code "key: value"} lines and sends them as a receipt to the receipt egress.
   * </ul>
   *
   * @param context gives access to this function's address, the caller, the storage and the
   *     outgoing message channel
   * @param message the incoming message to dispatch on
   * @return the value of {@code context.done()}
   * @throws IllegalStateException if an in-stock {@code ItemAvailability} message has no caller
   */
  public CompletableFuture<Void> apply(Context context, Message message) {
    if (message.is(Messages.ADD_TO_CART)) {
      final Messages.AddToCart addToCart = message.as(Messages.ADD_TO_CART);
      LOG.info("{}", addToCart);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", context.caller());

      // Ask the stock function for the item; the basket is only updated once it answers.
      final Messages.RequestItem requestItem = new Messages.RequestItem(addToCart.getQuantity());
      final Message request =
          MessageBuilder.forAddress(StockFn.TYPE, addToCart.getItemId())
              .withCustomType(Messages.REQUEST_ITEM_TYPE, requestItem)
              .build();
      context.send(request);
      LOG.info("---");
      return context.done();
    }

    if (message.is(Messages.ITEM_AVAILABILITY_TYPE)) {
      final Messages.ItemAvailability availability = message.as(Messages.ITEM_AVAILABILITY_TYPE);
      LOG.info("{}", availability);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", context.caller());
      if (Messages.ItemAvailability.Status.INSTOCK.equals(availability.getStatus())) {
        // ItemAvailability event comes from the Stock function and contains the itemId as the
        // caller id
        final Address stockItem =
            context
                .caller()
                .orElseThrow(
                    () ->
                        new IllegalStateException(
                            "There should always be a caller in this example"));

        final AddressScopedStorage storage = context.storage();
        // orElseGet: only build an empty basket when none is stored yet.
        final Basket basket = storage.get(BASKET).orElseGet(Basket::initEmpty);
        basket.add(stockItem.id(), availability.getQuantity());

        storage.set(BASKET, basket);
        LOG.info("Basket: {}", basket);
        LOG.info("---");
      }
      return context.done();
    }

    if (message.is(Messages.CLEAR_CART_TYPE)) {
      final Messages.ClearCart clear = message.as(Messages.CLEAR_CART_TYPE);
      final AddressScopedStorage storage = context.storage();
      final Optional<Basket> storedBasket = storage.get(BASKET);

      LOG.info("{}", clear);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", context.caller());
      LOG.info("Basket: {}", storedBasket);

      storedBasket.ifPresent(
          basket -> {
            // Hand every reserved item back to the stock function.
            for (Map.Entry<String, Integer> entry : basket.getEntries()) {
              final Messages.RestockItem restockItem =
                  new Messages.RestockItem(entry.getKey(), entry.getValue());
              final Message restockCommand =
                  MessageBuilder.forAddress(StockFn.TYPE, entry.getKey())
                      .withCustomType(Messages.RESTOCK_ITEM_TYPE, restockItem)
                      .build();

              context.send(restockCommand);
            }
            // Mutating the Basket obtained from storage.get() is not enough to empty the cart:
            // as in the add path above, the change must be written to storage explicitly.
            // Removing the value also means a repeated clear cannot restock the same items
            // twice and a later checkout finds no basket to print.
            storage.remove(BASKET);
          });
      LOG.info("---");
      return context.done();
    }

    if (message.is(Messages.CHECKOUT_TYPE)) {
      final Messages.Checkout checkout = message.as(Messages.CHECKOUT_TYPE);
      final AddressScopedStorage storage = context.storage();
      final Optional<Basket> storedBasket = storage.get(BASKET);

      LOG.info("{}", checkout);
      LOG.info("Scope: {}", context.self());
      LOG.info("Caller: {}", context.caller());
      LOG.info("Basket: {}", storedBasket);

      // One "itemId: quantity" line per basket entry; absent when no basket is stored.
      final Optional<String> itemsOption =
          storedBasket.map(
              basket ->
                  basket.getEntries().stream()
                      .map(entry -> entry.getKey() + ": " + entry.getValue())
                      .collect(Collectors.joining("\n")));

      itemsOption.ifPresent(
          items -> {
            LOG.info("Receipt items: ");
            LOG.info("{}", items);

            final Messages.Receipt receipt = new Messages.Receipt(context.self().id(), items);
            final EgressMessage egressMessage =
                EgressMessageBuilder.forEgress(Identifiers.RECEIPT_EGRESS)
                    .withCustomType(
                        Messages.EGRESS_RECORD_JSON_TYPE,
                        new Messages.EgressRecord(Identifiers.RECEIPT_TOPICS, receipt.toString()))
                    .build();
            context.send(egressMessage);
          });
      LOG.info("---");
      return context.done();
    }

    // Unrecognised message types are ignored.
    return context.done();
  }