private void generateVariableLengthWriter()

in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [1067:1182]


    /**
     * Emits, into {@code buffer}, the generated statements that write one
     * variable-length field (string, bytes, records or array) to {@code _writable}.
     *
     * <p>The output is assembled through an {@code IsNullConditional} with two clauses:
     * <ul>
     *   <li><b>null clause</b> - in the {@code ifMember} branch of a version check on
     *       {@code nullableVersions}, a null marker is written: unsigned varint
     *       {@code 0} in the {@code ifMember} branch of the check on
     *       {@code fieldFlexibleVersions}, otherwise {@code (short) -1} for strings
     *       and int {@code -1} for every other type.  In the {@code ifNotMember}
     *       branch of the nullable check the generated code throws
     *       {@code NullPointerException}.</li>
     *   <li><b>non-null clause</b> - a length prefix is written (unsigned varint of
     *       {@code length + 1} in the flexible branch, otherwise a short for strings
     *       or an int for other types), followed by the payload.</li>
     * </ul>
     *
     * @param fieldFlexibleVersions versions checked to select the varint length encoding
     * @param name                  expression naming the value in the generated code
     * @param type                  field type; must be a string, bytes, records or array type
     * @param possibleVersions      versions the emitted code has to handle
     * @param nullableVersions      versions checked to decide whether null may be written
     * @param zeroCopy              for bytes fields, selects the {@code remaining()} /
     *                              {@code writeByteBuffer} form instead of
     *                              {@code length} / {@code writeByteArray}
     * @throws RuntimeException if {@code type} is none of the handled kinds, or if it is
     *                          an array whose element type is itself an array
     */
    private void generateVariableLengthWriter(Versions fieldFlexibleVersions,
                                              String name,
                                              FieldType type,
                                              Versions possibleVersions,
                                              Versions nullableVersions,
                                              boolean zeroCopy) {
        IsNullConditional.forName(name).
            possibleVersions(possibleVersions).
            nullableVersions(nullableVersions).
            alwaysEmitBlockScope(type.isString()).
            ifNull(() -> {
                VersionConditional.forVersions(nullableVersions, possibleVersions).
                    ifMember(versionsAllowingNull -> {
                        // Null marker: its encoding depends on the flexible-version check.
                        VersionConditional.forVersions(fieldFlexibleVersions, versionsAllowingNull).
                            ifMember(flexible ->
                                buffer.printf("_writable.writeUnsignedVarint(0);%n")).
                            ifNotMember(nonFlexible ->
                                buffer.printf(type.isString() ?
                                    "_writable.writeShort((short) -1);%n" :
                                    "_writable.writeInt(-1);%n")).
                            generate(buffer);
                    }).
                    ifNotMember(versionsRejectingNull ->
                        buffer.printf("throw new NullPointerException();%n")).
                    generate(buffer);
            }).
            ifShouldNotBeNull(() -> {
                // Step 1: work out the expression that yields the length in the generated
                // code.  Strings additionally emit a local holding their serialized bytes.
                final String payloadLength;
                if (type.isString()) {
                    buffer.printf("byte[] _stringBytes = _cache.getSerializedValue(%s);%n",
                        name);
                    payloadLength = "_stringBytes.length";
                } else if (type.isBytes()) {
                    payloadLength = name + (zeroCopy ? ".remaining()" : ".length");
                } else if (type.isRecords()) {
                    payloadLength = name + ".sizeInBytes()";
                } else if (type.isArray()) {
                    payloadLength = name + ".size()";
                } else {
                    throw new RuntimeException("Unhandled type " + type);
                }

                // Step 2: emit the length prefix.  Only the prefix sits inside the version
                // check; the payload below is emitted once, after the check.
                VersionConditional.forVersions(fieldFlexibleVersions, possibleVersions).
                    ifMember(flexible ->
                        buffer.printf("_writable.writeUnsignedVarint(%s + 1);%n", payloadLength)).
                    ifNotMember(nonFlexible ->
                        buffer.printf(type.isString() ?
                            "_writable.writeShort((short) %s);%n" :
                            "_writable.writeInt(%s);%n", payloadLength)).
                    generate(buffer);

                // Step 3: emit the payload itself.
                if (type.isString()) {
                    buffer.printf("_writable.writeByteArray(_stringBytes);%n");
                } else if (type.isBytes()) {
                    buffer.printf(zeroCopy ?
                        "_writable.writeByteBuffer(%s);%n" :
                        "_writable.writeByteArray(%s);%n", name);
                } else if (type.isRecords()) {
                    // The generated code only writes records when _writable is a
                    // RecordsWritable, and throws otherwise.
                    headerGenerator.addImport(MessageGenerator.RECORDS_WRITABLE_CLASS);
                    buffer.printf("if (_writable instanceof RecordsWritable) {%n");
                    buffer.incrementIndent();
                    buffer.printf("((RecordsWritable) _writable).writeRecords(%s);%n", name);
                    buffer.decrementIndent();
                    buffer.printf("} else {%n");
                    buffer.incrementIndent();
                    buffer.printf("throw new RuntimeException(\"Cannot write records to writer of class: \" + _writable.getClass().getSimpleName());%n");
                    buffer.decrementIndent();
                    buffer.printf("}%n");
                } else if (type.isArray()) {
                    FieldType elementType = ((FieldType.ArrayType) type).elementType();
                    String elementName = name + "Element";
                    buffer.printf("for (%s %s : %s) {%n",
                        elementType.getBoxedJavaType(headerGenerator),
                        elementName,
                        name);
                    buffer.incrementIndent();
                    if (elementType.isArray()) {
                        throw new RuntimeException("Nested arrays are not supported.  " +
                            "Use an array of structures containing another array.");
                    }
                    if (elementType.isBytes() || elementType.isString()) {
                        // Variable-length elements re-enter this method, so each one gets
                        // its own length prefix.  They are passed Versions.NONE as their
                        // nullable versions and zeroCopy = false.
                        generateVariableLengthWriter(fieldFlexibleVersions,
                            elementName,
                            elementType,
                            possibleVersions,
                            Versions.NONE,
                            false);
                    } else {
                        buffer.printf("%s;%n", primitiveWriteExpression(elementType, elementName));
                    }
                    buffer.decrementIndent();
                    buffer.printf("}%n");
                }
            }).
            generate(buffer);
    }