in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [72:397]
/**
 * Builds the saga finite-state machine for this actor.
 *
 * <p>The constructor stores the persistence id (falling back to the actor's own path name when
 * the argument is {@code null}), starts the FSM in {@code IDLE} with an empty {@code SagaData},
 * and then registers: one handler chain per {@code SagaActorState}, a fallback for unhandled
 * events, a transition hook and a termination hook.
 *
 * <p>State overview, as wired below:
 * <ul>
 *   <li>{@code IDLE} -> {@code READY} on {@code SagaStartedEvent}</li>
 *   <li>{@code READY} -> {@code PARTIALLY_ACTIVE} on {@code TxStartedEvent};
 *       -> {@code SUSPENDED} on saga end/abort or state timeout</li>
 *   <li>{@code PARTIALLY_ACTIVE} <-> {@code PARTIALLY_COMMITTED} on tx end / tx start;
 *       -> {@code FAILED} on {@code TxAbortedEvent}; -> {@code SUSPENDED} on timeout</li>
 *   <li>{@code PARTIALLY_COMMITTED} -> {@code COMMITTED} on {@code SagaEndedEvent};
 *       -> {@code FAILED} on saga/tx abort; -> {@code SUSPENDED} on timeout</li>
 *   <li>{@code FAILED} -> {@code COMPENSATED} or {@code SUSPENDED} depending on the
 *       compensation bookkeeping in {@code SagaData}</li>
 *   <li>{@code COMMITTED}, {@code SUSPENDED}, {@code COMPENSATED} stop the actor on
 *       {@code StopEvent}</li>
 * </ul>
 *
 * @param persistenceId id to use for this actor; when {@code null},
 *     {@code getSelf().path().name()} is used instead
 */
