in streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java [1207:1433]
/**
 * Wires a foreign-key join between this table (the primary-key side) and {@code foreignKeyTable} into the
 * topology graph and returns the table that holds the join result.
 *
 * <p>The graph built here has three legs, in this order:
 * <ol>
 *   <li>Under this table's graph node: subscription-send processor, then a sink writing to the
 *       subscription-registration internal topic, then a source reading that topic, then the
 *       subscription-receive processor, then the subscription-join processor (which is connected to the
 *       foreign-key table's value getter).</li>
 *   <li>Under the foreign-key table's graph node: the foreign-table-join processor, which shares the
 *       subscription store factory and combined-key schema with the subscription-receive processor.</li>
 *   <li>Both the subscription-join and the foreign-table-join processors feed one sink writing to the
 *       subscription-response internal topic; a source reads it back, followed by the response-join
 *       processor and finally the result node backed by a {@code KTableSource}.</li>
 * </ol>
 *
 * <p>Side effects visible in this method: sending of old values is enabled on both tables, two internal
 * topics are registered, and two copartition groups are registered on the internal topology builder.
 *
 * <p>NOTE(review): most names are obtained via {@code suffixWithOrElseGet(..., builder, ...)}, i.e. the shared
 * {@code builder} takes part in name generation. Generated names therefore presumably depend on the order of
 * these calls; confirm before reordering any statements.
 *
 * @param foreignKeyTable     the other table; cast to {@code KTableImpl} here, so any other implementation
 *                            fails with a {@code ClassCastException}
 * @param foreignKeyExtractor handed to the subscription-send processor supplier to derive the foreign key
 * @param joiner              handed to the response-join processor supplier to produce the result value
 * @param tableJoined         supplies the optional name and the optional partitioners for both repartition sinks
 * @param materialized        materialization settings for the result table; its key serde is defaulted to this
 *                            table's key serde when unset
 * @param leftJoin            forwarded unchanged to the subscription-send and response-join processor suppliers
 *                            (presumably selects left-join instead of inner-join semantics; confirm in those classes)
 * @return a new {@code KTableImpl} rooted at the result node
 * @throws NullPointerException if {@code foreignKeyTable}, {@code foreignKeyExtractor}, {@code joiner},
 *                              {@code tableJoined} or {@code materialized} is {@code null}
 */
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
final ForeignKeyExtractor<? super K, ? super V, ? extends KO> foreignKeyExtractor,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final boolean leftJoin) {
// Fail fast with a descriptive message before any graph state is touched.
Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(tableJoined, "tableJoined can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
//Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
//such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
//not be done needlessly.
((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues(true);
// Wrap the user-facing config to read its name and partitioners.
final TableJoinedInternal<K, KO> tableJoinedInternal = new TableJoinedInternal<>(tableJoined);
final NamedInternal renamed = new NamedInternal(tableJoinedInternal.name());
// Name of the internal topic that carries subscriptions from this table to the foreign-key table.
// Presumably the user-supplied name plus suffix when one was given, otherwise a builder-generated name
// (semantics of suffixWithOrElseGet are not visible here).
final String subscriptionTopicName = renamed.suffixWithOrElseGet(
"-subscription-registration",
builder,
SUBSCRIPTION_REGISTRATION
) + TOPIC_SUFFIX;
// the decoration can't be performed until we have the configuration available when the app runs,
// so we pass Suppliers into the components, which they can call at run time
final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
() -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");
final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
() -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");
final Supplier<String> valueHashSerdePseudoTopic =
() -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");
// Register the subscription topic as an internal topic with no extra properties.
builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName, InternalTopicProperties.empty());
// Serdes: foreign key comes from the other table, primary key and value from this table. Any of these
// fields may be null here (see the explicit null guard on valueSerde below).
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, ?, ?>) foreignKeyTable).keySerde;
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde);
final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
new SubscriptionResponseWrapperSerde<>(((KTableImpl<?, ?, VO>) foreignKeyTable).valueSerde);
// Schema for the (foreign key, primary key) combined key; shared by the subscription-receive and
// foreign-table-join processors below.
final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(
subscriptionForeignKeySerdePseudoTopic,
foreignKeySerde,
subscriptionPrimaryKeySerdePseudoTopic,
keySerde
);
// Leg 1, step 1: processor on this table's side that emits subscriptions. The value serializer is
// passed as null when this table has no value serde configured.
final ProcessorGraphNode<K, Change<V>> subscriptionSendNode = new ForeignJoinSubscriptionSendNode<>(
new ProcessorParameters<>(
new SubscriptionSendProcessorSupplier<>(
foreignKeyExtractor,
subscriptionForeignKeySerdePseudoTopic,
valueHashSerdePseudoTopic,
foreignKeySerde,
valueSerde == null ? null : valueSerde.serializer(),
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
)
);
builder.addGraphNode(graphNode, subscriptionSendNode);
// Only install a partitioner for the subscription sink when the user configured one for the other table.
// The user's partitioner is invoked with a null value, and its result is passed through getPartition
// (defined elsewhere in this class).
final StreamPartitioner<KO, SubscriptionWrapper<K>> subscriptionSinkPartitioner =
tableJoinedInternal.otherPartitioner() == null
? null
: (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.otherPartitioner().partitions(topic, key, null, numPartitions));
// Leg 1, step 2: sink that writes subscriptions, keyed by foreign key, to the subscription topic.
final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(subscriptionTopicName),
new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner))
);
builder.addGraphNode(subscriptionSendNode, subscriptionSink);
// Leg 1, step 3: source that reads the same subscription topic back; modelled as a child of the sink.
final StreamSourceNode<KO, SubscriptionWrapper<K>> subscriptionSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
Collections.singleton(subscriptionTopicName),
new ConsumedInternal<>(Consumed.with(foreignKeySerde, subscriptionWrapperSerde))
);
builder.addGraphNode(subscriptionSink, subscriptionSource);
// The subscription source is the source node on the *receiving* end *after* the repartition.
// This topic needs to be copartitioned with the Foreign Key table.
final Set<String> copartitionedRepartitionSources =
new HashSet<>(((KTableImpl<?, ?, ?>) foreignKeyTable).subTopologySourceNodes);
copartitionedRepartitionSources.add(subscriptionSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);
// One store factory instance is created here and handed to both the subscription-receive processor and
// the foreign-table-join processor further down.
final String subscriptionStoreName = renamed
.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME);
final StoreFactory subscriptionStoreFactory =
new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde);
// Leg 1, step 4: processor that receives subscriptions read from the subscription topic.
final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
final ProcessorGraphNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new ProcessorGraphNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
subscriptionReceiveName)
);
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
// Leg 1, step 5: processor that joins received subscriptions with the foreign-key table; it is connected
// to the foreign-key table's state through that table's value getter.
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, ?, VO>) foreignKeyTable).valueGetterSupplier();
final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
new ProcessorToStateConnectorNode<>(
new ProcessorParameters<>(
new SubscriptionJoinProcessorSupplier<>(
foreignKeyValueGetter
),
renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(foreignKeyValueGetter)
);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
// Leg 2: processor attached directly under the foreign-key table's graph node, so it sees that table's
// changes.
final String foreignTableJoinName = renamed
.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR);
final ProcessorGraphNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
new ProcessorParameters<>(
new ForeignTableJoinProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
foreignTableJoinName
)
);
builder.addGraphNode(((KTableImpl<?, ?, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);
// Leg 3: internal topic that carries responses back to this table's side, keyed by primary key.
final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());
// Default response partitioner: route to the partition recorded in the response wrapper, or return an
// empty Optional when the wrapper carries none (how an empty result is handled is decided by the caller
// of the partitioner, not here).
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> defaultForeignResponseSinkPartitioner =
(topic, key, subscriptionResponseWrapper, numPartitions) -> {
final Integer partition = subscriptionResponseWrapper.primaryPartition();
return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
};
// A user-configured partitioner for this table replaces the default; as above it is invoked with a null
// value and its result goes through getPartition.
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
tableJoinedInternal.partitioner() == null
? defaultForeignResponseSinkPartitioner
: (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions));
final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(finalRepartitionTopicName),
new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner))
);
// Both the subscription-join leg and the foreign-table-join leg write into the same response sink.
builder.addGraphNode(subscriptionJoinNode, foreignResponseSink);
builder.addGraphNode(foreignTableJoinNode, foreignResponseSink);
// Source that reads the response topic back on this table's side; modelled as a child of the sink.
final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
Collections.singleton(finalRepartitionTopicName),
new ConsumedInternal<>(Consumed.with(keySerde, responseWrapperSerde))
);
builder.addGraphNode(foreignResponseSink, foreignResponseSource);
// the response topic has to be copartitioned with the left (primary) side of the join
final Set<String> resultSourceNodes = new HashSet<>(this.subTopologySourceNodes);
resultSourceNodes.add(foreignResponseSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
// Processor that combines each response with this table's current value (via its value getter) and
// applies the joiner. The value serializer is again null when this table has no value serde.
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final ProcessorToStateConnectorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new ProcessorToStateConnectorNode<>(
new ProcessorParameters<>(
new ResponseJoinProcessorSupplier<>(
primaryKeyValueGetter,
valueSerde == null ? null : valueSerde.serializer(),
valueHashSerdePseudoTopic,
joiner,
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
),
Collections.singleton(primaryKeyValueGetter)
);
builder.addGraphNode(foreignResponseSource, responseJoinNode);
// Name of the final result processor; the same name is used for the returned table.
final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);
// Internal view of the user's materialization settings; builder and FK_JOIN_OUTPUT_NAME are passed in,
// presumably for store-name generation when none was supplied (confirm in MaterializedInternal).
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
materialized,
builder,
FK_JOIN_OUTPUT_NAME
);
// If we have a key serde, it's still valid, but we don't know the value serde, since it's the result
// of the joiner (VR).
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
// The result table is backed by a KTableSource built from the materialization settings.
final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);
final ProcessorGraphNode<K, VR> resultNode = new ProcessorGraphNode<>(
resultProcessorName,
new ProcessorParameters<>(
resultProcessorSupplier,
resultProcessorName
)
);
// Flag the node's output as versioned exactly when the configured store supplier is a versioned one.
resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
builder.addGraphNode(responseJoinNode, resultNode);
// The result table keeps this table's key serde, takes its value serde and store name from the
// materialization settings, and reuses the copartition set that includes the response source.
return new KTableImpl<K, V, VR>(
resultProcessorName,
keySerde,
materializedInternal.valueSerde(),
resultSourceNodes,
materializedInternal.storeName(),
resultProcessorSupplier,
resultNode,
builder
);
}