in java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/UserShoppingCartFn.java [53:159]
public CompletableFuture<Void> apply(Context context, Message message) {
if (message.is(Messages.ADD_TO_CART)) {
final Messages.AddToCart addToCart = message.as(Messages.ADD_TO_CART);
LOG.info("{}", addToCart);
LOG.info("Scope: {}", context.self());
LOG.info("Caller: {}", context.caller());
final Messages.RequestItem requestItem = new Messages.RequestItem(addToCart.getQuantity());
final Message request =
MessageBuilder.forAddress(StockFn.TYPE, addToCart.getItemId())
.withCustomType(Messages.REQUEST_ITEM_TYPE, requestItem)
.build();
context.send(request);
LOG.info("---");
return context.done();
}
if (message.is(Messages.ITEM_AVAILABILITY_TYPE)) {
final Messages.ItemAvailability availability = message.as(Messages.ITEM_AVAILABILITY_TYPE);
LOG.info("{}", availability);
LOG.info("Scope: {}", context.self());
LOG.info("Caller: {}", context.caller());
if (Messages.ItemAvailability.Status.INSTOCK.equals(availability.getStatus())) {
final AddressScopedStorage storage = context.storage();
final Basket basket = storage.get(BASKET).orElse(Basket.initEmpty());
// ItemAvailability event comes from the Stock function and contains the itemId as the
// caller id
final Optional<Address> caller = context.caller();
if (caller.isPresent()) {
basket.add(caller.get().id(), availability.getQuantity());
} else {
throw new IllegalStateException("There should always be a caller in this example");
}
storage.set(BASKET, basket);
LOG.info("Basket: {}", basket);
LOG.info("---");
}
return context.done();
}
if (message.is(Messages.CLEAR_CART_TYPE)) {
final Messages.ClearCart clear = message.as(Messages.CLEAR_CART_TYPE);
final AddressScopedStorage storage = context.storage();
LOG.info("{}", clear);
LOG.info("Scope: {}", context.self());
LOG.info("Caller: {}", context.caller());
LOG.info("Basket: {}", storage.get(BASKET));
storage
.get(BASKET)
.ifPresent(
basket -> {
for (Map.Entry<String, Integer> entry : basket.getEntries()) {
Messages.RestockItem restockItem =
new Messages.RestockItem(entry.getKey(), entry.getValue());
Message restockCommand =
MessageBuilder.forAddress(StockFn.TYPE, entry.getKey())
.withCustomType(Messages.RESTOCK_ITEM_TYPE, restockItem)
.build();
context.send(restockCommand);
}
basket.clear();
});
LOG.info("---");
return context.done();
}
if (message.is(Messages.CHECKOUT_TYPE)) {
final Messages.Checkout checkout = message.as(Messages.CHECKOUT_TYPE);
final AddressScopedStorage storage = context.storage();
LOG.info("{}", checkout);
LOG.info("Scope: {}", context.self());
LOG.info("Caller: {}", context.caller());
LOG.info("Basket: {}", storage.get(BASKET));
final Optional<String> itemsOption =
storage
.get(BASKET)
.map(
basket ->
basket.getEntries().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.joining("\n")));
itemsOption.ifPresent(
items -> {
LOG.info("Receipt items: ");
LOG.info("{}", items);
final Messages.Receipt receipt = new Messages.Receipt(context.self().id(), items);
final EgressMessage egressMessage =
EgressMessageBuilder.forEgress(Identifiers.RECEIPT_EGRESS)
.withCustomType(
Messages.EGRESS_RECORD_JSON_TYPE,
new Messages.EgressRecord(Identifiers.RECEIPT_TOPICS, receipt.toString()))
.build();
context.send(egressMessage);
});
LOG.info("---");
}
return context.done();
}