public void nextFrame()

in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/operators/NotifyBrokerRuntime.java [172:232]


    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        tAccess.reset(buffer);
        int nTuple = tAccess.getTupleCount();
        for (int t = 0; t < nTuple; t++) {
            tRef.reset(tAccess, t);

            eval0.evaluate(tRef, inputArg0);
            eval1.evaluate(tRef, inputArg1);
            eval2.evaluate(tRef, inputArg2);
            eval3.evaluate(tRef, inputArg3);

            /*The incoming tuples have three fields:
             1. eval0 will get the serialized broker endpoint string
             2. eval1 will get the payload (either the subscriptionIds or entire results)
             3. eval2 will get the channel execution time stamp (the same for all tuples)
            */
            if (executionTimeMili == -1) {
                int resultSetOffset = inputArg2.getStartOffset();
                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
                ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
                executionTimeMili = executionTime.getChrononTime();
            }

            // Get HTTP endpoint
            int serBrokerOffset = inputArg0.getStartOffset();
            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
            String endpoint = stringSerDes.deserialize(di).getStringValue();

            // Get broker type
            int serTypeOffset = inputArg3.getStartOffset();
            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serTypeOffset + 1);
            String brokerType = stringSerDes.deserialize(di).getStringValue();
            IPrinter currPrinter =
                    brokerType.equals(BADConstants.BAD_BROKER_TYPE_NAME) ? admRecordPrinter : jsonRecordPrinter;

            PrintStream currPrintStream = sendStreams.getOrDefault(endpoint, null);
            if (currPrintStream == null) {
                try {
                    ByteArrayOutputStream newOutput = new ByteArrayOutputStream();
                    sendbaos.putIfAbsent(endpoint, newOutput);
                    sendStreams.put(endpoint, new PrintStream(newOutput, true, StandardCharsets.UTF_8.name()));
                } catch (UnsupportedEncodingException e) {
                    throw new HyracksDataException(e.getMessage());
                }
                currPrintStream = sendStreams.get(endpoint);
            } else {
                currPrintStream.append(',');
            }

            if (push) {
                int pushOffset = inputArg1.getStartOffset();
                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
                currPrinter.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
                        currPrintStream);
            } else {
                subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
                        inputArg1.getLength(), currPrintStream);
            }
        }

    }