in streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java [955:1178]
// Wires the topology for a foreign-key join between this table (the primary-key side) and
// foreignKeyTable, and returns the KTable that represents the join result.
//
// In order, the method creates:
//   1. the subscription-registration internal topic with its send processor, sink and source;
//   2. a persistent timestamped subscription store and the processors attached to it
//      (one fed by the subscription source, one fed by the foreign-key table's graph node);
//   3. the subscription-response internal topic with its sink and source;
//   4. the resolver processor that consumes the responses;
//   5. the result table node, backed by the store described by `materialized`.
//
// foreignKeyTable      table joined against; must be a KTableImpl (it is cast below)
// foreignKeyExtractor  handed to the subscription-send processor supplier
// joiner               handed to the response-resolver processor supplier
// joinName             base for the names of the topics, processors and stores created here
// materialized         store configuration for the result; this table's key serde is used if it has none
// leftJoin             forwarded to the send and the resolver processor suppliers
//
// Throws NullPointerException when any of the five object arguments is null.
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
                                                      final Function<V, KO> foreignKeyExtractor,
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final Named joinName,
                                                      final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                                      final boolean leftJoin) {
    Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
    Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
    Objects.requireNonNull(joiner, "joiner can't be null");
    Objects.requireNonNull(joinName, "joinName can't be null");
    Objects.requireNonNull(materialized, "materialized can't be null");

    // Single view of the foreign-key table as its implementation type; reused everywhere below.
    final KTableImpl<KO, VO, VO> foreignTable = (KTableImpl<KO, VO, VO>) foreignKeyTable;

    // Old values on the foreign-key side are an optimization: they are compared with the new values so
    // that identical values do not trigger a prefixScan. PrefixScan and propagation can be expensive
    // and should not happen needlessly.
    foreignTable.enableSendingOldValues();
    // Old values on this side are required so that the ForeignJoinSubscriptionSendProcessorSupplier can
    // propagate deletions to the correct node whenever the extracted foreign key changes.
    this.enableSendingOldValues();

    final NamedInternal joinNaming = new NamedInternal(joinName);

    // ---- subscription-registration topic -------------------------------------------------------
    final String registrationTopic = joinNaming.suffixWithOrElseGet(
        "-subscription-registration",
        builder,
        SUBSCRIPTION_REGISTRATION
    ) + TOPIC_SUFFIX;

    // Decoration needs the configuration, which only exists once the app runs; the components
    // therefore receive Suppliers that they can invoke at run time.
    final Supplier<String> primaryKeyPseudoTopic =
        () -> internalTopologyBuilder().decoratePseudoTopic(registrationTopic + "-pk");
    final Supplier<String> foreignKeyPseudoTopic =
        () -> internalTopologyBuilder().decoratePseudoTopic(registrationTopic + "-fk");
    final Supplier<String> valueHashPseudoTopic =
        () -> internalTopologyBuilder().decoratePseudoTopic(registrationTopic + "-vh");

    builder.internalTopologyBuilder.addInternalTopic(registrationTopic, InternalTopicProperties.empty());

    // ---- serdes and key schema -----------------------------------------------------------------
    final Serde<KO> fkSerde = foreignTable.keySerde;
    final Serde<SubscriptionWrapper<K>> wrapperSerde =
        new SubscriptionWrapperSerde<>(primaryKeyPseudoTopic, keySerde);
    final SubscriptionResponseWrapperSerde<VO> responseSerde =
        new SubscriptionResponseWrapperSerde<>(foreignTable.valueSerde);
    final CombinedKeySchema<KO, K> keySchema = new CombinedKeySchema<>(
        foreignKeyPseudoTopic,
        fkSerde,
        primaryKeyPseudoTopic,
        keySerde
    );

    // ---- this table -> subscription send -> registration sink -> registration source -----------
    final ProcessorGraphNode<K, Change<V>> sendNode = new ProcessorGraphNode<>(
        new ProcessorParameters<>(
            new ForeignJoinSubscriptionSendProcessorSupplier<>(
                foreignKeyExtractor,
                foreignKeyPseudoTopic,
                valueHashPseudoTopic,
                fkSerde,
                valueSerde == null ? null : valueSerde.serializer(),
                leftJoin
            ),
            joinNaming.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
        )
    );
    builder.addGraphNode(streamsGraphNode, sendNode);

    final String registrationSinkName =
        joinNaming.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME);
    final StreamSinkNode<KO, SubscriptionWrapper<K>> registrationSink = new StreamSinkNode<>(
        registrationSinkName,
        new StaticTopicNameExtractor<>(registrationTopic),
        new ProducedInternal<>(Produced.with(fkSerde, wrapperSerde))
    );
    builder.addGraphNode(sendNode, registrationSink);

    final String registrationSourceName =
        joinNaming.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME);
    final StreamSourceNode<KO, SubscriptionWrapper<K>> registrationSource = new StreamSourceNode<>(
        registrationSourceName,
        Collections.singleton(registrationTopic),
        new ConsumedInternal<>(Consumed.with(fkSerde, wrapperSerde))
    );
    builder.addGraphNode(registrationSink, registrationSource);

    // The registration source sits on the *receiving* end, *after* the repartition, and its topic
    // must be copartitioned with the foreign-key table.
    final Set<String> foreignSideSources = new HashSet<>(foreignTable.subTopologySourceNodes);
    foreignSideSources.add(registrationSource.nodeName());
    builder.internalTopologyBuilder.copartitionSources(foreignSideSources);

    // ---- subscription store and the processors attached to it ----------------------------------
    final String subscriptionStoreName =
        joinNaming.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME);
    final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> subscriptionStoreBuilder =
        Stores.timestampedKeyValueStoreBuilder(
            Stores.persistentTimestampedKeyValueStore(subscriptionStoreName),
            new Serdes.BytesSerde(),
            wrapperSerde
        );
    builder.addStateStore(subscriptionStoreBuilder);

    final StatefulProcessorNode<KO, SubscriptionWrapper<K>> receiveNode =
        new StatefulProcessorNode<>(
            new ProcessorParameters<>(
                new SubscriptionStoreReceiveProcessorSupplier<>(subscriptionStoreBuilder, keySchema),
                joinNaming.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR)
            ),
            Collections.singleton(subscriptionStoreBuilder),
            Collections.emptySet()
        );
    builder.addGraphNode(registrationSource, receiveNode);

    final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> joinForeignNode =
        new StatefulProcessorNode<>(
            new ProcessorParameters<>(
                new SubscriptionJoinForeignProcessorSupplier<>(
                    foreignTable.valueGetterSupplier()
                ),
                joinNaming.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
            ),
            Collections.emptySet(),
            Collections.singleton(foreignTable.valueGetterSupplier())
        );
    builder.addGraphNode(receiveNode, joinForeignNode);

    // This processor hangs off the foreign-key table's own graph node and shares the subscription store.
    final StatefulProcessorNode<KO, Change<Object>> foreignUpdateNode = new StatefulProcessorNode<>(
        new ProcessorParameters<>(
            new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStoreBuilder, keySchema),
            joinNaming.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
        ),
        Collections.singleton(subscriptionStoreBuilder),
        Collections.emptySet()
    );
    builder.addGraphNode(foreignTable.streamsGraphNode, foreignUpdateNode);

    // ---- subscription-response topic, sink and source ------------------------------------------
    final String responseTopic =
        joinNaming.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
    builder.internalTopologyBuilder.addInternalTopic(responseTopic, InternalTopicProperties.empty());

    final String responseSinkName =
        joinNaming.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME);
    final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> responseSink =
        new StreamSinkNode<>(
            responseSinkName,
            new StaticTopicNameExtractor<>(responseTopic),
            new ProducedInternal<>(Produced.with(keySerde, responseSerde))
        );
    // Both foreign-side processors write into the same response sink.
    builder.addGraphNode(joinForeignNode, responseSink);
    builder.addGraphNode(foreignUpdateNode, responseSink);

    final String responseSourceName =
        joinNaming.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME);
    final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> responseSource = new StreamSourceNode<>(
        responseSourceName,
        Collections.singleton(responseTopic),
        new ConsumedInternal<>(Consumed.with(keySerde, responseSerde))
    );
    builder.addGraphNode(responseSink, responseSource);

    // The response topic must be copartitioned with the left (primary) side of the join.
    final Set<String> resultSources = new HashSet<>(subTopologySourceNodes);
    resultSources.add(responseSource.nodeName());
    builder.internalTopologyBuilder.copartitionSources(resultSources);

    // ---- resolver -------------------------------------------------------------------------------
    final KTableValueGetterSupplier<K, V> thisValueGetter = valueGetterSupplier();
    final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverSupplier =
        new SubscriptionResolverJoinProcessorSupplier<>(
            thisValueGetter,
            valueSerde == null ? null : valueSerde.serializer(),
            valueHashPseudoTopic,
            joiner,
            leftJoin
        );
    final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> resolverGraphNode = new StatefulProcessorNode<>(
        new ProcessorParameters<>(
            resolverSupplier,
            joinNaming.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
        ),
        Collections.emptySet(),
        Collections.singleton(thisValueGetter)
    );
    builder.addGraphNode(responseSource, resolverGraphNode);

    // ---- materialized result table --------------------------------------------------------------
    final String resultName = joinNaming.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);

    final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> resultMaterialized =
        new MaterializedInternal<>(
            materialized,
            builder,
            FK_JOIN_OUTPUT_NAME
        );

    // This table's key serde remains valid for the result. The value serde cannot be derived here
    // because the value type (VR) is whatever the joiner produces.
    if (resultMaterialized.keySerde() == null) {
        resultMaterialized.withKeySerde(keySerde);
    }

    final KTableSource<K, VR> resultSupplier = new KTableSource<>(
        resultMaterialized.storeName(),
        resultMaterialized.queryableStoreName()
    );

    final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStoreBuilder =
        new TimestampedKeyValueStoreMaterializer<>(resultMaterialized).materialize();

    final TableProcessorNode<K, VR> resultGraphNode = new TableProcessorNode<>(
        resultName,
        new ProcessorParameters<>(
            resultSupplier,
            resultName
        ),
        resultStoreBuilder
    );
    builder.addGraphNode(resolverGraphNode, resultGraphNode);

    return new KTableImpl<K, V, VR>(
        resultName,
        keySerde,
        resultMaterialized.valueSerde(),
        resultSources,
        resultMaterialized.storeName(),
        resultSupplier,
        resultGraphNode,
        builder
    );
}