OperatorImpl createOperatorImpl()

in samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java [223:261]


  OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, Context context) {
    Config config = context.getJobContext().getConfig();
    StreamConfig streamConfig = new StreamConfig(config);
    if (operatorSpec instanceof InputOperatorSpec) {
      return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
    } else if (operatorSpec instanceof StreamOperatorSpec) {
      return new FlatmapOperatorImpl((StreamOperatorSpec) operatorSpec);
    } else if (operatorSpec instanceof SinkOperatorSpec) {
      return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec);
    } else if (operatorSpec instanceof OutputOperatorSpec) {
      String streamId = ((OutputOperatorSpec) operatorSpec).getOutputStream().getStreamId();
      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, systemStream);
    } else if (operatorSpec instanceof PartitionByOperatorSpec) {
      String streamId = ((PartitionByOperatorSpec) operatorSpec).getOutputStream().getStreamId();
      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream,
              internalTaskContext);
    } else if (operatorSpec instanceof WindowOperatorSpec) {
      return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
    } else if (operatorSpec instanceof JoinOperatorSpec) {
      return getOrCreatePartialJoinOpImpls((JoinOperatorSpec) operatorSpec,
          prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()), clock);
    } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) {
      return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, context);
    } else if (operatorSpec instanceof SendToTableOperatorSpec) {
      return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, context);
    } else if (operatorSpec instanceof SendToTableWithUpdateOperatorSpec) {
      return new SendToTableWithUpdateOperatorImpl((SendToTableWithUpdateOperatorSpec) operatorSpec, context);
    } else if (operatorSpec instanceof BroadcastOperatorSpec) {
      String streamId = ((BroadcastOperatorSpec) operatorSpec).getOutputStream().getStreamId();
      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, systemStream, context);
    } else if (operatorSpec instanceof AsyncFlatMapOperatorSpec) {
      return new AsyncFlatmapOperatorImpl((AsyncFlatMapOperatorSpec) operatorSpec);
    }
    throw new IllegalArgumentException(
        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
  }