in java/shopping-cart/src/main/java/org/apache/flink/statefun/playground/java/shoppingcart/StockFn.java [44:88]
// Handles the two message types this function understands:
//   - RESTOCK_ITEM_TYPE: adds the restocked quantity to the stored STOCK value.
//   - REQUEST_ITEM_TYPE: reserves the requested quantity when enough stock is available and
//     replies to the caller with an ItemAvailability (INSTOCK or OUTOFSTOCK).
// Messages of any other type are ignored and the invocation completes normally.
//
// Throws IllegalStateException when a REQUEST_ITEM_TYPE message arrives without a caller
// address. The caller is resolved before any state is written, so a failed request does not
// decrement the stored stock.
public CompletableFuture<Void> apply(Context context, Message message) {
  AddressScopedStorage storage = context.storage();
  // A missing STOCK value is treated as zero items in stock.
  final int quantity = storage.get(STOCK).orElse(0);

  if (message.is(RESTOCK_ITEM_TYPE)) {
    RestockItem restock = message.as(RESTOCK_ITEM_TYPE);
    LOG.info("{}", restock);
    LOG.info("Scope: {}", context.self());
    LOG.info("Caller: {}", context.caller());

    final int newQuantity = quantity + restock.getQuantity();
    storage.set(STOCK, newQuantity);
    LOG.info("---");
    return context.done();
  }

  if (message.is(REQUEST_ITEM_TYPE)) {
    final RequestItem request = message.as(REQUEST_ITEM_TYPE);
    final Optional<Address> caller = context.caller();
    LOG.info("{}", request);
    LOG.info("Scope: {}", context.self());
    LOG.info("Caller: {}", caller);

    final int requestQuantity = request.getQuantity();
    LOG.info("Available quantity: {}", quantity);
    LOG.info("Requested quantity: {}", requestQuantity);

    // Fail fast: without a reply address the result cannot be delivered, so do not
    // reserve any stock for this request.
    final Address replyTo =
        caller.orElseThrow(
            () -> new IllegalStateException("There should always be a caller in this example"));

    final ItemAvailability itemAvailability;
    if (quantity >= requestQuantity) {
      // Enough stock: reserve it by persisting the reduced quantity.
      storage.set(STOCK, quantity - requestQuantity);
      itemAvailability = new ItemAvailability(Status.INSTOCK, requestQuantity);
    } else {
      // Not enough stock: leave the stored quantity untouched.
      itemAvailability = new ItemAvailability(Status.OUTOFSTOCK, requestQuantity);
    }

    context.send(
        MessageBuilder.forAddress(replyTo)
            .withCustomType(ITEM_AVAILABILITY_TYPE, itemAvailability)
            .build());
    LOG.info("---");
  }

  return context.done();
}