in tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java [117:190]
/**
 * Writes this wrapper's {@code event} to the given output.
 *
 * <p>Layout produced, in order:
 * <ol>
 *   <li>a boolean presence flag ({@code false} when {@code event} is
 *       {@code null}, in which case nothing else is written);</li>
 *   <li>the ordinal of {@code eventType} as an int;</li>
 *   <li>for {@code TASK_STATUS_UPDATE_EVENT}, whatever the event's own
 *       {@code write(out)} emits; for every other supported type, an int
 *       length followed by the bytes of the corresponding proto message.</li>
 * </ol>
 *
 * @param out destination the event is written to
 * @throws IOException if writing to {@code out} fails
 * @throws TezUncheckedException if {@code eventType} is not one of the
 *         types handled below (thrown after the flag and ordinal have
 *         already been written)
 */
private void serializeEvent(DataOutput out) throws IOException {
  // Presence flag first, so a reader can tell whether anything follows.
  boolean hasEvent = (event != null);
  out.writeBoolean(hasEvent);
  if (!hasEvent) {
    return;
  }

  out.writeInt(eventType.ordinal());

  byte[] payload;
  switch (eventType) {
    case TASK_STATUS_UPDATE_EVENT: {
      // TODO NEWTEZ convert to PB
      // This type writes itself directly and is NOT length-prefixed.
      TaskStatusUpdateEvent statusUpdate = (TaskStatusUpdateEvent) event;
      statusUpdate.write(out);
      return;
    }
    case DATA_MOVEMENT_EVENT: {
      DataMovementEvent dataMovement = (DataMovementEvent) event;
      payload = ProtoConverters.convertDataMovementEventToProto(dataMovement)
          .toByteArray();
      break;
    }
    case COMPOSITE_DATA_MOVEMENT_EVENT: {
      CompositeDataMovementEvent composite = (CompositeDataMovementEvent) event;
      payload = ProtoConverters.convertCompositeDataMovementEventToProto(composite)
          .toByteArray();
      break;
    }
    case VERTEX_MANAGER_EVENT: {
      VertexManagerEvent vertexManager = (VertexManagerEvent) event;
      VertexManagerEventProto.Builder protoBuilder =
          VertexManagerEventProto.newBuilder();
      protoBuilder.setTargetVertexName(vertexManager.getTargetVertexName());
      // The user payload is only set on the proto when one is present.
      if (vertexManager.getUserPayload() != null) {
        protoBuilder.setUserPayload(
            ByteString.copyFrom(vertexManager.getUserPayload()));
      }
      payload = protoBuilder.build().toByteArray();
      break;
    }
    case INPUT_READ_ERROR_EVENT: {
      InputReadErrorEvent readError = (InputReadErrorEvent) event;
      payload = InputReadErrorEventProto.newBuilder()
          .setIndex(readError.getIndex())
          .setDiagnostics(readError.getDiagnostics())
          .setVersion(readError.getVersion())
          .build()
          .toByteArray();
      break;
    }
    case TASK_ATTEMPT_FAILED_EVENT: {
      TaskAttemptFailedEvent attemptFailed = (TaskAttemptFailedEvent) event;
      payload = TaskAttemptFailedEventProto.newBuilder()
          .setDiagnostics(attemptFailed.getDiagnostics())
          .build()
          .toByteArray();
      break;
    }
    case TASK_ATTEMPT_COMPLETED_EVENT: {
      // No fields are set; an empty message is serialized.
      payload = TaskAttemptCompletedEventProto.newBuilder()
          .build()
          .toByteArray();
      break;
    }
    case INPUT_FAILED_EVENT: {
      InputFailedEvent inputFailed = (InputFailedEvent) event;
      payload = InputFailedEventProto.newBuilder()
          .setTargetIndex(inputFailed.getTargetIndex())
          .setVersion(inputFailed.getVersion())
          .build()
          .toByteArray();
      break;
    }
    case ROOT_INPUT_DATA_INFORMATION_EVENT: {
      RootInputDataInformationEvent dataInfo = (RootInputDataInformationEvent) event;
      payload = ProtoConverters.convertRootInputDataInformationEventToProto(dataInfo)
          .toByteArray();
      break;
    }
    case ROOT_INPUT_INITIALIZER_EVENT: {
      RootInputInitializerEvent initializer = (RootInputInitializerEvent) event;
      payload = ProtoConverters.convertRootInputInitializerEventToProto(initializer)
          .toByteArray();
      break;
    }
    default:
      throw new TezUncheckedException("Unknown TezEvent"
          + ", type=" + eventType);
  }

  // Length-prefixed framing for every proto-encoded event type.
  out.writeInt(payload.length);
  out.write(payload);
}