private void generateVariableLengthReader()

in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [572:684]


    private void generateVariableLengthReader(Versions fieldFlexibleVersions,
                                              String name,
                                              FieldType type,
                                              Versions possibleVersions,
                                              Versions nullableVersions,
                                              String assignmentPrefix,
                                              String assignmentSuffix,
                                              boolean isStructArrayWithKeys,
                                              boolean zeroCopy) {
        String lengthVar = type.isArray() ? "arrayLength" : "length";
        buffer.printf("int %s;%n", lengthVar);
        VersionConditional.forVersions(fieldFlexibleVersions, possibleVersions).
            ifMember(__ -> {
                buffer.printf("%s = _readable.readUnsignedVarint() - 1;%n", lengthVar);
            }).
            ifNotMember(__ -> {
                if (type.isString()) {
                    buffer.printf("%s = _readable.readShort();%n", lengthVar);
                } else if (type.isBytes() || type.isArray() || type.isRecords()) {
                    buffer.printf("%s = _readable.readInt();%n", lengthVar);
                } else {
                    throw new RuntimeException("Can't handle variable length type " + type);
                }
            }).
            generate(buffer);
        buffer.printf("if (%s < 0) {%n", lengthVar);
        buffer.incrementIndent();
        VersionConditional.forVersions(nullableVersions, possibleVersions).
            ifNotMember(__ -> {
                buffer.printf("throw new RuntimeException(\"non-nullable field %s " +
                    "was serialized as null\");%n", name);
            }).
            ifMember(__ -> {
                buffer.printf("%snull%s", assignmentPrefix, assignmentSuffix);
            }).
            generate(buffer);
        buffer.decrementIndent();
        if (type.isString()) {
            buffer.printf("} else if (%s > 0x7fff) {%n", lengthVar);
            buffer.incrementIndent();
            buffer.printf("throw new RuntimeException(\"string field %s " +
                "had invalid length \" + %s);%n", name, lengthVar);
            buffer.decrementIndent();
        }
        buffer.printf("} else {%n");
        buffer.incrementIndent();
        if (type.isString()) {
            buffer.printf("%s_readable.readString(%s)%s",
                assignmentPrefix, lengthVar, assignmentSuffix);
        } else if (type.isBytes()) {
            if (zeroCopy) {
                buffer.printf("%s_readable.readByteBuffer(%s)%s",
                    assignmentPrefix, lengthVar, assignmentSuffix);
            } else {
                buffer.printf("byte[] newBytes = new byte[%s];%n", lengthVar);
                buffer.printf("_readable.readArray(newBytes);%n");
                buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
            }
        } else if (type.isRecords()) {
            headerGenerator.addImport(MessageGenerator.RECORDS_READABLE_CLASS);
            buffer.printf("if (_readable instanceof RecordsReadable) {%n");
            buffer.incrementIndent();
            buffer.printf("%s((RecordsReadable) _readable).readRecords(%s)%s",
                assignmentPrefix, lengthVar, assignmentSuffix);
            buffer.decrementIndent();
            buffer.printf("} else {%n");
            buffer.incrementIndent();
            buffer.printf("throw new RuntimeException(\"Cannot read records from " +
                "reader of class: \" + _readable.getClass().getSimpleName());%n");
            buffer.decrementIndent();
            buffer.printf("}%n");
        } else if (type.isArray()) {
            FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
            if (isStructArrayWithKeys) {
                headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                buffer.printf("%s newCollection = new %s(%s);%n",
                    FieldSpec.collectionType(arrayType.elementType().toString()),
                        FieldSpec.collectionType(arrayType.elementType().toString()), lengthVar);
            } else {
                headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
                String boxedArrayType =
                    arrayType.elementType().getBoxedJavaType(headerGenerator);
                buffer.printf("ArrayList<%s> newCollection = new ArrayList<%s>(%s);%n",
                    boxedArrayType, boxedArrayType, lengthVar);
            }
            buffer.printf("for (int i = 0; i < %s; i++) {%n", lengthVar);
            buffer.incrementIndent();
            if (arrayType.elementType().isArray()) {
                throw new RuntimeException("Nested arrays are not supported.  " +
                    "Use an array of structures containing another array.");
            } else if (arrayType.elementType().isBytes() || arrayType.elementType().isString()) {
                generateVariableLengthReader(fieldFlexibleVersions,
                    name + " element",
                    arrayType.elementType(),
                    possibleVersions,
                    Versions.NONE,
                    "newCollection.add(",
                    String.format(");%n"),
                    false,
                    false);
            } else {
                buffer.printf("newCollection.add(%s);%n",
                    primitiveReadExpression(arrayType.elementType()));
            }
            buffer.decrementIndent();
            buffer.printf("}%n");
            buffer.printf("%snewCollection%s", assignmentPrefix, assignmentSuffix);
        } else {
            throw new RuntimeException("Can't handle variable length type " + type);
        }
        buffer.decrementIndent();
        buffer.printf("}%n");
    }