in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [504:773]
/**
 * Starts the fetcher and blocks until it is shut down or fails.
 *
 * <p>The method proceeds in this order:
 *
 * <ol>
 *   <li>Verifies that at least one subscribed stream has a discovered shard (a non-null value in
 *       {@code subscribedStreamsToLastDiscoveredShardIds}), logging a warning that lists the
 *       streams without one.
 *   <li>Starts a shard consumer for every already-seeded shard state whose last processed
 *       sequence number is not {@code SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
 *   <li>If a periodic watermark assigner is configured and the auto-watermark interval is
 *       positive, starts the periodic watermark emitter. When a watermark tracker is also
 *       present, it additionally starts the watermark sync callback and a daemon record-emitter
 *       thread.
 *   <li>Runs the shard discovery loop while {@code running} is true. Every newly discovered shard
 *       gets a consumer that starts from {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 * </ol>
 *
 * <p>Before returning, it waits for {@code awaitTermination()}. It then rethrows the error stored
 * in {@code error}, if any.
 *
 * @throws RuntimeException if none of the subscribed streams has a discovered shard
 * @throws Exception the error recorded during fetching; a {@link Throwable} that is neither an
 *     {@link Exception} nor an {@link Error} is wrapped in an {@link Exception}
 */
public void runFetcher() throws Exception {
    // check that we are running before proceeding
    if (!running) {
        return;
    }

    // ------------------------------------------------------------------------
    //  Procedures before starting the infinite while loop:
    // ------------------------------------------------------------------------

    // 1. check that there is at least one shard in the subscribed streams to consume from (can
    //    be done by checking if at least one value in subscribedStreamsToLastDiscoveredShardIds
    //    is not null)
    boolean hasShards = false;
    StringBuilder streamsWithNoShardsFound = new StringBuilder();
    for (Map.Entry<String, String> streamToLastDiscoveredShardEntry :
            subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
        if (streamToLastDiscoveredShardEntry.getValue() != null) {
            hasShards = true;
        } else {
            // separate entries without leaving a dangling ", " at the end of the list
            if (streamsWithNoShardsFound.length() != 0) {
                streamsWithNoShardsFound.append(", ");
            }
            streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey());
        }
    }

    if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
        LOG.warn(
                "Subtask {} has failed to find any shards for the following subscribed streams: {}",
                indexOfThisConsumerSubtask,
                streamsWithNoShardsFound.toString());
    }

    if (!hasShards) {
        throw new RuntimeException(
                "No shards can be found for all subscribed streams: " + streams);
    }

    // 2. start consuming any shard state we already have in the subscribedShardState up to this
    //    point; the subscribedShardState may already be seeded with values due to step 1., or
    //    explicitly added by the consumer using a restored state checkpoint
    for (int seededStateIndex = 0;
            seededStateIndex < subscribedShardsState.size();
            seededStateIndex++) {
        KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

        // only start a consuming thread if the seeded subscribed shard has not been completely
        // read already
        if (!seededShardState
                .getLastProcessedSequenceNum()
                .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
                        indexOfThisConsumerSubtask,
                        seededShardState.getStreamShardHandle().toString(),
                        seededShardState.getLastProcessedSequenceNum(),
                        seededStateIndex);
            }

            openSchemaAndSubmitShardConsumer(seededStateIndex, seededShardState);
        }
    }

    // start periodic watermark emitter, if a watermark assigner was configured
    if (periodicWatermarkAssigner != null) {
        long periodicWatermarkIntervalMillis =
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
        if (periodicWatermarkIntervalMillis > 0) {
            ProcessingTimeService timerService =
                    ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
            LOG.info(
                    "Starting periodic watermark emitter with interval {}",
                    periodicWatermarkIntervalMillis);
            new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();

            if (watermarkTracker != null) {
                // setup global watermark tracking
                long watermarkSyncMillis =
                        Long.parseLong(
                                getConsumerConfiguration()
                                        .getProperty(
                                                ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
                                                Long.toString(
                                                        ConsumerConfigConstants
                                                                .DEFAULT_WATERMARK_SYNC_MILLIS)));
                watermarkTracker.setUpdateTimeoutMillis(
                        watermarkSyncMillis * 3); // synchronization latency
                watermarkTracker.open(runtimeContext);
                new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();

                // emit records ahead of watermark to offset synchronization latency
                long lookaheadMillis =
                        Long.parseLong(
                                getConsumerConfiguration()
                                        .getProperty(
                                                ConsumerConfigConstants
                                                        .WATERMARK_LOOKAHEAD_MILLIS,
                                                Long.toString(0)));
                recordEmitter.setMaxLookaheadMillis(
                        Math.max(lookaheadMillis, watermarkSyncMillis * 3));

                // record emitter depends on periodic watermark;
                // it runs in a separate thread since main thread is used for discovery
                Thread thread =
                        new Thread(
                                () -> {
                                    try {
                                        recordEmitter.run();
                                    } catch (Throwable emitterError) {
                                        // report the error that terminated the emitter loop
                                        // to the source thread
                                        stopWithError(emitterError);
                                    }
                                });
                thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
                thread.setDaemon(true);
                thread.start();
            }
        }

        // the shard idle interval is only read when a periodic watermark assigner is configured
        this.shardIdleIntervalMillis =
                Long.parseLong(
                        getConsumerConfiguration()
                                .getProperty(
                                        ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
                                        Long.toString(
                                                ConsumerConfigConstants
                                                        .DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
    }

    // ------------------------------------------------------------------------

    // finally, start the infinite shard discovery and consumer launching loop;
    // we will escape from this loop only when shutdownFetcher() or stopWithError() is called
    // TODO: have this thread emit the records for tracking backpressure

    final long discoveryIntervalMillis =
            Long.parseLong(
                    configProps.getProperty(
                            ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
                            Long.toString(
                                    ConsumerConfigConstants
                                            .DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

    if (this.numberOfActiveShards.get() == 0) {
        LOG.info(
                "Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
                indexOfThisConsumerSubtask);
        sourceContext.markAsTemporarilyIdle();
    }

    while (running) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Subtask {} is trying to discover new shards that were created due to resharding ...",
                    indexOfThisConsumerSubtask);
        }
        List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();

        for (StreamShardHandle shard : newShardsDueToResharding) {
            // since there may be delay in discovering a new shard, all new shards due to
            // resharding should be read starting from the earliest record possible
            KinesisStreamShardState newShardState =
                    new KinesisStreamShardState(
                            convertToStreamShardMetadata(shard),
                            shard,
                            SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
            int newStateIndex = registerNewSubscribedShardState(newShardState);

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Subtask {} has discovered a new shard {} due to resharding, and will start consuming "
                                + "the shard from sequence number {} with ShardConsumer {}",
                        indexOfThisConsumerSubtask,
                        newShardState.getStreamShardHandle().toString(),
                        newShardState.getLastProcessedSequenceNum(),
                        newStateIndex);
            }

            openSchemaAndSubmitShardConsumer(newStateIndex, newShardState);
        }

        // we also check if we are running here so that we won't start the discovery sleep
        // interval if the running flag was set to false during the middle of the while loop
        if (running && discoveryIntervalMillis != 0) {
            try {
                cancelFuture.get(discoveryIntervalMillis, TimeUnit.MILLISECONDS);
                LOG.debug("Cancelled discovery");
            } catch (TimeoutException ignored) {
                // timeout is expected when fetcher is not cancelled
            }
        }
    }

    // make sure all resources have been terminated before leaving
    try {
        awaitTermination();
    } catch (InterruptedException ie) {
        // restore the interrupt status so it is not lost, in particular when an earlier
        // error is already recorded and this exception is therefore not the one rethrown below
        Thread.currentThread().interrupt();
        // If there is an original exception, preserve it, since that's more important/useful.
        this.error.compareAndSet(null, ie);
    }

    // any error thrown in the shard consumer threads will be thrown to the main thread
    Throwable throwable = this.error.get();
    if (throwable != null) {
        if (throwable instanceof Exception) {
            throw (Exception) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new Exception(throwable);
        }
    }
}

/**
 * Opens a fresh clone of the deserialization schema for one shard and submits a shard consumer
 * for that shard to {@code shardConsumersExecutor}.
 *
 * <p>The schema's user metric group is nested under this consumer's metric group as {@code
 * subtaskId -> shardId -> user}. The metric group supplied by the runtime context adapter is
 * ignored.
 *
 * @param stateIndex the index passed to {@code createShardConsumer} for this shard state
 * @param shardState the shard to consume, starting from its last processed sequence number
 * @throws Exception if opening the cloned deserialization schema fails
 */
private void openSchemaAndSubmitShardConsumer(
        int stateIndex, KinesisStreamShardState shardState) throws Exception {
    final StreamShardHandle streamShardHandle = shardState.getStreamShardHandle();

    KinesisDeserializationSchema<T> shardDeserializationSchema = getClonedDeserializationSchema();
    shardDeserializationSchema.open(
            RuntimeContextInitializationContextAdapters.deserializationAdapter(
                    runtimeContext,
                    // ignore the provided metric group
                    metricGroup ->
                            consumerMetricGroup
                                    .addGroup(
                                            "subtaskId",
                                            String.valueOf(indexOfThisConsumerSubtask))
                                    .addGroup(
                                            "shardId",
                                            streamShardHandle.getShard().getShardId())
                                    .addGroup("user")));

    shardConsumersExecutor.submit(
            createShardConsumer(
                    stateIndex,
                    streamShardHandle,
                    shardState.getLastProcessedSequenceNum(),
                    registerShardMetricGroup(consumerMetricGroup, shardState),
                    shardDeserializationSchema));
}