in omega/omega-transaction/src/main/java/org/apache/servicecomb/pack/omega/transaction/CallbackContext.java [46:87]
public void apply(String globalTxId, String localTxId, String parentTxId, String callbackMethod, Object... payloads) {
String oldGlobalTxId = omegaContext.globalTxId();
String oldLocalTxId = omegaContext.localTxId();
try {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(localTxId);
if (contexts.containsKey(callbackMethod)) {
CallbackContextInternal contextInternal = contexts.get(callbackMethod);
contextInternal.callbackMethod.invoke(contextInternal.target, payloads);
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
sender.send(
new TxCompensateAckSucceedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
parentTxId, callbackMethod));
}
LOG.info("Callback transaction with global tx id [{}], local tx id [{}]", globalTxId, localTxId);
} else {
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
String msg = "callback method " + callbackMethod
+ " not found on CallbackContext, If it is starting, please try again later";
sender.send(
new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
parentTxId, callbackMethod, new Exception(msg)));
LOG.error(msg);
}else{
throw new NullPointerException();
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
if (omegaContext.getAlphaMetas().isAkkaEnabled()) {
sender.send(
new TxCompensateAckFailedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
parentTxId, callbackMethod, e));
}
LOG.error(
"Pre-checking for callback method " + callbackMethod
+ " was somehow skipped, did you forget to configure callback method checking on service startup?",
e);
} finally {
omegaContext.setGlobalTxId(oldGlobalTxId);
omegaContext.setLocalTxId(oldLocalTxId);
}
}