public SagaData applyEvent()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [467:581]


  /**
   * Applies a domain event to the saga state and returns that same state object.
   *
   * <p>The event payload is first appended to the state's event log, unless the payload is
   * {@code null} or a {@code ComponsitedCheckEvent}. The event is then dispatched on its
   * concrete type ({@code SagaStartedDomain}, {@code AddTxEventDomain},
   * {@code UpdateTxEventDomain} or {@code SagaEndedDomain}); other event types only get logged.
   *
   * <p>Any exception raised while applying the event is logged, after which the actor calls
   * {@code beforeStop} with {@code SagaActorState.SUSPENDED} and stops itself. The exception is
   * not propagated to the caller.
   *
   * @param event the domain event to apply
   * @param data the saga state; mutated in place
   * @return the {@code data} argument
   */
  public SagaData applyEvent(DomainEvent event, SagaData data) {
    LOG.debug("apply domain event {}", event.getEvent());
    try {
      // Parameterized logging already skips formatting when the level is disabled,
      // so no explicit isDebugEnabled() guard is needed here.
      if (this.recoveryRunning()) {
        LOG.info("recovery {}", event.getEvent());
      } else {
        LOG.debug("persistence {}", event.getEvent());
      }
      if (event.getEvent() != null && !(event.getEvent() instanceof ComponsitedCheckEvent)) {
        data.logEvent(event.getEvent());
      }
      if (event instanceof SagaStartedDomain) {
        applySagaStartedDomain((SagaStartedDomain) event, data);
      } else if (event instanceof AddTxEventDomain) {
        applyAddTxEventDomain((AddTxEventDomain) event, data);
      } else if (event instanceof UpdateTxEventDomain) {
        applyUpdateTxEventDomain((UpdateTxEventDomain) event, data);
      } else if (event instanceof SagaEndedDomain) {
        applySagaEndedDomain((SagaEndedDomain) event, data);
      }
    } catch (Exception ex) {
      // Log once: the stack trace already carries ex.getMessage().
      LOG.error("apply {}", event.getEvent(), ex);
      beforeStop(event.getEvent(), SagaActorState.SUSPENDED, data);
      stop();
      // TODO add a metric for SagaActor processing failures
    }
    return data;
  }

  /** Copies the identifying fields and time bounds of a started saga into the state. */
  private void applySagaStartedDomain(SagaStartedDomain domainEvent, SagaData data) {
    data.setServiceName(domainEvent.getEvent().getServiceName());
    data.setInstanceId(domainEvent.getEvent().getInstanceId());
    data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
    data.setBeginTime(domainEvent.getEvent().getCreateTime());
    data.setExpirationTime(domainEvent.getExpirationTime());
  }

  /** Registers a new sub-transaction entity, or warns if its local tx id is already known. */
  private void applyAddTxEventDomain(AddTxEventDomain domainEvent, SagaData data) {
    if (data.getTxEntities().exists(domainEvent.getEvent().getLocalTxId())) {
      LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
      return;
    }
    TxEntity txEntity = TxEntity.builder()
        .serviceName(domainEvent.getEvent().getServiceName())
        .instanceId(domainEvent.getEvent().getInstanceId())
        .globalTxId(domainEvent.getEvent().getGlobalTxId())
        .localTxId(domainEvent.getEvent().getLocalTxId())
        .parentTxId(domainEvent.getEvent().getParentTxId())
        .compensationMethod(domainEvent.getCompensationMethod())
        .payloads(domainEvent.getPayloads())
        .state(domainEvent.getState())
        .reverseRetries(domainEvent.getReverseRetries())
        .reverseTimeout(domainEvent.getReverseTimeout())
        .retryDelayInMilliseconds(domainEvent.getRetryDelayInMilliseconds())
        .beginTime(domainEvent.getEvent().getCreateTime())
        .build();
    data.getTxEntities().put(txEntity.getLocalTxId(), txEntity);
  }

  /**
   * Applies a state change to an existing sub-transaction entity and, for failure states,
   * triggers compensation of the other entities in reverse order.
   *
   * @throws IllegalStateException if no entity is registered under the event's local tx id;
   *     {@code applyEvent} catches this and suspends the saga
   */
  private void applyUpdateTxEventDomain(UpdateTxEventDomain domainEvent, SagaData data) {
    TxEntity txEntity = data.getTxEntities().get(domainEvent.getLocalTxId());
    if (txEntity == null) {
      // Fail with a descriptive message instead of an anonymous NullPointerException.
      throw new IllegalStateException("TxEntity " + domainEvent.getLocalTxId()
          + " not found in saga " + data.getGlobalTxId());
    }
    txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
    if (domainEvent.getState() == TxState.COMMITTED) {
      txEntity.setState(domainEvent.getState());
    } else if (domainEvent.getState() == TxState.FAILED) {
      txEntity.setState(domainEvent.getState());
      txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
      data.getTxEntities().forEachReverse((localTxId, entity) -> {
        if (entity.getState() == TxState.COMMITTED) {
          // The result of compensation() is not acted upon: returning from this callback
          // only ends the current invocation and cannot stop the traversal.
          // NOTE(review): if a failed compensation should abort the remaining ones,
          // forEachReverse needs a short-circuit mechanism — confirm intent.
          compensation(domainEvent, entity, data);
        }
      });
    } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
      data.getCompensationRunningCounter().decrementAndGet();
      txEntity.setState(TxState.COMPENSATED_SUCCEED);
      LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
    } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
      data.getCompensationRunningCounter().decrementAndGet();
      txEntity.setState(TxState.COMPENSATED_FAILED);
      txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
      boolean retryAllowed = txEntity.getReverseRetries() > 0
          && txEntity.getRetriesCounter().get() < txEntity.getReverseRetries();
      if (retryAllowed) {
        data.getTxEntities().forEachReverse((localTxId, entity) -> {
          if (entity.getState() == TxState.COMMITTED
              || entity.getState() == TxState.COMPENSATED_FAILED) {
            // Result ignored for the same reason as in the FAILED branch above.
            compensation(domainEvent, entity, data);
          }
        });
      } else {
        // Retries are disabled or exhausted: mark the saga and notify this actor.
        data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
        self().tell(ComponsitedCheckEvent.builder()
            .serviceName(txEntity.getServiceName())
            .instanceId(txEntity.getInstanceId())
            .globalTxId(txEntity.getGlobalTxId())
            .localTxId(txEntity.getLocalTxId())
            .preState(TxState.COMPENSATED_FAILED)
            .parentTxId(txEntity.getParentTxId()).build(), self());
      }
    }
  }

  /**
   * Applies the end of a saga: compensates committed entities when the saga failed, otherwise
   * records the end time (and the suspension type when the saga was suspended).
   */
  private void applySagaEndedDomain(SagaEndedDomain domainEvent, SagaData data) {
    if (domainEvent.getState() == SagaActorState.FAILED) {
      data.getTxEntities().forEachReverse((localTxId, entity) -> {
        if (entity.getState() == TxState.COMMITTED) {
          // The result of compensation() is not acted upon: returning from this callback
          // only ends the current invocation and cannot stop the traversal.
          compensation(domainEvent, entity, data);
        }
      });
    } else if (domainEvent.getState() == SagaActorState.SUSPENDED
        || domainEvent.getState() == SagaActorState.COMPENSATED
        || domainEvent.getState() == SagaActorState.COMMITTED) {
      // Fall back to the current time when the domain event carries no payload.
      data.setEndTime(
          domainEvent.getEvent() != null ? domainEvent.getEvent().getCreateTime() : new Date());
      if (domainEvent.getState() == SagaActorState.SUSPENDED) {
        data.setSuspendedType(domainEvent.getSuspendedType());
      }
    }
  }