public void onTxEvent()

in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcSagaEventService.java [100:201]


  public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
    if(LOG.isDebugEnabled()){
      LOG.debug("onText {}",message);
    }
    boolean ok = true;
    BaseEvent event = null;
    if (message.getType().equals(EventType.SagaStartedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .createTime(new Date())
          .timeout(message.getTimeout()).build();
    } else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .createTime(new Date())
          .globalTxId(message.getGlobalTxId()).build();
    } else if (message.getType().equals(EventType.SagaAbortedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .createTime(new Date())
          .payloads(message.getPayloads().toByteArray()).build();
    } else if (message.getType().equals(EventType.SagaTimeoutEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .createTime(new Date())
          .globalTxId(message.getGlobalTxId()).build();
    } else if (message.getType().equals(EventType.TxStartedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .localTxId(message.getLocalTxId())
          .parentTxId(message.getParentTxId().isEmpty() ? null : message.getParentTxId())
          .compensationMethod(message.getCompensationMethod())
          .retryMethod(message.getRetryMethod())
          .forwardRetries(message.getForwardRetries())
          .forwardTimeout(message.getForwardTimeout())
          .reverseRetries(message.getReverseRetries())
          .reverseTimeout(message.getReverseTimeout())
          .retryDelayInMilliseconds(message.getRetryDelayInMilliseconds())
          .createTime(new Date())
          .payloads(message.getPayloads().toByteArray()).build();
    } else if (message.getType().equals(EventType.TxEndedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .parentTxId(message.getParentTxId())
          .localTxId(message.getLocalTxId()).build();
    } else if (message.getType().equals(EventType.TxAbortedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .parentTxId(message.getParentTxId())
          .localTxId(message.getLocalTxId())
          .createTime(new Date())
          .payloads(message.getPayloads().toByteArray()).build();
    } else if (message.getType().equals(EventType.TxCompensatedEvent.name())) {
      event = org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensatedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .parentTxId(message.getParentTxId())
          .createTime(new Date())
          .localTxId(message.getLocalTxId()).build();
    } else if (message.getType().equals(EventType.TxCompensateAckSucceedEvent.name())) {
      event = TxCompensateAckSucceedEvent.builder()
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .parentTxId(message.getParentTxId())
          .createTime(new Date())
          .localTxId(message.getLocalTxId()).build();
      omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
          .getAck(CompensateAckType.Succeed);
    } else if (message.getType().equals(EventType.TxCompensateAckFailedEvent.name())) {
      event = TxCompensateAckFailedEvent.builder()
          .payloads(message.getPayloads().toByteArray())
          .serviceName(message.getServiceName())
          .instanceId(message.getInstanceId())
          .globalTxId(message.getGlobalTxId())
          .parentTxId(message.getParentTxId())
          .createTime(new Date())
          .localTxId(message.getLocalTxId()).build();
      omegaCallbacks.get(message.getServiceName()).get(message.getInstanceId())
          .getAck(CompensateAckType.Failed);
    } else {
      ok = false;
    }
    if (event != null) {
      actorEventChannel.send(event);
    }
    responseObserver.onNext(ok ? ALLOW : REJECT);
    responseObserver.onCompleted();
  }