in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [467:581]
// Applies a domain event to the saga state and returns the same (mutated) SagaData instance.
//
// Steps, in order:
//   1. Logs the event ("recovery" at INFO when recoveryRunning() is true, otherwise "persistence"
//      at DEBUG).
//   2. Records the wrapped event through data.logEvent(...), unless it is null or a
//      ComponsitedCheckEvent.
//   3. Dispatches on the concrete DomainEvent type to one of the private apply* helpers below.
//
// Any exception raised while applying the event is logged once; the actor then calls
// beforeStop(..., SagaActorState.SUSPENDED, data) and stop(). The exception is not rethrown.
//
// event: the domain event to apply; must not be null.
// data:  the saga state to mutate; the same instance is returned.
public SagaData applyEvent(DomainEvent event, SagaData data) {
  LOG.debug("apply domain event {}", event.getEvent());
  try {
    if (this.recoveryRunning()) {
      LOG.info("recovery {}",event.getEvent());
    } else {
      // Parameterized logging already skips message formatting when DEBUG is disabled,
      // so no explicit isDebugEnabled() guard is needed for this cheap argument.
      LOG.debug("persistence {}", event.getEvent());
    }
    if (event.getEvent() != null && !(event.getEvent() instanceof ComponsitedCheckEvent)) {
      data.logEvent(event.getEvent());
    }
    if (event instanceof SagaStartedDomain) {
      applySagaStarted((SagaStartedDomain) event, data);
    } else if (event instanceof AddTxEventDomain) {
      applyTxAdded((AddTxEventDomain) event, data);
    } else if (event instanceof UpdateTxEventDomain) {
      applyTxUpdated((UpdateTxEventDomain) event, data);
    } else if (event instanceof SagaEndedDomain) {
      applySagaEnded((SagaEndedDomain) event, data);
    }
  } catch (Exception ex) {
    // Log once: the event gives the context, the throwable gives message and stack trace.
    LOG.error("apply {}", event.getEvent(), ex);
    beforeStop(event.getEvent(), SagaActorState.SUSPENDED, data);
    stop();
    // TODO add a metric for SagaActor processing failures
  }
  return data;
}

/** Copies the saga-level attributes of a saga-started event into {@code data}. */
private void applySagaStarted(SagaStartedDomain domainEvent, SagaData data) {
  data.setServiceName(domainEvent.getEvent().getServiceName());
  data.setInstanceId(domainEvent.getEvent().getInstanceId());
  data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId());
  data.setBeginTime(domainEvent.getEvent().getCreateTime());
  data.setExpirationTime(domainEvent.getExpirationTime());
}

/**
 * Registers a new {@code TxEntity} for the event's local transaction id. If an entity with that
 * id is already present, the existing entity is kept and a warning is logged.
 */
private void applyTxAdded(AddTxEventDomain domainEvent, SagaData data) {
  if (data.getTxEntities().exists(domainEvent.getEvent().getLocalTxId())) {
    LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId());
    return;
  }
  TxEntity txEntity = TxEntity.builder()
      .serviceName(domainEvent.getEvent().getServiceName())
      .instanceId(domainEvent.getEvent().getInstanceId())
      .globalTxId(domainEvent.getEvent().getGlobalTxId())
      .localTxId(domainEvent.getEvent().getLocalTxId())
      .parentTxId(domainEvent.getEvent().getParentTxId())
      .compensationMethod(domainEvent.getCompensationMethod())
      .payloads(domainEvent.getPayloads())
      .state(domainEvent.getState())
      .reverseRetries(domainEvent.getReverseRetries())
      .reverseTimeout(domainEvent.getReverseTimeout())
      .retryDelayInMilliseconds(domainEvent.getRetryDelayInMilliseconds())
      .beginTime(domainEvent.getEvent().getCreateTime())
      .build();
  data.getTxEntities().put(txEntity.getLocalTxId(), txEntity);
}

/**
 * Applies a state change to an already registered {@code TxEntity}.
 *
 * @throws IllegalStateException if no entity is registered under the event's local transaction
 *     id; the caller's catch block turns this into a suspended saga
 */