public SagaActor(String persistenceId) {
if (persistenceId != null) {
this.persistenceId = persistenceId;
} else {
this.persistenceId = getSelf().path().name();
}
// Initial state: IDLE with a freshly built, empty SagaData.
startWith(SagaActorState.IDLE, SagaData.builder().build());
// ---- IDLE: only SagaStartedEvent is handled; it always moves the FSM to READY. ----
when(SagaActorState.IDLE,
matchEvent(SagaStartedEvent.class,
(event, data) -> {
// Remember when the saga began and bump the saga-begin counter of the extension.
sagaBeginTime = System.currentTimeMillis();
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
SagaStartedDomain domainEvent = new SagaStartedDomain(event);
if (event.getTimeout() > 0) {
// NOTE(review): the state data is mutated directly here, outside of the domain event
// that is applied below - confirm this is intended.
data.setTimeout(event.getTimeout());
// NOTE(review): the event timeout is used in SECONDS here, while every later state uses
// data.getTimeout() in MILLISECONDS - presumably SagaData converts the value; verify
// against SagaData.
return goTo(SagaActorState.READY)
.applying(domainEvent)
.forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));
} else {
// No positive timeout on the event: go to READY without a forMax limit.
return goTo(SagaActorState.READY)
.applying(domainEvent);
}
}
)
);
// ---- READY: saga started, no sub-transaction seen yet. ----
when(SagaActorState.READY,
matchEvent(TxStartedEvent.class, SagaData.class,
(event, data) -> {
// First sub-transaction started: record it and move to PARTIALLY_ACTIVE.
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
// An expiration time is set, so the next state is limited with forMax(data.getTimeout()).
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent);
}
}
).event(SagaEndedEvent.class,
(event, data) -> {
// Saga end received while still in READY: suspend with type UNPREDICTABLE.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
// Saga abort received while still in READY: handled the same way as saga end above.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
// FSM state timeout: suspend with type TIMEOUT; there is no source event, hence null.
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
})
);
// ---- PARTIALLY_ACTIVE: entered whenever a sub-transaction start is recorded. ----
when(SagaActorState.PARTIALLY_ACTIVE,
matchEvent(TxEndedEvent.class, SagaData.class,
(event, data) -> {
// A sub-transaction ended: update it and move to PARTIALLY_COMMITTED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_COMMITTED)
.applying(domainEvent);
}
}
).event(TxStartedEvent.class,
(event, data) -> {
// Another sub-transaction started: record it and stay in PARTIALLY_ACTIVE.
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
// Explicit saga timeout event: suspend with type TIMEOUT.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED,
SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
// A sub-transaction aborted: update it and move to FAILED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED)
.applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
// FSM state timeout: suspend with type TIMEOUT.
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
})
);
// ---- PARTIALLY_COMMITTED: entered whenever a sub-transaction end is recorded. ----
when(SagaActorState.PARTIALLY_COMMITTED,
matchEvent(TxStartedEvent.class,
(event, data) -> {
// A new sub-transaction started: record it and go back to PARTIALLY_ACTIVE.
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
if (data.getExpirationTime() != null) {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return goTo(SagaActorState.PARTIALLY_ACTIVE)
.applying(domainEvent);
}
}
).event(TxEndedEvent.class,
(event, data) -> {
// Another sub-transaction ended: update it and stay in PARTIALLY_COMMITTED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
if (data.getExpirationTime() != null) {
return stay()
.applying(domainEvent)
.forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS));
} else {
return stay().applying(domainEvent);
}
}
).event(SagaTimeoutEvent.class,
(event, data) -> {
// Explicit saga timeout event: suspend with type TIMEOUT.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(SagaEndedEvent.class,
(event, data) -> {
// Saga ended from PARTIALLY_COMMITTED: move to COMMITTED.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED);
return goTo(SagaActorState.COMMITTED)
.applying(domainEvent);
}
).event(SagaAbortedEvent.class,
(event, data) -> {
// Saga aborted: move to FAILED, where compensation results are handled.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(TxAbortedEvent.class,
(event, data) -> {
// A sub-transaction aborted: update it and move to FAILED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return goTo(SagaActorState.FAILED).applying(domainEvent);
}
).event(Collections.singletonList(StateTimeout()), SagaData.class,
(event, data) -> {
// FSM state timeout: suspend with type TIMEOUT.
SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
})
);
// ---- FAILED: compensation acknowledgements and late tx events are processed here. ----
when(SagaActorState.FAILED,
matchEvent(SagaTimeoutEvent.class, SagaData.class,
(event, data) -> {
// Explicit saga timeout event: suspend with type TIMEOUT.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
}
).event(TxCompensateAckSucceedEvent.class, SagaData.class,
(event, data) -> {
// Compensation succeeded for one tx: update it, then send ourselves a
// ComponsitedCheckEvent (preState COMPENSATED_SUCCEED) so the overall compensation
// status is re-evaluated by the ComponsitedCheckEvent handler below.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder()
.serviceName(event.getServiceName())
.instanceId(event.getInstanceId())
.globalTxId(event.getGlobalTxId())
.localTxId(event.getLocalTxId())
.parentTxId(event.getParentTxId())
.preState(TxState.COMPENSATED_SUCCEED)
.build(), self());
}));
}
).event(TxCompensateAckFailedEvent.class, SagaData.class,
(event, data) -> {
// Compensation failed for one tx: same flow as above, with preState COMPENSATED_FAILED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder()
.serviceName(event.getServiceName())
.instanceId(event.getInstanceId())
.globalTxId(event.getGlobalTxId())
.localTxId(event.getLocalTxId())
.parentTxId(event.getParentTxId())
.preState(TxState.COMPENSATED_FAILED)
.build(), self());
}));
}
).event(CompensateAckTimeoutEvent.class, SagaData.class,
(event, data) -> {
// Compensation ack timed out: reported with preState COMPENSATED_FAILED, exactly like
// an explicit compensation failure.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
self().tell(ComponsitedCheckEvent.builder()
.serviceName(event.getServiceName())
.instanceId(event.getInstanceId())
.globalTxId(event.getGlobalTxId())
.localTxId(event.getLocalTxId())
.parentTxId(event.getParentTxId())
.preState(TxState.COMPENSATED_FAILED)
.build(), self());
}));
}
).event(ComponsitedCheckEvent.class, SagaData.class,
(event, data) -> {
// Self-sent check: decide whether the saga can leave FAILED.
if (data.getTxEntities().hasCompensationSentTx() ||
data.getTxEntities().hasCompensationFailedTx()) {
// Some tx still has a compensation sent or a compensation failed: keep waiting in FAILED.
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent);
} else {
if(data.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
// The saga data is already marked COMPENSATE_FAILED: end in SUSPENDED.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED);
return goTo(SagaActorState.SUSPENDED).applying(domainEvent);
} else {
// Nothing pending and no compensation failure recorded: end in COMPENSATED.
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED).applying(domainEvent);
}
}
}
).event(SagaAbortedEvent.class, SagaData.class,
(event, data) -> {
// Saga abort while already FAILED. Stay in FAILED while any tx is still committed or has
// a compensation in flight; otherwise there is nothing left to wait for and the saga
// ends in COMPENSATED.
if (data.getTxEntities().hasCommittedTx()) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return stay()
.applying(domainEvent);
} else if (data.getTxEntities().hasCompensationSentTx()) {
SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED);
return stay()
.applying(domainEvent);
} else {
SagaEndedDomain domainEvent = new SagaEndedDomain(event,
SagaActorState.COMPENSATED);
return goTo(SagaActorState.COMPENSATED)
.applying(domainEvent);
}
}
).event(TxStartedEvent.class, SagaData.class,
(event, data) -> {
// A tx start arriving after the failure is still recorded; the state does not change.
AddTxEventDomain domainEvent = new AddTxEventDomain(event);
return stay().applying(domainEvent);
}
).event(TxEndedEvent.class, SagaData.class,
(event, data) -> {
// A tx end arriving after the failure: update the tx, then look it up by local tx id and
// hand it to compensation(...).
UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event);
return stay().applying(domainEvent).andThen(exec(_data -> {
TxEntity txEntity = _data.getTxEntities().get(event.getLocalTxId());
// call compensate
compensation(domainEvent, txEntity, _data);
}));
}
// NOTE(review): unlike the other states, this timeout handler uses Arrays.asList and the
// two-argument SagaEndedDomain constructor - presumably equivalent to
// Collections.singletonList and the three-argument form with a null event; confirm.
).event(Arrays.asList(StateTimeout()), SagaData.class,
(event, data) -> {
SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED, SuspendedType.TIMEOUT);
return goTo(SagaActorState.SUSPENDED)
.applying(domainEvent);
})
);
// ---- Terminal states: each one only reacts to StopEvent by calling beforeStop(...) and
// ---- then stopping the FSM. The StopEvent is sent by the transition hook further down.
when(SagaActorState.COMMITTED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
when(SagaActorState.SUSPENDED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
when(SagaActorState.COMPENSATED,
matchEvent(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.class,
(event, data) -> {
beforeStop(event, stateName(), data);
return stop();
}
)
);
// Fallback for events no state handler matched: log BaseEvent instances at debug level and
// keep the current state.
whenUnhandled(
matchAnyEvent((event, data) -> {
if (event instanceof BaseEvent){
LOG.debug("Unhandled event {}", event);
}
return stay();
})
);
// Transition hook. matchState(null, null, ...) is presumably a wildcard that matches every
// from/to pair (Akka FSM builder convention) - confirm against the Akka version in use.
onTransition(
matchState(null, null, (from, to) -> {
// Once the saga has a global tx id, record the target state on the data and publish the
// data to the SagaDataExtension under that id.
if (stateData().getGlobalTxId() != null) {
stateData().setLastState(to);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.putSagaData(stateData().getGlobalTxId(), stateData());
}
if (LOG.isDebugEnabled()) {
LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), from, to);
}
// Entering a terminal state: send ourselves a StopEvent so the terminal-state handlers
// above run beforeStop(...) and stop the actor.
if (to == SagaActorState.COMMITTED ||
to == SagaActorState.SUSPENDED ||
to == SagaActorState.COMPENSATED) {
self().tell(org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent.builder().build(), self());
}
})
);
// Termination hook: only the Normal() stop reason is matched; it logs the global tx id and
// the final state at info level.
onTermination(
matchStop(
Normal(), (state, data) -> {
LOG.info("stopped [{}] {}", data.getGlobalTxId(), state);
}
)
);
}