in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [602:656]
private boolean compensation(DomainEvent event, TxEntity txEntity, SagaData data) {
// increments the compensation running counter by one
data.getCompensationRunningCounter().incrementAndGet();
txEntity.setState(TxState.COMPENSATION_SENT);
try {
LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
} catch (Exception ex) {
LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), ex);
if (txEntity.getReverseRetries() > 0 &&
txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
LOG.info("Retry compensate {}/{} [{}] {} after {} ms",
txEntity.getRetriesCounter().get() + 1,
txEntity.getReverseRetries(),
txEntity.getGlobalTxId(),
txEntity.getLocalTxId(),
txEntity.getRetryDelayInMilliseconds());
try {
Thread.sleep(txEntity.getRetryDelayInMilliseconds());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
}
if (ex instanceof TimeoutException) {
StringWriter writer = new StringWriter();
ex.printStackTrace(new PrintWriter(writer));
String stackTrace = writer.toString();
if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
}
CompensateAckTimeoutEvent compensateAckTimeoutEvent = CompensateAckTimeoutEvent.builder()
.createTime(new Date(System.currentTimeMillis()))
.globalTxId(txEntity.getGlobalTxId())
.parentTxId(txEntity.getParentTxId())
.localTxId(txEntity.getLocalTxId())
.serviceName(txEntity.getServiceName())
.instanceId(txEntity.getInstanceId())
.payloads(stackTrace.getBytes())
.build();
self().tell(compensateAckTimeoutEvent, self());
}
if (ex instanceof AlphaException) {
self().tell(TxCompensateAckFailedEvent.builder()
.serviceName(txEntity.getServiceName())
.instanceId(txEntity.getInstanceId())
.globalTxId(txEntity.getGlobalTxId())
.localTxId(txEntity.getLocalTxId())
.parentTxId(txEntity.getParentTxId())
.payloads(ex.getMessage().getBytes())
.build(), self());
}
return false;
}
return true;
}