in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcSagaEventService.java [100:201]
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
if(LOG.isDebugEnabled()){
LOG.debug("onText {}",message);
}
boolean ok = true;
BaseEvent event = null;
if (message.getType().equals(EventType.SagaStartedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.createTime(new Date())
.timeout(message.getTimeout()).build();
} else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.createTime(new Date())
.globalTxId(message.getGlobalTxId()).build();
} else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.SagaTimeoutEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.createTime(new Date())
.globalTxId(message.getGlobalTxId()).build();
} else if (message.getType().equals(EventType.TxStartedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.localTxId(message.getLocalTxId())
.parentTxId(message.getParentTxId().isEmpty() ? null : message.getParentTxId())
.compensationMethod(message.getCompensationMethod())
.retryMethod(message.getRetryMethod())
.forwardRetries(message.getForwardRetries())
.forwardTimeout(message.getForwardTimeout())
.reverseRetries(message.getReverseRetries())
.reverseTimeout(message.getReverseTimeout())
.retryDelayInMilliseconds(message.getRetryDelayInMilliseconds())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.TxEndedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.parentTxId(message.getParentTxId())
.localTxId(message.getLocalTxId()).build();
} else if (message.getType().equals(EventType.TxAbortedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.parentTxId(message.getParentTxId())
.localTxId(message.getLocalTxId())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.TxCompensatedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.parentTxId(message.getParentTxId())
.createTime(new Date())
.localTxId(message.getLocalTxId()).build();
} else if (message.getType().equals(EventType.TxCompensateAckSucceedEvent.name())) {
event = TxCompensateAckSucceedEvent.builder()
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.parentTxId(message.getParentTxId())
.createTime(new Date())
.localTxId(message.getLocalTxId()).build();
omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
.getAck(CompensateAckType.Succeed);
} else if (message.getType().equals(EventType.TxCompensateAckFailedEvent.name())) {
event = TxCompensateAckFailedEvent.builder()
.payloads(message.getPayloads().toByteArray())
.serviceName(message.getServiceName())
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.parentTxId(message.getParentTxId())
.createTime(new Date())
.localTxId(message.getLocalTxId()).build();
omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
.getAck(CompensateAckType.Failed);
} else {
ok = false;
}
if (event != null) {
actorEventChannel.send(event);
}
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
}