private static JobSpecification combineIntakeCollectJobs()

in asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java [242:421]


    /**
     * Merges a feed intake job and its per-connection sub-jobs into a single {@link JobSpecification}.
     * <p>
     * The combined job is built as follows:
     * <ol>
     * <li>the intake operator (operator id 0 of {@code intakeJob}) is re-created in the new job;</li>
     * <li>a replicate operator with {@code jobsList.size()} outputs is attached to it, and both are pinned to
     * {@code intakeLocations};</li>
     * <li>for every sub-job, its operators and connectors are registered in the new job, primary-index
     * insert/delete operators and certain assign pipelines are wrapped in a {@link FeedMetaOperatorDescriptor},
     * M:N partitioning connectors are replaced by their message-carrying variant, and the sub-job's
     * partition constraints and roots are carried over;</li>
     * <li>the transaction id of every sub-job is collected (keyed by dataset id) and handed to a
     * {@link MultiTransactionJobletEventListenerFactory}.</li>
     * </ol>
     * Note that the operator descriptors of the sub-jobs are modified in place (their operator ids are reassigned),
     * so the sub-jobs should not be reused after this call.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context used for policy validation and resolves the dataset of
     *            each connection
     * @param feed
     *            the feed handed to the re-created intake operator
     * @param intakeJob
     *            the intake job; its operator with id 0 must be a {@link FeedIntakeOperatorDescriptor}
     * @param jobsList
     *            one sub-job per feed connection; must not be empty, since element 0 provides the connector policy
     *            settings of the combined job
     * @param feedConnections
     *            the feed connections, index-aligned with {@code jobsList}
     * @param intakeLocations
     *            absolute locations for the intake and replicate operators
     * @return the combined job specification
     * @throws AlgebricksException
     *             propagated from the calls made while building the job
     * @throws HyracksDataException
     *             propagated from the calls made while building the job
     */
    private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,
            JobSpecification intakeJob, List<JobSpecification> jobsList, List<FeedConnection> feedConnections,
            String[] intakeLocations) throws AlgebricksException, HyracksDataException {
        JobSpecification jobSpec = new JobSpecification(intakeJob.getFrameSize());

        // copy ingestor: re-create the intake operator inside the combined job
        FeedIntakeOperatorDescriptor firstOp =
                (FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0));
        FeedIntakeOperatorDescriptor ingestionOp;
        if (firstOp.getAdaptorFactory() == null) {
            // no adaptor factory instance available: describe the adaptor by its library coordinates instead
            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryDatabase(),
                    firstOp.getAdaptorLibraryDataverse(), firstOp.getAdaptorLibraryName(),
                    firstOp.getAdaptorFactoryClassName(), firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
                    firstOp.getOutputRecordDescriptors()[0]);
        } else {
            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(),
                    firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
                    firstOp.getOutputRecordDescriptors()[0]);
        }
        // create replicator: one output per sub-job
        ReplicateOperatorDescriptor replicateOp =
                new ReplicateOperatorDescriptor(jobSpec, ingestionOp.getOutputRecordDescriptors()[0], jobsList.size());
        jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), ingestionOp, 0, replicateOp, 0);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, ingestionOp, intakeLocations);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, replicateOp, intakeLocations);

        // Per-sub-job scratch maps; they are cleared and refilled on every iteration of the loop below
        Map<OperatorDescriptorId, OperatorDescriptorId> operatorIdMapping = new HashMap<>();
        Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
        Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
        Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
        // Accumulated across all sub-jobs: dataset id -> transaction id
        Map<Integer, TxnId> txnIdMap = new HashMap<>();

        // Loop over the jobs to copy operators and connections
        for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
            FeedConnection curFeedConnection = feedConnections.get(iter1);
            JobSpecification subJob = jobsList.get(iter1);
            operatorIdMapping.clear();
            Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
            String datasetName = curFeedConnection.getDatasetName();
            FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), datasetName);

            FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil.validateIfPolicyExists(
                    curFeedConnection.getDatabaseName(), curFeedConnection.getDataverseName(),
                    curFeedConnection.getPolicyName(), metadataProvider.getMetadataTxnContext());

            // copy operators, recording old id -> new id
            for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) {
                IOperatorDescriptor opDesc = entry.getValue();
                // capture the sub-job id before the descriptor is re-identified below
                OperatorDescriptorId oldId = opDesc.getOperatorId();
                OperatorDescriptorId opId = null;
                if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                        && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
                    // primary index insert/delete is wrapped as a STORE feed runtime
                    FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId,
                            opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
                    opId = metaOp.getOperatorId();
                    opDesc.setOperatorId(opId);
                } else {
                    if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                        AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc;
                        IPushRuntimeFactory[] runtimeFactories = algOp.getPipeline().getRuntimeFactories();
                        // Tweak AssignOp to work with messages.
                        // The length check comes first so that an empty pipeline is never indexed.
                        if (runtimeFactories.length > 1 && runtimeFactories[0] instanceof AssignRuntimeFactory) {
                            IConnectorDescriptor connectorDesc = subJob.getOperatorInputMap().get(oldId).get(0);
                            // anything on the network interface needs to be message compatible
                            if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
                                FeedMetaOperatorDescriptor metaOp =
                                        new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
                                                feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
                                opId = metaOp.getOperatorId();
                                opDesc.setOperatorId(opId);
                            }
                        }
                    }
                    if (opId == null) {
                        // not wrapped: register the descriptor itself in the combined job
                        // NOTE(review): the connection step below looks descriptors up by getOperatorId(), so this
                        // call presumably also reassigns the descriptor's id - confirm against JobSpecification
                        opId = jobSpec.createOperatorDescriptorId(opDesc);
                    }
                }
                operatorIdMapping.put(oldId, opId);
            }

            // copy connectors, recording old id -> new id
            connectorIdMapping.clear();
            subJob.getConnectorMap().forEach((key, connDesc) -> {
                ConnectorDescriptorId newConnId;
                if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                    // M:N partitioning connectors are replaced by the variant that also carries messages
                    MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
                    IConnectorDescriptor messageConn = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
                            m2nConn.getTuplePartitionComputerFactory());
                    newConnId = messageConn.getConnectorId();
                } else {
                    newConnId = jobSpec.createConnectorDescriptor(connDesc);
                }
                connectorIdMapping.put(key, newConnId);
            });

            // make connections between operators
            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
                    .getConnectorOperatorMap().entrySet()) {
                ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
                IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
                Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
                Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
                // resolve through the combined job so that wrapped operators map to their meta operator
                IOperatorDescriptor leftOpDesc = jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId());
                IOperatorDescriptor rightOpDesc = jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId());
                if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) {
                    // the collect operator of sub-job #iter1 is fed by output #iter1 of the replicator
                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc,
                            leftOp.getRight());
                }
                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
            }

            // prepare for setting partition constraints
            operatorLocations.clear();
            operatorCounts.clear();

            for (Constraint constraint : subJob.getUserConstraints()) {
                LValueConstraintExpression lexpr = constraint.getLValue();
                ConstraintExpression cexpr = constraint.getRValue();
                OperatorDescriptorId opId;
                switch (lexpr.getTag()) {
                    case PARTITION_COUNT:
                        opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
                        operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue());
                        break;
                    case PARTITION_LOCATION:
                        opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
                        IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                        List<LocationConstraint> locations =
                                operatorLocations.computeIfAbsent(opDesc.getOperatorId(), k -> new ArrayList<>());
                        String location = (String) ((ConstantExpression) cexpr).getValue();
                        LocationConstraint lc =
                                new LocationConstraint(location, ((PartitionLocationExpression) lexpr).getPartition());
                        locations.add(lc);
                        break;
                    default:
                        // other constraint kinds are not carried over
                        break;
                }
            }

            // set absolute location constraints
            for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
                IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey());
                List<LocationConstraint> constraints = entry.getValue();
                // The locations array built below is positional, so the constraints are ordered by partition
                // number first. Integer.compare avoids the overflow that a subtraction-based comparator allows.
                constraints.sort((LocationConstraint o1, LocationConstraint o2) -> Integer.compare(o1.partition,
                        o2.partition));
                String[] locations = new String[constraints.size()];
                for (int j = 0; j < locations.length; ++j) {
                    locations[j] = constraints.get(j).location;
                }
                PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, opDesc, locations);
            }

            // set count constraints, but only for operators that did not get an absolute location constraint
            operatorCounts.forEach((key, value) -> {
                IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(key);
                if (!operatorLocations.containsKey(key)) {
                    PartitionConstraintHelper.addPartitionCountConstraint(jobSpec, opDesc, value);
                }
            });
            // roots
            for (OperatorDescriptorId root : subJob.getRoots()) {
                jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
            }
            // remember the transaction id this sub-job uses for its target dataset
            int datasetId = metadataProvider.findDataset(curFeedConnection.getDatabaseName(),
                    curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName()).getDatasetId();
            TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
            txnIdMap.put(datasetId, txnId);
        }

        // jobEventListenerFactory
        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIdMap, true));
        // useConnectorSchedulingPolicy (taken from the first sub-job)
        jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
        // connectorAssignmentPolicy (taken from the first sub-job)
        jobSpec.setConnectorPolicyAssignmentPolicy(jobsList.get(0).getConnectorPolicyAssignmentPolicy());
        return jobSpec;
    }