in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [1067:1182]
private void generateVariableLengthWriter(Versions fieldFlexibleVersions,
String name,
FieldType type,
Versions possibleVersions,
Versions nullableVersions,
boolean zeroCopy) {
IsNullConditional.forName(name).
possibleVersions(possibleVersions).
nullableVersions(nullableVersions).
alwaysEmitBlockScope(type.isString()).
ifNull(() -> {
VersionConditional.forVersions(nullableVersions, possibleVersions).
ifMember(presentVersions -> {
VersionConditional.forVersions(fieldFlexibleVersions, presentVersions).
ifMember(___ -> {
buffer.printf("_writable.writeUnsignedVarint(0);%n");
}).
ifNotMember(___ -> {
if (type.isString()) {
buffer.printf("_writable.writeShort((short) -1);%n");
} else {
buffer.printf("_writable.writeInt(-1);%n");
}
}).
generate(buffer);
}).
ifNotMember(__ -> {
buffer.printf("throw new NullPointerException();%n");
}).
generate(buffer);
}).
ifShouldNotBeNull(() -> {
final String lengthExpression;
if (type.isString()) {
buffer.printf("byte[] _stringBytes = _cache.getSerializedValue(%s);%n",
name);
lengthExpression = "_stringBytes.length";
} else if (type.isBytes()) {
if (zeroCopy) {
lengthExpression = String.format("%s.remaining()", name);
} else {
lengthExpression = String.format("%s.length", name);
}
} else if (type.isRecords()) {
lengthExpression = String.format("%s.sizeInBytes()", name);
} else if (type.isArray()) {
lengthExpression = String.format("%s.size()", name);
} else {
throw new RuntimeException("Unhandled type " + type);
}
// Check whether we're dealing with a flexible version or not. In a flexible
// version, the length is serialized differently.
//
// Note: for arrays, each branch of the if contains the loop for writing out
// the elements. This allows us to lift the version check out of the loop.
// This is helpful for things like arrays of strings, where each element
// will be serialized differently based on whether the version is flexible.
VersionConditional.forVersions(fieldFlexibleVersions, possibleVersions).
ifMember(ifMemberVersions -> {
buffer.printf("_writable.writeUnsignedVarint(%s + 1);%n", lengthExpression);
}).
ifNotMember(ifNotMemberVersions -> {
if (type.isString()) {
buffer.printf("_writable.writeShort((short) %s);%n", lengthExpression);
} else {
buffer.printf("_writable.writeInt(%s);%n", lengthExpression);
}
}).
generate(buffer);
if (type.isString()) {
buffer.printf("_writable.writeByteArray(_stringBytes);%n");
} else if (type.isBytes()) {
if (zeroCopy) {
buffer.printf("_writable.writeByteBuffer(%s);%n", name);
} else {
buffer.printf("_writable.writeByteArray(%s);%n", name);
}
} else if (type.isRecords()) {
headerGenerator.addImport(MessageGenerator.RECORDS_WRITABLE_CLASS);
buffer.printf("if (_writable instanceof RecordsWritable) {%n");
buffer.incrementIndent();
buffer.printf("((RecordsWritable) _writable).writeRecords(%s);%n", name);
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"Cannot write records to writer of class: \" + _writable.getClass().getSimpleName());%n");
buffer.decrementIndent();
buffer.printf("}%n");
} else if (type.isArray()) {
FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
FieldType elementType = arrayType.elementType();
String elementName = String.format("%sElement", name);
buffer.printf("for (%s %s : %s) {%n",
elementType.getBoxedJavaType(headerGenerator),
elementName,
name);
buffer.incrementIndent();
if (elementType.isArray()) {
throw new RuntimeException("Nested arrays are not supported. " +
"Use an array of structures containing another array.");
} else if (elementType.isBytes() || elementType.isString()) {
generateVariableLengthWriter(fieldFlexibleVersions,
elementName,
elementType,
possibleVersions,
Versions.NONE,
false);
} else {
buffer.printf("%s;%n", primitiveWriteExpression(elementType, elementName));
}
buffer.decrementIndent();
buffer.printf("}%n");
}
}).
generate(buffer);
}