in sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java [194:397]
public void process(ProcessContext c, BoundedWindow w, OutputReceiver<OutputT> outputReceiver) {
WatermarkEstimatorStateT initialWatermarkEstimatorState =
(WatermarkEstimatorStateT)
invoker.invokeGetInitialWatermarkEstimatorState(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public Object restriction() {
return c.element().getValue();
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public Object sideInput(String tagId) {
PCollectionView<?> view = sideInputMapping.get(tagId);
if (view == null) {
throw new IllegalArgumentException(
"calling getSideInput() with unknown view");
}
return c.sideInput(view);
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName()
+ ".invokeGetInitialWatermarkEstimatorState";
}
});
RestrictionT restriction = c.element().getValue();
WatermarkEstimatorStateT watermarkEstimatorState = initialWatermarkEstimatorState;
while (true) {
RestrictionT currentRestriction = restriction;
WatermarkEstimatorStateT currentWatermarkEstimatorState = watermarkEstimatorState;
RestrictionTracker<RestrictionT, PositionT> tracker =
RestrictionTrackers.observe(
invoker.invokeNewTracker(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public RestrictionT restriction() {
return currentRestriction;
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public Object sideInput(String tagId) {
PCollectionView<?> view = sideInputMapping.get(tagId);
if (view == null) {
throw new IllegalArgumentException(
"calling getSideInput() with unknown view");
}
return c.sideInput(view);
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
}
}),
new ClaimObserver<PositionT>() {
@Override
public void onClaimed(PositionT position) {}
@Override
public void onClaimFailed(PositionT position) {}
});
WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
invoker.invokeNewWatermarkEstimator(
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return c.element().getKey();
}
@Override
public RestrictionT restriction() {
return currentRestriction;
}
@Override
public WatermarkEstimatorStateT watermarkEstimatorState() {
return currentWatermarkEstimatorState;
}
@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return c.timestamp();
}
@Override
public PipelineOptions pipelineOptions() {
return c.getPipelineOptions();
}
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return c.pane();
}
@Override
public BoundedWindow window() {
return w;
}
@Override
public Object sideInput(String tagId) {
PCollectionView<?> view = sideInputMapping.get(tagId);
if (view == null) {
throw new IllegalArgumentException(
"calling getSideInput() with unknown view");
}
return c.sideInput(view);
}
@Override
public String getErrorContext() {
return NaiveProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
}
});
OutputBuilderSupplier outputBuilderSupplier =
new OutputBuilderSupplier() {
@Override
public <X> WindowedValues.Builder<X> builder(X value) {
return WindowedValues.builder(outputReceiver.builder(null)).withValue(value);
}
};
ProcessContinuation continuation =
invoker.invokeProcessElement(
new NestedProcessContext<>(
fn,
c,
outputBuilderSupplier,
c.element().getKey(),
w,
tracker,
watermarkEstimator,
sideInputMapping));
if (continuation.shouldResume()) {
// Fetch the watermark before splitting to ensure that the watermark applies to both
// the primary and the residual.
watermarkEstimatorState = watermarkEstimator.getState();
SplitResult<RestrictionT> split = tracker.trySplit(0);
if (split == null) {
break;
}
restriction = split.getResidual();
Uninterruptibles.sleepUninterruptibly(
continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
} else {
break;
}
}
}