private void deserializeEvent()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java [251:345]


  private void deserializeEvent(DataInput in) throws IOException {
    if (!in.readBoolean()) {
      event = null;
      return;
    }
    eventType = EventType.values()[in.readInt()];
    eventReceivedTime = in.readLong();
    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
      // TODO NEWTEZ convert to PB
      event = new TaskStatusUpdateEvent();
      ((TaskStatusUpdateEvent)event).readFields(in);
    } else {
      int eventBytesLen = in.readInt();
      byte[] eventBytes;
      CodedInputStream input;
      int startOffset = 0;
      if (in instanceof DataInputBuffer) {
        eventBytes = ((DataInputBuffer)in).getData();
        startOffset = ((DataInputBuffer) in).getPosition();
      } else {
        eventBytes = new byte[eventBytesLen];
        in.readFully(eventBytes);
      }
      input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
      switch (eventType) {
      case CUSTOM_PROCESSOR_EVENT:
        CustomProcessorEventProto cpProto =
            CustomProcessorEventProto.parseFrom(input);
        event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto);
        break;
      case DATA_MOVEMENT_EVENT:
        DataMovementEventProto dmProto =
            DataMovementEventProto.parseFrom(input);
        event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
        break;
      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
        CompositeRoutedDataMovementEventProto edmProto =
            CompositeRoutedDataMovementEventProto.parseFrom(eventBytes);
      event = ProtoConverters.convertCompositeRoutedDataMovementEventFromProto(edmProto);
      break;
      case COMPOSITE_DATA_MOVEMENT_EVENT:
        CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
        event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
        break;
      case VERTEX_MANAGER_EVENT:
        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
        event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
        break;
      case INPUT_READ_ERROR_EVENT:
        InputReadErrorEventProto ideProto = InputReadErrorEventProto.parseFrom(input);
        event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(),
            ideProto.getVersion(), ideProto.getIsLocalFetch(), ideProto.getIsDiskErrorAtSource(),
            ideProto.getDestinationLocalhostName());
        break;
      case TASK_ATTEMPT_FAILED_EVENT:
        TaskAttemptFailedEventProto tfProto =
            TaskAttemptFailedEventProto.parseFrom(input);
        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(),
            TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType()));
        break;
      case TASK_ATTEMPT_KILLED_EVENT:
        TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input);
        event = new TaskAttemptKilledEvent(tkProto.getDiagnostics());
        break;
      case TASK_ATTEMPT_COMPLETED_EVENT:
        event = new TaskAttemptCompletedEvent();
        break;
      case INPUT_FAILED_EVENT:
        InputFailedEventProto ifProto =
            InputFailedEventProto.parseFrom(input);
        event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
        break;
      case ROOT_INPUT_DATA_INFORMATION_EVENT:
        RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
            .parseFrom(input);
        event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
        break;
      case ROOT_INPUT_INITIALIZER_EVENT:
        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input);
        event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
        break;
      default:
        // RootInputUpdatePayload event not wrapped in a TezEvent.
        throw new TezUncheckedException("Unexpected TezEvent"
           + ", type=" + eventType);
      }
      if (in instanceof DataInputBuffer) {
        // Skip so that position is updated
        int skipped = in.skipBytes(eventBytesLen);
        if (skipped != eventBytesLen) {
          throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped);
        }
      }
    }
  }