private KTable doJoinOnForeignKey()

in streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java [1207:1433]


    private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
                                                          final ForeignKeyExtractor<? super K, ? super V, ? extends KO> foreignKeyExtractor,
                                                          final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                                          final TableJoined<K, KO> tableJoined,
                                                          final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                                          final boolean leftJoin) {
        Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
        Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(tableJoined, "tableJoined can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");

        //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
        //such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
        //not be done needlessly.
        ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);

        //Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.
        //This occurs whenever the extracted foreignKey changes values.
        enableSendingOldValues(true);

        final TableJoinedInternal<K, KO> tableJoinedInternal = new TableJoinedInternal<>(tableJoined);

        final NamedInternal renamed = new NamedInternal(tableJoinedInternal.name());

        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
            "-subscription-registration",
            builder,
            SUBSCRIPTION_REGISTRATION
        ) + TOPIC_SUFFIX;

        // the decoration can't be performed until we have the configuration available when the app runs,
        // so we pass Suppliers into the components, which they can call at run time

        final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");

        final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");

        final Supplier<String> valueHashSerdePseudoTopic =
            () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");

        builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName, InternalTopicProperties.empty());

        final Serde<KO> foreignKeySerde = ((KTableImpl<KO, ?, ?>) foreignKeyTable).keySerde;
        final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde);
        final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
            new SubscriptionResponseWrapperSerde<>(((KTableImpl<?, ?, VO>) foreignKeyTable).valueSerde);

        final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(
            subscriptionForeignKeySerdePseudoTopic,
            foreignKeySerde,
            subscriptionPrimaryKeySerdePseudoTopic,
            keySerde
        );

        final ProcessorGraphNode<K, Change<V>> subscriptionSendNode = new ForeignJoinSubscriptionSendNode<>(
            new ProcessorParameters<>(
                new SubscriptionSendProcessorSupplier<>(
                    foreignKeyExtractor,
                    subscriptionForeignKeySerdePseudoTopic,
                    valueHashSerdePseudoTopic,
                    foreignKeySerde,
                    valueSerde == null ? null : valueSerde.serializer(),
                    leftJoin
                ),
                renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
            )
        );
        builder.addGraphNode(graphNode, subscriptionSendNode);

        final StreamPartitioner<KO, SubscriptionWrapper<K>> subscriptionSinkPartitioner =
                tableJoinedInternal.otherPartitioner() == null
                        ? null
                        : (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.otherPartitioner().partitions(topic, key, null, numPartitions));

        final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
            renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
            new StaticTopicNameExtractor<>(subscriptionTopicName),
            new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner))
        );
        builder.addGraphNode(subscriptionSendNode, subscriptionSink);

        final StreamSourceNode<KO, SubscriptionWrapper<K>> subscriptionSource = new StreamSourceNode<>(
            renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
            Collections.singleton(subscriptionTopicName),
            new ConsumedInternal<>(Consumed.with(foreignKeySerde, subscriptionWrapperSerde))
        );
        builder.addGraphNode(subscriptionSink, subscriptionSource);

        // The subscription source is the source node on the *receiving* end *after* the repartition.
        // This topic needs to be copartitioned with the Foreign Key table.
        final Set<String> copartitionedRepartitionSources =
            new HashSet<>(((KTableImpl<?, ?, ?>) foreignKeyTable).subTopologySourceNodes);
        copartitionedRepartitionSources.add(subscriptionSource.nodeName());
        builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);

        final String subscriptionStoreName = renamed
            .suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME);
        final StoreFactory subscriptionStoreFactory =
            new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde);

        final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
            "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
        final ProcessorGraphNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
            new ProcessorGraphNode<>(
                subscriptionReceiveName,
                new ProcessorParameters<>(
                    new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
                    subscriptionReceiveName)
            );
        builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);

        final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, ?, VO>) foreignKeyTable).valueGetterSupplier();
        final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
            new ProcessorToStateConnectorNode<>(
                new ProcessorParameters<>(
                    new SubscriptionJoinProcessorSupplier<>(
                        foreignKeyValueGetter
                    ),
                    renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
                ),
                Collections.singleton(foreignKeyValueGetter)
            );
        builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);

        final String foreignTableJoinName = renamed
            .suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR);
        final ProcessorGraphNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
            new ProcessorParameters<>(
                new ForeignTableJoinProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
                foreignTableJoinName
            )
        );
        builder.addGraphNode(((KTableImpl<?, ?, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);


        final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
        builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());

        final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> defaultForeignResponseSinkPartitioner =
                (topic, key, subscriptionResponseWrapper, numPartitions) -> {
                    final Integer partition = subscriptionResponseWrapper.primaryPartition();
                    return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
                };

        final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
                tableJoinedInternal.partitioner() == null
                        ? defaultForeignResponseSinkPartitioner
                        : (topic, key, val, numPartitions) -> getPartition.apply(tableJoinedInternal.partitioner().partitions(topic, key, null, numPartitions));

        final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
            new StreamSinkNode<>(
                renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
                new StaticTopicNameExtractor<>(finalRepartitionTopicName),
                new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner))
            );
        builder.addGraphNode(subscriptionJoinNode, foreignResponseSink);
        builder.addGraphNode(foreignTableJoinNode, foreignResponseSink);

        final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSource = new StreamSourceNode<>(
            renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
            Collections.singleton(finalRepartitionTopicName),
            new ConsumedInternal<>(Consumed.with(keySerde, responseWrapperSerde))
        );
        builder.addGraphNode(foreignResponseSink, foreignResponseSource);

        // the response topic has to be copartitioned with the left (primary) side of the join
        final Set<String> resultSourceNodes = new HashSet<>(this.subTopologySourceNodes);
        resultSourceNodes.add(foreignResponseSource.nodeName());
        builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);

        final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
        final ProcessorToStateConnectorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new ProcessorToStateConnectorNode<>(
            new ProcessorParameters<>(
                new ResponseJoinProcessorSupplier<>(
                    primaryKeyValueGetter,
                    valueSerde == null ? null : valueSerde.serializer(),
                    valueHashSerdePseudoTopic,
                    joiner,
                    leftJoin
                ),
                renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
            ),
            Collections.singleton(primaryKeyValueGetter)
        );
        builder.addGraphNode(foreignResponseSource, responseJoinNode);

        final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);

        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(
                materialized,
                builder,
                FK_JOIN_OUTPUT_NAME
            );

        // If we have a key serde, it's still valid, but we don't know the value serde, since it's the result
        // of the joiner (VR).
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(keySerde);
        }

        final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);

        final ProcessorGraphNode<K, VR> resultNode = new ProcessorGraphNode<>(
            resultProcessorName,
            new ProcessorParameters<>(
                resultProcessorSupplier,
                resultProcessorName
            )
        );
        resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
        builder.addGraphNode(responseJoinNode, resultNode);

        return new KTableImpl<K, V, VR>(
            resultProcessorName,
            keySerde,
            materializedInternal.valueSerde(),
            resultSourceNodes,
            materializedInternal.storeName(),
            resultProcessorSupplier,
            resultNode,
            builder
        );
    }