public IOperatorNodePushable createPushRuntime()

in hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java [58:170]


    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
            throws HyracksDataException {
        final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
        Map<ActivityId, IActivity> activities = getActivityMap();
        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
            /**
             * extract start activities
             */
            List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
            if (conns == null || conns.size() == 0) {
                startActivities.put(entry.getKey(), entry.getValue());
            }
        }

        /**
         * wrap a RecordDescriptorProvider for the super activity
         */
        IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {

            @Override
            public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
                if (startActivities.get(aid) != null) {
                    /**
                     * if the activity is a start (input boundary) activity
                     */
                    int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
                    if (superActivityInputChannel >= 0) {
                        return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
                    }
                }
                if (SuperActivity.this.getActivityMap().get(aid) != null) {
                    /**
                     * if the activity is an internal activity of the super activity
                     */
                    IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                }

                /**
                 * the following is for the case where the activity is in other SuperActivities
                 */
                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
                    ActivityCluster ac = entry.getValue();
                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
                        SuperActivity sa = (SuperActivity) saEntry.getValue();
                        if (sa.getActivityMap().get(aid) != null) {
                            List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
                            if (conns != null && conns.size() >= inputIndex) {
                                IConnectorDescriptor conn = conns.get(inputIndex);
                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                            } else {
                                int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
                                if (superActivityInputChannel >= 0) {
                                    return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
                                            superActivityInputChannel);
                                }
                            }
                        }
                    }
                }
                return null;
            }

            @Override
            public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
                /**
                 * if the activity is an output-boundary activity
                 */
                int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex));
                if (superActivityOutputChannel >= 0) {
                    return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel);
                }

                if (SuperActivity.this.getActivityMap().get(aid) != null) {
                    /**
                     * if the activity is an internal activity of the super activity
                     */
                    IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                }

                /**
                 * the following is for the case where the activity is in other SuperActivities
                 */
                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
                    ActivityCluster ac = entry.getValue();
                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
                        SuperActivity sa = (SuperActivity) saEntry.getValue();
                        if (sa.getActivityMap().get(aid) != null) {
                            List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid);
                            if (conns != null && conns.size() >= outputIndex) {
                                IConnectorDescriptor conn = conns.get(outputIndex);
                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                            } else {
                                superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex));
                                if (superActivityOutputChannel >= 0) {
                                    return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(),
                                            superActivityOutputChannel);
                                }
                            }
                        }
                    }
                }
                return null;
            }

        };
        return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition,
                nPartitions);
    }