public IScalarEvaluator createScalarEvaluator()

in asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java [78:274]


    /**
     * Creates an evaluator that walks {@code fieldPath} through the record produced by
     * {@code recordEvalFactory} and returns the value found at the end of the path.
     *
     * <p>The walk has two phases. Fields that resolve against the declared record type are located by
     * index; as soon as a path step is not found in the declared type, the remaining steps are located by
     * name in the serialized record. The evaluator yields MISSING when the input (or an intermediate
     * value that must be descended into) is not a record, or when a field is absent, and NULL when a
     * declared field on the path is null.
     *
     * @param ctx evaluator context, used to create the record evaluator and to report type-mismatch warnings
     * @return a scalar evaluator performing the nested field access
     * @throws HyracksDataException if the record evaluator cannot be created or a field name cannot be serialized
     */
    public IScalarEvaluator createScalarEvaluator(final IEvaluatorContext ctx) throws HyracksDataException {
        return new IScalarEvaluator() {

            private final IBinaryHashFunction fieldNameHashFunction =
                    BinaryHashFunctionFactoryProvider.UTF8STRING_POINTABLE_INSTANCE.createBinaryHashFunction();
            private final IBinaryComparator fieldNameComparator =
                    BinaryComparatorFactoryProvider.UTF8STRING_POINTABLE_INSTANCE.createBinaryComparator();
            // Backing storage for results that have to be materialized (MISSING, NULL, declared fields).
            private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
            private final DataOutput out = resultStorage.getDataOutput();
            // Scratch buffer holding a tagged copy of the nested record currently being descended into.
            private final ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();

            private final IPointable inputArg0 = new VoidPointable();
            private final IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
            // One serialized field name and one type-info holder per step of the path.
            private final IPointable[] fieldPointables = new VoidPointable[fieldPath.size()];
            private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[fieldPath.size()];
            @SuppressWarnings("unchecked")
            private final ISerializerDeserializer<ANull> nullSerde =
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
            @SuppressWarnings("unchecked")
            private final ISerializerDeserializer<AMissing> missingSerde =
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);

            {
                generateFieldsPointables();
                for (int index = 0; index < fieldPath.size(); ++index) {
                    recTypeInfos[index] = new RuntimeRecordTypeInfo();
                }
            }

            /** Serializes every name in {@code fieldPath} once and stores it in {@code fieldPointables}. */
            @SuppressWarnings("unchecked")
            private void generateFieldsPointables() throws HyracksDataException {
                for (int i = 0; i < fieldPath.size(); i++) {
                    ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
                    // Named differently from the enclosing 'out' field so the result stream is not shadowed.
                    DataOutput fieldNameOut = storage.getDataOutput();
                    AString as = new AString(fieldPath.get(i));
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as,
                            fieldNameOut);
                    fieldPointables[i] = new VoidPointable();
                    fieldPointables[i].set(storage);
                }
            }

            /** Serializes MISSING into the result storage and points {@code result} at it. */
            private void setMissing(IPointable result) throws HyracksDataException {
                missingSerde.serialize(AMissing.MISSING, out);
                result.set(resultStorage);
            }

            /** Serializes NULL into the result storage and points {@code result} at it. */
            private void setNull(IPointable result) throws HyracksDataException {
                nullSerde.serialize(ANull.NULL, out);
                result.set(resultStorage);
            }

            @Override
            public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
                try {
                    resultStorage.reset();
                    eval0.evaluate(tuple, inputArg0);

                    if (PointableHelper.checkAndSetMissingOrNull(result, inputArg0)) {
                        return;
                    }

                    byte[] serRecord = inputArg0.getByteArray();
                    int start = inputArg0.getStartOffset();
                    int len = inputArg0.getLength();

                    if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
                        ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0, ATypeTag.OBJECT);
                        setMissing(result);
                        return;
                    }

                    int subFieldIndex = -1;
                    int subFieldOffset = -1;
                    int subFieldLength = -1;
                    int nullBitmapSize = -1;

                    IAType subType = recordType;
                    recTypeInfos[0].reset(recordType);

                    ATypeTag subTypeTag = ATypeTag.MISSING;
                    int pathIndex = 0;

                    // Moving through closed fields first.
                    for (; pathIndex < fieldPointables.length; pathIndex++) {
                        if (subType.getTypeTag() == ATypeTag.UNION) {
                            // enforced SubType: unwrap the union and require its actual type to be a record
                            subType = ((AUnionType) subType).getActualType();
                            byte serializedTypeTag = subType.getTypeTag().serialize();
                            if (serializedTypeTag != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
                                ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serializedTypeTag, 0,
                                        ATypeTag.OBJECT);
                                setMissing(result);
                                return;
                            }
                            recTypeInfos[pathIndex].reset((ARecordType) subType);
                        }
                        // NOTE(review): the +1 / -1 presumably skip the leading type-tag byte of the
                        // serialized field name -- confirm against RuntimeRecordTypeInfo.getFieldIndex.
                        subFieldIndex = recTypeInfos[pathIndex].getFieldIndex(fieldPointables[pathIndex].getByteArray(),
                                fieldPointables[pathIndex].getStartOffset() + 1,
                                fieldPointables[pathIndex].getLength() - 1);
                        if (subFieldIndex == -1) {
                            // not a declared field: continue with the by-name lookup below
                            break;
                        }
                        nullBitmapSize = RecordUtil.computeNullBitmapSize((ARecordType) subType);
                        subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, start,
                                subFieldIndex, nullBitmapSize, ((ARecordType) subType).isOpen());
                        if (subFieldOffset == 0) {
                            // the field is null, we checked the null bit map
                            // any path after null will return null.
                            setNull(result);
                            return;
                        }
                        if (subFieldOffset < 0) {
                            // the field is missing, we checked the missing bit map
                            // any path after missing will return missing.
                            setMissing(result);
                            return;
                        }
                        subType = ((ARecordType) subType).getFieldTypes()[subFieldIndex];
                        if (subType.getTypeTag() == ATypeTag.OBJECT && pathIndex + 1 < fieldPointables.length) {
                            // Move to the next Depth
                            recTypeInfos[pathIndex + 1].reset((ARecordType) subType);
                        }
                        // For a union-typed field the value is laid out according to the union's actual type.
                        subTypeTag = subType.getTypeTag() == ATypeTag.UNION
                                ? ((AUnionType) subType).getActualType().getTypeTag() : subType.getTypeTag();
                        subFieldLength =
                                NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag, false);

                        if (pathIndex < fieldPointables.length - 1) {
                            // setup next iteration: copy the nested value, prefixed with its type tag, into
                            // the scratch buffer and continue the walk inside that copy
                            subRecordTmpStream.reset();
                            subRecordTmpStream.write(subTypeTag.serialize());
                            subRecordTmpStream.write(serRecord, subFieldOffset, subFieldLength);
                            serRecord = subRecordTmpStream.getByteArray();
                            start = 0;
                            // Keep len in step with the copy (tag byte + value bytes) so the by-name lookup
                            // below receives the nested record's length instead of the outer record's.
                            len = subFieldLength + 1;

                            // type check: more path steps remain, so the value must be a record
                            if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
                                ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0,
                                        ATypeTag.OBJECT);
                                setMissing(result);
                                return;
                            }
                        }
                    }

                    // Moving through open fields after we hit the first open field.
                    boolean openField = false;
                    for (; pathIndex < fieldPointables.length; pathIndex++) {
                        openField = true;
                        subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, start, len,
                                fieldPointables[pathIndex].getByteArray(), fieldPointables[pathIndex].getStartOffset(),
                                fieldNameHashFunction, fieldNameComparator);
                        if (subFieldOffset < 0) {
                            out.writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
                            result.set(resultStorage);
                            return;
                        }

                        // Values found by name carry their own type tag; the +1 accounts for that tag byte.
                        subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
                        subFieldLength =
                                NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag, true)
                                        + 1;

                        if (pathIndex >= fieldPointables.length - 1) {
                            // last path step: nothing to descend into
                            continue;
                        }
                        // setup next iteration
                        start = subFieldOffset;
                        len = subFieldLength;

                        // type check
                        if (serRecord[start] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
                            setMissing(result);
                            return;
                        }
                        // NOTE(review): a NULL intermediate value also lands here and yields a warning plus
                        // MISSING, unlike the closed-field path which yields NULL -- confirm this is intended.
                        if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
                            ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0, ATypeTag.OBJECT);
                            setMissing(result);
                            return;
                        }
                    }
                    // emit the final result.
                    if (openField) {
                        // the value is already tagged, so point straight at it without copying
                        result.set(serRecord, subFieldOffset, subFieldLength);
                    } else {
                        // declared fields are stored without a tag, so prepend it when materializing
                        out.writeByte(subTypeTag.serialize());
                        out.write(serRecord, subFieldOffset, subFieldLength);
                        result.set(resultStorage);
                    }
                } catch (IOException e) {
                    throw HyracksDataException.create(e);
                }
            }
        };
    }