in tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java [251:345]
private void deserializeEvent(DataInput in) throws IOException {
if (!in.readBoolean()) {
event = null;
return;
}
eventType = EventType.values()[in.readInt()];
eventReceivedTime = in.readLong();
if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
// TODO NEWTEZ convert to PB
event = new TaskStatusUpdateEvent();
((TaskStatusUpdateEvent)event).readFields(in);
} else {
int eventBytesLen = in.readInt();
byte[] eventBytes;
CodedInputStream input;
int startOffset = 0;
if (in instanceof DataInputBuffer) {
eventBytes = ((DataInputBuffer)in).getData();
startOffset = ((DataInputBuffer) in).getPosition();
} else {
eventBytes = new byte[eventBytesLen];
in.readFully(eventBytes);
}
input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
switch (eventType) {
case CUSTOM_PROCESSOR_EVENT:
CustomProcessorEventProto cpProto =
CustomProcessorEventProto.parseFrom(input);
event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto);
break;
case DATA_MOVEMENT_EVENT:
DataMovementEventProto dmProto =
DataMovementEventProto.parseFrom(input);
event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
break;
case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
CompositeRoutedDataMovementEventProto edmProto =
CompositeRoutedDataMovementEventProto.parseFrom(eventBytes);
event = ProtoConverters.convertCompositeRoutedDataMovementEventFromProto(edmProto);
break;
case COMPOSITE_DATA_MOVEMENT_EVENT:
CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
break;
case VERTEX_MANAGER_EVENT:
VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input);
event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(),
ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource(),
ideProto.getDestinationLocalhostName());
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEventProto tfProto =
TaskAttemptFailedEventProto.parseFrom(input);
event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
break;
case TASK_ATTEMPT_KILLED_EVENT:
TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input);
event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
event = new TaskAttemptCompletedEvent();
break;
case INPUT_FAILED_EVENT:
InputFailedEventProto ifProto =
InputFailedEventProto.parseFrom(input);
event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
.parseFrom(input);
event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
break;
case ROOT_INPUT_INITIALIZER_EVENT:
EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input);
event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
break;
default:
// RootInputUpdatePayload event not wrapped in a TezEvent.
throw new TezUncheckedException("Unexpected TezEvent"
+ ", type=" + eventType);
}
if (in instanceof DataInputBuffer) {
// Skip so that position is updated
int skipped = in.skipBytes(eventBytesLen);
if (skipped != eventBytesLen) {
throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped);
}
}
}
}