private void serializeEvent()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java [150:249]


  /**
   * Writes the wrapped event to {@code out}.
   *
   * <p>Layout, in order:
   * <ol>
   *   <li>a boolean that is {@code true} when an event is present; when it is
   *       {@code false} nothing else is written;</li>
   *   <li>the ordinal of {@code eventType} as an int;</li>
   *   <li>{@code eventReceivedTime} as a long;</li>
   *   <li>the payload: a {@code TASK_STATUS_UPDATE_EVENT} writes itself through its own
   *       {@code write(DataOutput)}; every other supported type is converted to a
   *       protobuf message and emitted as an int length followed by the serialized
   *       message bytes.</li>
   * </ol>
   *
   * @param out destination of the serialized form
   * @throws IOException if writing to {@code out} fails
   * @throws TezUncheckedException if {@code eventType} has no serialization mapping here
   */
  private void serializeEvent(DataOutput out) throws IOException {
    final boolean hasEvent = event != null;
    // Presence flag; it is the whole record when there is no event.
    out.writeBoolean(hasEvent);
    if (!hasEvent) {
      return;
    }

    // Header shared by every event type.
    out.writeInt(eventType.ordinal());
    out.writeLong(eventReceivedTime);

    AbstractMessage proto;
    switch (eventType) {
      case TASK_STATUS_UPDATE_EVENT:
        // TODO NEWTEZ convert to PB
        // Not protobuf-backed: the event writes itself and no length prefix is emitted.
        ((TaskStatusUpdateEvent) event).write(out);
        return;
      case CUSTOM_PROCESSOR_EVENT:
        proto = ProtoConverters.convertCustomProcessorEventToProto((CustomProcessorEvent) event);
        break;
      case DATA_MOVEMENT_EVENT:
        proto = ProtoConverters.convertDataMovementEventToProto((DataMovementEvent) event);
        break;
      case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT:
        proto = ProtoConverters.convertCompositeRoutedDataMovementEventToProto(
            (CompositeRoutedDataMovementEvent) event);
        break;
      case COMPOSITE_DATA_MOVEMENT_EVENT:
        proto = ProtoConverters.convertCompositeDataMovementEventToProto(
            (CompositeDataMovementEvent) event);
        break;
      case VERTEX_MANAGER_EVENT:
        proto = ProtoConverters.convertVertexManagerEventToProto(
            (VertexManagerEvent) event);
        break;
      case INPUT_READ_ERROR_EVENT: {
        InputReadErrorEvent readError = (InputReadErrorEvent) event;
        InputReadErrorEventProto.Builder readErrorProto = InputReadErrorEventProto.newBuilder();
        readErrorProto.setIndex(readError.getIndex());
        readErrorProto.setDiagnostics(readError.getDiagnostics());
        readErrorProto.setVersion(readError.getVersion());
        readErrorProto.setIsLocalFetch(readError.isLocalFetch());
        readErrorProto.setIsDiskErrorAtSource(readError.isDiskErrorAtSource());
        readErrorProto.setDestinationLocalhostName(readError.getDestinationLocalhostName());
        proto = readErrorProto.build();
        break;
      }
      case TASK_ATTEMPT_FAILED_EVENT: {
        TaskAttemptFailedEvent failed = (TaskAttemptFailedEvent) event;
        TaskAttemptFailedEventProto.Builder failedProto = TaskAttemptFailedEventProto.newBuilder();
        failedProto.setDiagnostics(failed.getDiagnostics());
        failedProto.setTaskFailureType(
            TezConverterUtils.failureTypeToProto(failed.getTaskFailureType()));
        proto = failedProto.build();
        break;
      }
      case TASK_ATTEMPT_KILLED_EVENT: {
        TaskAttemptKilledEvent killed = (TaskAttemptKilledEvent) event;
        TaskAttemptKilledEventProto.Builder killedProto = TaskAttemptKilledEventProto.newBuilder();
        killedProto.setDiagnostics(killed.getDiagnostics());
        proto = killedProto.build();
        break;
      }
      case TASK_ATTEMPT_COMPLETED_EVENT:
        // No fields are set for a completed attempt; an empty message is written.
        proto = TaskAttemptCompletedEventProto.newBuilder().build();
        break;
      case INPUT_FAILED_EVENT: {
        InputFailedEvent inputFailed = (InputFailedEvent) event;
        InputFailedEventProto.Builder inputFailedProto = InputFailedEventProto.newBuilder();
        inputFailedProto.setTargetIndex(inputFailed.getTargetIndex());
        inputFailedProto.setVersion(inputFailed.getVersion());
        proto = inputFailedProto.build();
        break;
      }
      case ROOT_INPUT_DATA_INFORMATION_EVENT:
        proto = ProtoConverters.convertRootInputDataInformationEventToProto(
            (InputDataInformationEvent) event);
        break;
      case ROOT_INPUT_INITIALIZER_EVENT:
        proto = ProtoConverters.convertRootInputInitializerEventToProto(
            (InputInitializerEvent) event);
        break;
      default:
        throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
    }

    if (!(out instanceof OutputStream)) {
      // Plain DataOutput: materialize the message, then write length + bytes.
      byte[] payload = proto.toByteArray();
      out.writeInt(payload.length);
      out.write(payload);
      return;
    }

    // Stream-backed output (e.g. DataOutputBuffer, which extends DataOutputStream):
    // write the length, then let protobuf encode straight into the stream.
    int payloadSize = proto.getSerializedSize();
    out.writeInt(payloadSize);
    // The encoder buffer is capped at the protobuf default and never exceeds the payload size.
    int encoderBufferSize = Math.min(payloadSize, CodedOutputStream.DEFAULT_BUFFER_SIZE);
    CodedOutputStream protoSink =
        CodedOutputStream.newInstance((OutputStream) out, encoderBufferSize);
    proto.writeTo(protoSink);
    protoSink.flush();
  }