in asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java [78:274]
public IScalarEvaluator createScalarEvaluator(final IEvaluatorContext ctx) throws HyracksDataException {
return new IScalarEvaluator() {
private final IBinaryHashFunction fieldNameHashFunction =
BinaryHashFunctionFactoryProvider.UTF8STRING_POINTABLE_INSTANCE.createBinaryHashFunction();
private final IBinaryComparator fieldNameComparator =
BinaryComparatorFactoryProvider.UTF8STRING_POINTABLE_INSTANCE.createBinaryComparator();
private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
private final DataOutput out = resultStorage.getDataOutput();
private final ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();
private final IPointable inputArg0 = new VoidPointable();
private final IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
private final IPointable[] fieldPointables = new VoidPointable[fieldPath.size()];
private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[fieldPath.size()];
@SuppressWarnings("unchecked")
private final ISerializerDeserializer<ANull> nullSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@SuppressWarnings("unchecked")
private final ISerializerDeserializer<AMissing> missingSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
{
generateFieldsPointables();
for (int index = 0; index < fieldPath.size(); ++index) {
recTypeInfos[index] = new RuntimeRecordTypeInfo();
}
}
@SuppressWarnings("unchecked")
private void generateFieldsPointables() throws HyracksDataException {
for (int i = 0; i < fieldPath.size(); i++) {
ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
DataOutput out = storage.getDataOutput();
AString as = new AString(fieldPath.get(i));
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as, out);
fieldPointables[i] = new VoidPointable();
fieldPointables[i].set(storage);
}
}
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
try {
resultStorage.reset();
eval0.evaluate(tuple, inputArg0);
if (PointableHelper.checkAndSetMissingOrNull(result, inputArg0)) {
return;
}
byte[] serRecord = inputArg0.getByteArray();
int start = inputArg0.getStartOffset();
int len = inputArg0.getLength();
if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0, ATypeTag.OBJECT);
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
int subFieldIndex = -1;
int subFieldOffset = -1;
int subFieldLength = -1;
int nullBitmapSize = -1;
IAType subType = recordType;
recTypeInfos[0].reset(recordType);
ATypeTag subTypeTag = ATypeTag.MISSING;
int pathIndex = 0;
// Moving through closed fields first.
for (; pathIndex < fieldPointables.length; pathIndex++) {
if (subType.getTypeTag().equals(ATypeTag.UNION)) {
//enforced SubType
subType = ((AUnionType) subType).getActualType();
byte serializedTypeTag = subType.getTypeTag().serialize();
if (serializedTypeTag != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serializedTypeTag, 0,
ATypeTag.OBJECT);
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
recTypeInfos[pathIndex].reset((ARecordType) subType);
}
subFieldIndex = recTypeInfos[pathIndex].getFieldIndex(fieldPointables[pathIndex].getByteArray(),
fieldPointables[pathIndex].getStartOffset() + 1,
fieldPointables[pathIndex].getLength() - 1);
if (subFieldIndex == -1) {
break;
}
nullBitmapSize = RecordUtil.computeNullBitmapSize((ARecordType) subType);
subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, start,
subFieldIndex, nullBitmapSize, ((ARecordType) subType).isOpen());
if (subFieldOffset == 0) {
// the field is null, we checked the null bit map
// any path after null will return null.
nullSerde.serialize(ANull.NULL, out);
result.set(resultStorage);
return;
}
if (subFieldOffset < 0) {
// the field is missing, we checked the missing bit map
// any path after missing will return null.
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
subType = ((ARecordType) subType).getFieldTypes()[subFieldIndex];
if (subType.getTypeTag() == ATypeTag.OBJECT && pathIndex + 1 < fieldPointables.length) {
// Move to the next Depth
recTypeInfos[pathIndex + 1].reset((ARecordType) subType);
}
if (subType.getTypeTag().equals(ATypeTag.UNION)) {
subTypeTag = ((AUnionType) subType).getActualType().getTypeTag();
subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
subTypeTag, false);
} else {
subTypeTag = subType.getTypeTag();
subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
subTypeTag, false);
}
if (pathIndex < fieldPointables.length - 1) {
//setup next iteration
subRecordTmpStream.reset();
subRecordTmpStream.write(subTypeTag.serialize());
subRecordTmpStream.write(serRecord, subFieldOffset, subFieldLength);
serRecord = subRecordTmpStream.getByteArray();
start = 0;
}
// type check
if (pathIndex < fieldPointables.length - 1
&& serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0, ATypeTag.OBJECT);
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
}
// Moving through open fields after we hit the first open field.
boolean openField = false;
for (; pathIndex < fieldPointables.length; pathIndex++) {
openField = true;
subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, start, len,
fieldPointables[pathIndex].getByteArray(), fieldPointables[pathIndex].getStartOffset(),
fieldNameHashFunction, fieldNameComparator);
if (subFieldOffset < 0) {
out.writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
result.set(resultStorage);
return;
}
subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
subFieldLength =
NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag, true)
+ 1;
if (pathIndex >= fieldPointables.length - 1) {
continue;
}
//setup next iteration
start = subFieldOffset;
len = subFieldLength;
// type check
if (serRecord[start] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
ExceptionUtil.warnTypeMismatch(ctx, sourceLoc, funID, serRecord[start], 0, ATypeTag.OBJECT);
missingSerde.serialize(AMissing.MISSING, out);
result.set(resultStorage);
return;
}
}
// emit the final result.
if (openField) {
result.set(serRecord, subFieldOffset, subFieldLength);
} else {
out.writeByte(subTypeTag.serialize());
out.write(serRecord, subFieldOffset, subFieldLength);
result.set(resultStorage);
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
};
}