private KTable doJoinOnForeignKey()

in streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java [955:1178]


    private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
                                                          final Function<V, KO> foreignKeyExtractor,
                                                          final ValueJoiner<V, VO, VR> joiner,
                                                          final Named joinName,
                                                          final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                                          final boolean leftJoin) {
        Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
        Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joinName, "joinName can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");

        //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
        //such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
        //not be done needlessly.
        ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues();

        //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
        //This occurs whenever the extracted foreignKey changes values.
        enableSendingOldValues();

        final NamedInternal renamed = new NamedInternal(joinName);

        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
            "-subscription-registration",
            builder,
            SUBSCRIPTION_REGISTRATION
        ) + TOPIC_SUFFIX;

        // the decoration can't be performed until we have the configuration available when the app runs,
        // so we pass Suppliers into the components, which they can call at run time

        final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");

        final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");

        final Supplier<String> valueHashSerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");

        builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName, InternalTopicProperties.empty());

        final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
        final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde);
        final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
            new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueSerde);

        final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(
            subscriptionForeignKeySerdePseudoTopic,
            foreignKeySerde,
            subscriptionPrimaryKeySerdePseudoTopic,
            keySerde
        );

        final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>(
            new ProcessorParameters<>(
                new ForeignJoinSubscriptionSendProcessorSupplier<>(
                    foreignKeyExtractor,
                    subscriptionForeignKeySerdePseudoTopic,
                    valueHashSerdePseudoTopic,
                    foreignKeySerde,
                    valueSerde == null ? null : valueSerde.serializer(),
                    leftJoin
                ),
                renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
            )
        );
        builder.addGraphNode(streamsGraphNode, subscriptionNode);


        final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
            renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
            new StaticTopicNameExtractor<>(subscriptionTopicName),
            new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde))
        );
        builder.addGraphNode(subscriptionNode, subscriptionSink);

        final StreamSourceNode<KO, SubscriptionWrapper<K>> subscriptionSource = new StreamSourceNode<>(
            renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
            Collections.singleton(subscriptionTopicName),
            new ConsumedInternal<>(Consumed.with(foreignKeySerde, subscriptionWrapperSerde))
        );
        builder.addGraphNode(subscriptionSink, subscriptionSource);

        // The subscription source is the source node on the *receiving* end *after* the repartition.
        // This topic needs to be copartitioned with the Foreign Key table.
        final Set<String> copartitionedRepartitionSources =
            new HashSet<>(((KTableImpl<?, ?, ?>) foreignKeyTable).subTopologySourceNodes);
        copartitionedRepartitionSources.add(subscriptionSource.nodeName());
        builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);


        final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> subscriptionStore =
            Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore(
                    renamed.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME)
                ),
                new Serdes.BytesSerde(),
                subscriptionWrapperSerde
            );
        builder.addStateStore(subscriptionStore);

        final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
            new StatefulProcessorNode<>(
                new ProcessorParameters<>(
                    new SubscriptionStoreReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
                    renamed.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR)
                ),
                Collections.singleton(subscriptionStore),
                Collections.emptySet()
            );
        builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);

        final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode =
            new StatefulProcessorNode<>(
                new ProcessorParameters<>(
                    new SubscriptionJoinForeignProcessorSupplier<>(
                        ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier()
                    ),
                    renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
                ),
                Collections.emptySet(),
                Collections.singleton(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier())
            );
        builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);

        final StatefulProcessorNode<KO, Change<Object>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
            new ProcessorParameters<>(
                new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema),
                renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
            ),
            Collections.singleton(subscriptionStore),
            Collections.emptySet()
        );
        builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).streamsGraphNode, foreignJoinSubscriptionNode);


        final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
        builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());

        final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
            new StreamSinkNode<>(
                renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
                new StaticTopicNameExtractor<>(finalRepartitionTopicName),
                new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde))
            );
        builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
        builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);

        final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSource = new StreamSourceNode<>(
            renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
            Collections.singleton(finalRepartitionTopicName),
            new ConsumedInternal<>(Consumed.with(keySerde, responseWrapperSerde))
        );
        builder.addGraphNode(foreignResponseSink, foreignResponseSource);

        // the response topic has to be copartitioned with the left (primary) side of the join
        final Set<String> resultSourceNodes = new HashSet<>(this.subTopologySourceNodes);
        resultSourceNodes.add(foreignResponseSource.nodeName());
        builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);

        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
        final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
            primaryKeyValueGetter,
            valueSerde == null ? null : valueSerde.serializer(),
            valueHashSerdePseudoTopic,
            joiner,
            leftJoin
        );
        final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> resolverNode = new StatefulProcessorNode<>(
            new ProcessorParameters<>(
                resolverProcessorSupplier,
                renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
            ),
            Collections.emptySet(),
            Collections.singleton(primaryKeyValueGetter)
        );
        builder.addGraphNode(foreignResponseSource, resolverNode);

        final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);

        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(
                materialized,
                builder,
                FK_JOIN_OUTPUT_NAME
            );

        // If we have a key serde, it's still valid, but we don't know the value serde, since it's the result
        // of the joiner (VR).
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(keySerde);
        }

        final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
            materializedInternal.storeName(),
            materializedInternal.queryableStoreName()
        );

        final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
            new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();

        final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
            resultProcessorName,
            new ProcessorParameters<>(
                resultProcessorSupplier,
                resultProcessorName
            ),
            resultStore
        );
        builder.addGraphNode(resolverNode, resultNode);

        return new KTableImpl<K, V, VR>(
            resultProcessorName,
            keySerde,
            materializedInternal.valueSerde(),
            resultSourceNodes,
            materializedInternal.storeName(),
            resultProcessorSupplier,
            resultNode,
            builder
        );
    }