in tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java [150:249]
private void serializeEvent(DataOutput out) throws IOException {
if (event == null) {
out.writeBoolean(false);
return;
}
out.writeBoolean(true);
out.writeInt(eventType.ordinal());
out.writeLong(eventReceivedTime);
if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
// TODO NEWTEZ convert to PB
TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
sEvt.write(out);
} else {
AbstractMessage message;
switch (eventType) {
case CUSTOM_PROCESSOR_EVENT:
message =
ProtoConverters.convertCustomProcessorEventToProto(
(CustomProcessorEvent) event);
break;
case DATA_MOVEMENT_EVENT:
message =
ProtoConverters.convertDataMovementEventToProto(
(DataMovementEvent) event);
break;
case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
message =
ProtoConverters.convertCompositeRoutedDataMovementEventToProto(
(CompositeRoutedDataMovementEvent) event);
break;
case COMPOSITE_DATA_MOVEMENT_EVENT:
message =
ProtoConverters.convertCompositeDataMovementEventToProto(
(CompositeDataMovementEvent) event);
break;
case VERTEX_MANAGER_EVENT:
message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event);
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
message = InputReadErrorEventProto.newBuilder()
.setIndex(ideEvt.getIndex())
.setDiagnostics(ideEvt.getDiagnostics())
.setVersion(ideEvt.getVersion())
.setIsLocalFetch(ideEvt.isLocalFetch())
.setIsDiskErrorAtSource(ideEvt.isDiskErrorAtSource())
.setDestinationLocalhostName(ideEvt.getDestinationLocalhostName())
.build();
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
message = TaskAttemptFailedEventProto.newBuilder()
.setDiagnostics(tfEvt.getDiagnostics())
.setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType()))
.build();
break;
case TASK_ATTEMPT_KILLED_EVENT:
TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event;
message = TaskAttemptKilledEventProto.newBuilder()
.setDiagnostics(tkEvent.getDiagnostics()).build();
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
message = TaskAttemptCompletedEventProto.newBuilder()
.build();
break;
case INPUT_FAILED_EVENT:
InputFailedEvent ifEvt = (InputFailedEvent) event;
message = InputFailedEventProto.newBuilder()
.setTargetIndex(ifEvt.getTargetIndex())
.setVersion(ifEvt.getVersion()).build();
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
message = ProtoConverters.convertRootInputDataInformationEventToProto(
(InputDataInformationEvent) event);
break;
case ROOT_INPUT_INITIALIZER_EVENT:
message = ProtoConverters
.convertRootInputInitializerEventToProto((InputInitializerEvent) event);
break;
default:
throw new TezUncheckedException("Unknown TezEvent"
+ ", type=" + eventType);
}
if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream
int serializedSize = message.getSerializedSize();
out.writeInt(serializedSize);
int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize
: CodedOutputStream.DEFAULT_BUFFER_SIZE;
CodedOutputStream codedOut = CodedOutputStream.newInstance(
(OutputStream) out, buffersize);
message.writeTo(codedOut);
codedOut.flush();
} else {
byte[] eventBytes = message.toByteArray();
out.writeInt(eventBytes.length);
out.write(eventBytes);
}
}
}