private boolean compensation()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [602:656]


  private boolean compensation(DomainEvent event, TxEntity txEntity, SagaData data) {
    // increments the compensation running counter by one
    data.getCompensationRunningCounter().incrementAndGet();
    txEntity.setState(TxState.COMPENSATION_SENT);
    try {
      LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
      SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
    } catch (Exception ex) {
      LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), ex);
      if (txEntity.getReverseRetries() > 0 &&
          txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
        LOG.info("Retry compensate {}/{} [{}] {} after {} ms",
            txEntity.getRetriesCounter().get() + 1,
            txEntity.getReverseRetries(),
            txEntity.getGlobalTxId(),
            txEntity.getLocalTxId(),
            txEntity.getRetryDelayInMilliseconds());
        try {
          Thread.sleep(txEntity.getRetryDelayInMilliseconds());
        } catch (InterruptedException e) {
          LOG.error(e.getMessage(), e);
        }
      }
      if (ex instanceof TimeoutException) {
        StringWriter writer = new StringWriter();
        ex.printStackTrace(new PrintWriter(writer));
        String stackTrace = writer.toString();
        if (stackTrace.length() > Environment.getInstance().getPayloadsMaxLength()) {
          stackTrace = stackTrace.substring(0, Environment.getInstance().getPayloadsMaxLength());
        }
        CompensateAckTimeoutEvent compensateAckTimeoutEvent = CompensateAckTimeoutEvent.builder()
            .createTime(new Date(System.currentTimeMillis()))
            .globalTxId(txEntity.getGlobalTxId())
            .parentTxId(txEntity.getParentTxId())
            .localTxId(txEntity.getLocalTxId())
            .serviceName(txEntity.getServiceName())
            .instanceId(txEntity.getInstanceId())
            .payloads(stackTrace.getBytes())
            .build();
        self().tell(compensateAckTimeoutEvent, self());
      }
      if (ex instanceof AlphaException) {
        self().tell(TxCompensateAckFailedEvent.builder()
            .serviceName(txEntity.getServiceName())
            .instanceId(txEntity.getInstanceId())
            .globalTxId(txEntity.getGlobalTxId())
            .localTxId(txEntity.getLocalTxId())
            .parentTxId(txEntity.getParentTxId())
            .payloads(ex.getMessage().getBytes())
            .build(), self());
      }
      return false;
    }
    return true;
  }