private void applyTxUpdated(UpdateTxEventDomain domainEvent, SagaData data) {
  TxEntity txEntity = data.getTxEntities().get(domainEvent.getLocalTxId());
  if (txEntity == null) {
    // Fail with a descriptive message instead of an anonymous NullPointerException.
    throw new IllegalStateException("TxEntity " + domainEvent.getLocalTxId()
        + " not found, cannot apply state " + domainEvent.getState());
  }
  txEntity.setEndTime(domainEvent.getEvent().getCreateTime());
  if (domainEvent.getState() == TxState.COMMITTED) {
    txEntity.setState(domainEvent.getState());
  } else if (domainEvent.getState() == TxState.FAILED) {
    txEntity.setState(domainEvent.getState());
    txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
    // NOTE(review): the result of compensation(...) is ignored. The original code did
    // "if (!compensation(...)) return;" as the last statement of the lambda, which never
    // stopped the iteration either. If the walk is meant to halt at the first failed
    // compensation, forEachReverse needs a way to break out - confirm intent.
    data.getTxEntities().forEachReverse((id, candidate) -> {
      if (candidate.getState() == TxState.COMMITTED) {
        compensation(domainEvent, candidate, data);
      }
    });
  } else if (domainEvent.getState() == TxState.COMPENSATED_SUCCEED) {
    data.getCompensationRunningCounter().decrementAndGet();
    txEntity.setState(TxState.COMPENSATED_SUCCEED);
    LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
  } else if (domainEvent.getState() == TxState.COMPENSATED_FAILED) {
    applyCompensationFailed(domainEvent, txEntity, data);
  }
}

/**
 * Handles a failed compensation of {@code txEntity}: while the entity's retry counter is below
 * its configured reverse retries, compensation is requested again for every entity in state
 * COMMITTED or COMPENSATED_FAILED; otherwise the saga is marked as suspended with
 * {@code SuspendedType.COMPENSATE_FAILED} and a {@code ComponsitedCheckEvent} is sent to self.
 */
private void applyCompensationFailed(UpdateTxEventDomain domainEvent, TxEntity txEntity,
    SagaData data) {
  data.getCompensationRunningCounter().decrementAndGet();
  txEntity.setState(TxState.COMPENSATED_FAILED);
  txEntity.setThrowablePayLoads(domainEvent.getThrowablePayLoads());
  boolean retriesLeft = txEntity.getReverseRetries() > 0
      && txEntity.getRetriesCounter().get() < txEntity.getReverseRetries();
  if (retriesLeft) {
    // As in applyTxUpdated, the boolean result of compensation(...) does not stop the walk.
    data.getTxEntities().forEachReverse((id, candidate) -> {
      if (candidate.getState() == TxState.COMMITTED
          || candidate.getState() == TxState.COMPENSATED_FAILED) {
        compensation(domainEvent, candidate, data);
      }
    });
  } else {
    data.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
    self().tell(ComponsitedCheckEvent.builder()
        .serviceName(txEntity.getServiceName())
        .instanceId(txEntity.getInstanceId())
        .globalTxId(txEntity.getGlobalTxId())
        .localTxId(txEntity.getLocalTxId())
        .preState(TxState.COMPENSATED_FAILED)
        .parentTxId(txEntity.getParentTxId()).build(), self());
  }
}

/**
 * Applies a saga-ended event. A FAILED saga requests compensation for every COMMITTED entity,
 * newest first. SUSPENDED, COMPENSATED and COMMITTED record the end time (the event's create
 * time, or the current time when there is no wrapped event); SUSPENDED also records the
 * suspended type. Any other state leaves {@code data} untouched.
 */
private void applySagaEnded(SagaEndedDomain domainEvent, SagaData data) {
  if (domainEvent.getState() == SagaActorState.FAILED) {
    // The boolean result of compensation(...) does not stop the walk (see applyTxUpdated).
    data.getTxEntities().forEachReverse((id, candidate) -> {
      if (candidate.getState() == TxState.COMMITTED) {
        compensation(domainEvent, candidate, data);
      }
    });
    return;
  }
  boolean suspended = domainEvent.getState() == SagaActorState.SUSPENDED;
  if (suspended
      || domainEvent.getState() == SagaActorState.COMPENSATED
      || domainEvent.getState() == SagaActorState.COMMITTED) {
    data.setEndTime(domainEvent.getEvent() != null
        ? domainEvent.getEvent().getCreateTime()
        : new Date());
    if (suspended) {
      data.setSuspendedType(domainEvent.getSuspendedType());
    }
  }
}