in app/common/src/main/java/com/googlecodesamples/cloud/jss/common/action/BaseAction.java [63:97]
public abstract T respond(
AckReplyConsumer consumer, PubsubMessage message, float processTime, Timestamp publishTime)
throws IOException;
/**
 * Hook that runs after a received message has been processed; each subclass decides what to do
 * with the resulting message.
 *
 * <p>The receiver returned by {@link #getReceiver()} calls this method with the value returned by
 * {@code process(message, consumer)}. On that path an {@link InterruptedException} thrown from
 * here causes the calling thread's interrupt status to be restored, while an {@link IOException}
 * or {@link ExecutionException} is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param newMessage the message produced by processing the received Pub/Sub message
 * @throws IOException if the newMessage cannot be converted to Cloud Pub/Sub compatible format.
 * @throws InterruptedException if the current thread was interrupted while sending the message.
 * @throws ExecutionException if the computation in {@link com.google.api.core.ApiFuture#get()}
 *     threw an exception
 */
public abstract void postProcess(T newMessage)
throws IOException, InterruptedException, ExecutionException;
/**
 * Builds the {@link MessageReceiver} callback used to handle incoming Pub/Sub messages.
 *
 * <p>For every delivered message the callback logs the message data, runs {@code process} on it
 * and passes the result to {@code postProcess}. If the thread is interrupted along the way, the
 * interrupt status is restored and the callback returns normally; an {@link IOException} or
 * {@link ExecutionException} is rethrown wrapped in a {@link RuntimeException}.
 *
 * @return the message receiver
 */
public final MessageReceiver getReceiver() {
  return (msg, ackReply) -> {
    try {
      logger.info("metric receive message: {}", PubSubUtil.getMessageData(msg));
      // Process the message and feed the result straight into the post-processing hook.
      postProcess(process(msg, ackReply));
    } catch (IOException | ExecutionException failure) {
      // The receiver callback cannot declare checked exceptions, so wrap and keep the cause.
      throw new RuntimeException(failure);
    } catch (InterruptedException interrupted) {
      // Restore the interrupt status so code further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
  };
}