in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/GrpcOmegaCallback.java [43:68]
public void compensate(TxEvent event) {
compensateAckCountDownLatch = new CompensateAckCountDownLatch(1);
try {
GrpcCompensateCommand command = GrpcCompensateCommand.newBuilder()
.setGlobalTxId(event.globalTxId())
.setLocalTxId(event.localTxId())
.setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
.setCompensationMethod(event.compensationMethod())
.setPayloads(ByteString.copyFrom(event.payloads()))
.build();
observer.onNext(command);
compensateAckCountDownLatch.await();
if (compensateAckCountDownLatch.getType() == CompensateAckType.Disconnected) {
throw new CompensateConnectException("Omega connect exception");
}else{
LOG.debug("compensate ack "+ compensateAckCountDownLatch.getType().name());
if(compensateAckCountDownLatch.getType() == CompensateAckType.Failed){
throw new CompensateAckFailedException("An exception is thrown inside the compensation method");
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
compensateAckCountDownLatch = null;
}
}