public SagaActor()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [72:397]


  public SagaActor(String persistenceId) {
    if (persistenceId != null) {
      this.persistenceId = persistenceId;
    } else {
      this.persistenceId = getSelf().path().name();
    }

    startWith(SagaActorState.IDLE, SagaData.builder().build());

    when(SagaActorState.IDLE,
        matchEvent(SagaStartedEvent.class,
            (event, data) -> {
              sagaBeginTime = System.currentTimeMillis();
              SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
              SagaStartedDomain domainEvent = new SagaStartedDomain(event);
              if (event.getTimeout() > 0) {
                data.setTimeout(event.getTimeout());
                return goTo(SagaActorState.READY)
                    .applying(domainEvent)
                    .forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));
              } else {
                return goTo(SagaActorState.READY)
                    .applying(domainEvent);
              }
            }

        )
    );

    when(SagaActorState.READY,
        matchEvent(TxStartedEvent.class, SagaData.class,
            (event, data) -> {
              AddTxEventDomain domainEvent = new AddTxEventDomain(event);
              if (data.getExpirationTime() != null) {
                return goTo(SagaActorState.PARTIALLY_ACTIVE)
                    .applying(domainEvent)
                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
              } else {
                return goTo(SagaActorState.PARTIALLY_ACTIVE)
                    .applying(domainEvent);
              }
            }
        ).event(SagaEndedEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            }
        ).event(SagaAbortedEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            }
        ).event(Collections.singletonList(StateTimeout()), SagaData.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            })
    );

    when(SagaActorState.PARTIALLY_ACTIVE,
        matchEvent(TxEndedEvent.class, SagaData.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              if (data.getExpirationTime() != null) {
                return goTo(SagaActorState.PARTIALLY_COMMITTED)
                    .applying(domainEvent)
                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
              } else {
                return goTo(SagaActorState.PARTIALLY_COMMITTED)
                    .applying(domainEvent);
              }
            }
        ).event(TxStartedEvent.class,
            (event, data) -> {
              AddTxEventDomain domainEvent = new AddTxEventDomain(event);
              if (data.getExpirationTime() != null) {
                return stay()
                    .applying(domainEvent)
                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
              } else {
                return stay().applying(domainEvent);
              }
            }
        ).event(SagaTimeoutEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED,
                  SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            }
        ).event(TxAbortedEvent.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return goTo(SagaActorState.FAILED)
                  .applying(domainEvent);
            }
        ).event(Collections.singletonList(StateTimeout()), SagaData.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
            })
    );

    when(SagaActorState.PARTIALLY_COMMITTED,
        matchEvent(TxStartedEvent.class,
            (event, data) -> {
              AddTxEventDomain domainEvent = new AddTxEventDomain(event);
              if (data.getExpirationTime() != null) {
                return goTo(SagaActorState.PARTIALLY_ACTIVE)
                    .applying(domainEvent)
                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
              } else {
                return goTo(SagaActorState.PARTIALLY_ACTIVE)
                    .applying(domainEvent);
              }
            }
        ).event(TxEndedEvent.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              if (data.getExpirationTime() != null) {
                return stay()
                    .applying(domainEvent)
                    .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
              } else {
                return stay().applying(domainEvent);
              }
            }
        ).event(SagaTimeoutEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            }
        ).event(SagaEndedEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
              return goTo(SagaActorState.COMMITTED)
                  .applying(domainEvent);
            }
        ).event(SagaAbortedEvent.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
              return goTo(SagaActorState.FAILED).applying(domainEvent);
            }
        ).event(TxAbortedEvent.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return goTo(SagaActorState.FAILED).applying(domainEvent);
            }
        ).event(Collections.singletonList(StateTimeout()), SagaData.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
            })
    );

    when(SagaActorState.FAILED,
        matchEvent(SagaTimeoutEvent.class, SagaData.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            }
        ).event(TxCompensateAckSucceedEvent.class, SagaData.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return stay().applying(domainEvent).andThen(exec(_data -> {
                self().tell(ComponsitedCheckEvent.builder()
                    .serviceName(event.getServiceName())
                    .instanceId(event.getInstanceId())
                    .globalTxId(event.getGlobalTxId())
                    .localTxId(event.getLocalTxId())
                    .parentTxId(event.getParentTxId())
                    .preState(TxState.COMPENSATED_SUCCEED)
                    .build(), self());
              }));
            }
        ).event(TxCompensateAckFailedEvent.class, SagaData.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return stay().applying(domainEvent).andThen(exec(_data -> {
                self().tell(ComponsitedCheckEvent.builder()
                    .serviceName(event.getServiceName())
                    .instanceId(event.getInstanceId())
                    .globalTxId(event.getGlobalTxId())
                    .localTxId(event.getLocalTxId())
                    .parentTxId(event.getParentTxId())
                    .preState(TxState.COMPENSATED_FAILED)
                    .build(), self());
              }));
            }
        ).event(CompensateAckTimeoutEvent.class, SagaData.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return stay().applying(domainEvent).andThen(exec(_data -> {
                self().tell(ComponsitedCheckEvent.builder()
                    .serviceName(event.getServiceName())
                    .instanceId(event.getInstanceId())
                    .globalTxId(event.getGlobalTxId())
                    .localTxId(event.getLocalTxId())
                    .parentTxId(event.getParentTxId())
                    .preState(TxState.COMPENSATED_FAILED)
                    .build(), self());
              }));
            }
        ).event(ComponsitedCheckEvent.class, SagaData.class,
            (event, data) -> {
              if (data.getTxEntities().hasCompensationSentTx() ||
                  data.getTxEntities().hasCompensationFailedTx()) {
                UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
                return stay().applying(domainEvent);
              } else {
                if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
                  SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
                  return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
                } else {
                  SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
                  return goTo(SagaActorState.COMPENSATED).applying(domainEvent);
                }
              }
            }
        ).event(SagaAbortedEvent.class, SagaData.class,
            (event, data) -> {
              if (data.getTxEntities().hasCommittedTx()) {
                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
                return stay()
                    .applying(domainEvent);
              } else if (data.getTxEntities().hasCompensationSentTx()) {
                SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
                return stay()
                    .applying(domainEvent);
              } else {
                SagaEndedDomain domainEvent = new SagaEndedDomain(event,
                    SagaActorState.COMPENSATED);
                return goTo(SagaActorState.COMPENSATED)
                    .applying(domainEvent);
              }
            }
        ).event(TxStartedEvent.class, SagaData.class,
            (event, data) -> {
              AddTxEventDomain domainEvent = new AddTxEventDomain(event);
              return stay().applying(domainEvent);
            }
        ).event(TxEndedEvent.class, SagaData.class,
            (event, data) -> {
              UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
              return stay().applying(domainEvent).andThen(exec(_data -> {
                TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
                // call compensate
                compensation(domainEvent, txEntity, _data);
              }));
            }
        ).event(Arrays.asList(StateTimeout()), SagaData.class,
            (event, data) -> {
              SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
              return goTo(SagaActorState.SUSPENDED)
                  .applying(domainEvent);
            })
    );

    when(SagaActorState.COMMITTED,
        matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
            (event, data) -> {
              beforeStop(event, stateName(), data);
              return stop();
            }
        )
    );

    when(SagaActorState.SUSPENDED,
        matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
            (event, data) -> {
              beforeStop(event, stateName(), data);
              return stop();
            }
        )
    );

    when(SagaActorState.COMPENSATED,
        matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
            (event, data) -> {
              beforeStop(event, stateName(), data);
              return stop();
            }
        )
    );

    whenUnhandled(
        matchAnyEvent((event, data) -> {
          if (event instanceof BaseEvent){
            LOG.debug("Unhandled event {}", event);
          }
          return stay();
        })
    );

    onTransition(
        matchState(null, null, (from, to) -> {
          if (stateData().getGlobalTxId() != null) {
            stateData().setLastState(to);
            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
                .putSagaData(stateData().getGlobalTxId(), stateData());
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), from, to);
          }
          if (to == SagaActorState.COMMITTED ||
              to == SagaActorState.SUSPENDED ||
              to == SagaActorState.COMPENSATED) {
            self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self());
          }
        })
    );

    onTermination(
        matchStop(
            Normal(), (state, data) -> {
              LOG.info("stopped [{}] {}", data.getGlobalTxId(), state);
            }
        )
    );

  }