in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [702:903]
/**
 * Emits the source of the generated class's
 * {@code write(Writable, ObjectSerializationCache, short)} method into {@code buffer}.
 *
 * <p>The generated method is assembled in this order: a version guard that throws
 * {@code UnsupportedVersionException}, the {@code _numTaggedFields} counter, write code for
 * each field in {@code struct.fields()}, and finally the tagged-field section (or, for
 * versions outside {@code messageFlexibleVersions}, a check on {@code _numTaggedFields > 0}).
 *
 * @param className      name interpolated into the generated "Can't write version" message
 * @param struct         spec supplying the versions and fields that write code is generated for
 * @param parentVersions versions of the enclosing context; intersected with
 *                       {@code struct.versions()} to obtain the versions handled here
 * @throws RuntimeException when the generation callbacks find two fields registered under the
 *                          same tag, or a tagged field whose type is {@code records}
 */
private void generateClassWriter(String className, StructSpec struct,
Versions parentVersions) {
// Imports required by the signature of the generated write() method.
headerGenerator.addImport(MessageGenerator.WRITABLE_CLASS);
headerGenerator.addImport(MessageGenerator.OBJECT_SERIALIZATION_CACHE_CLASS);
// Open the generated method; the matching close is emitted at the very end.
buffer.printf("@Override%n");
buffer.printf("public void write(Writable _writable, ObjectSerializationCache _cache, short _version) {%n");
buffer.incrementIndent();
// Version guard: for versions not in struct.versions(), the generated code throws.
// NOTE(review): allowMembershipCheckAlwaysFalse(false) presumably rejects the case where
// no version in parentVersions is supported by the struct -- confirm in VersionConditional.
VersionConditional.forVersions(struct.versions(), parentVersions).
allowMembershipCheckAlwaysFalse(false).
ifNotMember(__ -> {
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Can't write " +
"version \" + _version + \" of %s\");%n", className);
}).
generate(buffer);
// Generated-code counter of tagged fields that will actually be written.
buffer.printf("int _numTaggedFields = 0;%n");
Versions curVersions = parentVersions.intersect(struct.versions());
// Keyed by tag; a TreeMap so the tagged-field section below iterates in ascending tag order.
TreeMap<Integer, FieldSpec> taggedFields = new TreeMap<>();
// First pass: one conditional per field, covering the versions in which the field exists.
for (FieldSpec field : struct.fields()) {
VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
ifMember(presentVersions -> {
// Within the versions where the field is present, split on whether it is tagged.
VersionConditional.forVersions(field.taggedVersions(), presentVersions).
ifNotMember(presentAndUntaggedVersions -> {
// Untagged: the field is written inline, at this position in the generated method.
if (field.type().isVariableLength() && !field.type().isStruct()) {
// Wrapped in a ClauseGenerator so the same generation can be invoked with
// whichever version range the surrounding conditional hands it.
ClauseGenerator callGenerateVariableLengthWriter = versions -> {
generateVariableLengthWriter(fieldFlexibleVersions(field),
field.camelCaseName(),
field.type(),
versions,
field.nullableVersions(),
field.zeroCopy());
};
// For arrays where the field type needs to be serialized differently in flexible
// versions, lift the flexible version check outside of the array.
// This may mean generating two separate 'for' loops-- one for flexible
// versions, and one for regular versions.
if (field.type().isArray() &&
((FieldType.ArrayType) field.type()).elementType().
serializationIsDifferentInFlexibleVersions()) {
// The same generator serves both branches; each call receives that branch's versions.
VersionConditional.forVersions(fieldFlexibleVersions(field),
presentAndUntaggedVersions).
ifMember(callGenerateVariableLengthWriter).
ifNotMember(callGenerateVariableLengthWriter).
generate(buffer);
} else {
callGenerateVariableLengthWriter.generate(presentAndUntaggedVersions);
}
} else if (field.type().isStruct()) {
// Untagged struct: in nullable versions a one-byte marker precedes the struct.
IsNullConditional.forName(field.camelCaseName()).
possibleVersions(presentAndUntaggedVersions).
nullableVersions(field.nullableVersions()).
ifNull(() -> {
// Null struct: marker byte -1 where nullable, otherwise the generated code throws.
VersionConditional.forVersions(field.nullableVersions(), presentAndUntaggedVersions).
ifMember(__ -> {
buffer.printf("_writable.writeByte((byte) -1);%n");
}).
ifNotMember(__ -> {
buffer.printf("throw new NullPointerException();%n");
}).
generate(buffer);
}).
ifShouldNotBeNull(() -> {
// Non-null struct: marker byte 1 where nullable, then the struct's own write expression.
VersionConditional.forVersions(field.nullableVersions(), presentAndUntaggedVersions).
ifMember(__ -> {
buffer.printf("_writable.writeByte((byte) 1);%n");
}).
generate(buffer);
buffer.printf("%s;%n",
primitiveWriteExpression(field.type(), field.camelCaseName()));
}).
generate(buffer);
} else {
// Remaining types are written with a single expression from primitiveWriteExpression.
buffer.printf("%s;%n",
primitiveWriteExpression(field.type(), field.camelCaseName()));
}
}).
ifMember(__ -> {
// Tagged: nothing is written here; the generated code only counts the field.
// NOTE(review): generateNonDefaultValueCheck is expected to emit the opening of an
// 'if' block, which the "}" printed below closes -- confirm in FieldSpec.
field.generateNonDefaultValueCheck(headerGenerator,
structRegistry, buffer, "this.", field.nullableVersions());
buffer.incrementIndent();
buffer.printf("_numTaggedFields++;%n");
buffer.decrementIndent();
buffer.printf("}%n");
// Remember the field for the tagged section; put() returning non-null means the
// tag was already registered by another field.
if (taggedFields.put(field.tag().get(), field) != null) {
throw new RuntimeException("Field " + field.name() + " has tag " +
field.tag() + ", but another field already used that tag.");
}
}).
generate(buffer);
});
// For non-ignorable fields, versions lacking the field get the check emitted by
// generateNonIgnorableFieldCheck (its exact output is defined in FieldSpec).
if (!field.ignorable()) {
cond.ifNotMember(__ -> {
field.generateNonIgnorableFieldCheck(headerGenerator,
structRegistry, "this.", buffer);
});
}
cond.generate(buffer);
}
// Unknown tagged fields are written through a RawTaggedFieldWriter and counted as well.
headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS);
buffer.printf("RawTaggedFieldWriter _rawWriter = RawTaggedFieldWriter.forFields(_unknownTaggedFields);%n");
buffer.printf("_numTaggedFields += _rawWriter.numFields();%n");
// Second pass: the tagged-field section, split on messageFlexibleVersions.
VersionConditional.forVersions(messageFlexibleVersions, curVersions).
ifNotMember(__ -> {
// Outside flexible versions: emit the check for the condition "_numTaggedFields > 0".
generateCheckForUnsupportedNumTaggedFields("_numTaggedFields > 0");
}).
ifMember(flexibleVersions -> {
// The section starts with the number of tagged fields that follow.
buffer.printf("_writable.writeUnsignedVarint(_numTaggedFields);%n");
// Last known tag handled so far; -1 means none, so tag 0 counts as contiguous.
int prevTag = -1;
for (FieldSpec field : taggedFields.values()) {
// A gap before this tag: let the raw writer emit its tags first.
// NOTE(review): presumably writeRawTags writes the unknown tags below the given
// bound -- confirm in RawTaggedFieldWriter.
if (prevTag + 1 != field.tag().get()) {
buffer.printf("_rawWriter.writeRawTags(_writable, %d);%n", field.tag().get());
}
VersionConditional.
forVersions(field.taggedVersions().intersect(field.versions()), flexibleVersions).
allowMembershipCheckAlwaysFalse(false).
ifMember(presentAndTaggedVersions -> {
IsNullConditional cond = IsNullConditional.forName(field.camelCaseName()).
nullableVersions(field.nullableVersions()).
possibleVersions(presentAndTaggedVersions).
alwaysEmitBlockScope(true).
ifShouldNotBeNull(() -> {
// Fields whose default is not "null" are wrapped in a non-default-value check;
// the matching close is emitted at the end of this lambda.
if (!field.defaultString().equals("null")) {
field.generateNonDefaultValueCheck(headerGenerator,
structRegistry, buffer, "this.", Versions.NONE);
buffer.incrementIndent();
}
// Each tagged field is emitted as: tag, size, payload.
buffer.printf("_writable.writeUnsignedVarint(%d);%n", field.tag().get());
if (field.type().isString()) {
// Size = string bytes plus the bytes of the (length + 1) varint prefix written next.
buffer.printf("byte[] _stringBytes = _cache.getSerializedValue(this.%s);%n",
field.camelCaseName());
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
buffer.printf("_writable.writeUnsignedVarint(_stringBytes.length + " +
"ByteUtils.sizeOfUnsignedVarint(_stringBytes.length + 1));%n");
buffer.printf("_writable.writeUnsignedVarint(_stringBytes.length + 1);%n");
buffer.printf("_writable.writeByteArray(_stringBytes);%n");
} else if (field.type().isBytes()) {
// Same layout as strings, using the byte array field directly.
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
buffer.printf("_writable.writeUnsignedVarint(this.%s.length + " +
"ByteUtils.sizeOfUnsignedVarint(this.%s.length + 1));%n",
field.camelCaseName(), field.camelCaseName());
buffer.printf("_writable.writeUnsignedVarint(this.%s.length + 1);%n",
field.camelCaseName());
buffer.printf("_writable.writeByteArray(this.%s);%n",
field.camelCaseName());
} else if (field.type().isArray()) {
// Size comes from the serialization cache; the array body is then generated with
// Versions.NONE as the nullable versions.
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
buffer.printf("_writable.writeUnsignedVarint(_cache.getArraySizeInBytes(this.%s));%n",
field.camelCaseName());
generateVariableLengthWriter(fieldFlexibleVersions(field),
field.camelCaseName(),
field.type(),
presentAndTaggedVersions,
Versions.NONE,
field.zeroCopy());
} else if (field.type().isStruct()) {
// Nullable versions: size is the struct size + 1, followed by an extra varint 1
// written ahead of the struct; other versions write just the struct size.
VersionConditional.forVersions(field.nullableVersions(), presentAndTaggedVersions).
ifMember(___ -> {
buffer.printf("_writable.writeUnsignedVarint(this.%s.size(_cache, _version) + 1);%n",
field.camelCaseName());
buffer.printf("_writable.writeUnsignedVarint(1);%n");
}).
ifNotMember(___ -> {
buffer.printf("_writable.writeUnsignedVarint(this.%s.size(_cache, _version));%n",
field.camelCaseName());
}).
generate(buffer);
buffer.printf("%s;%n",
primitiveWriteExpression(field.type(), field.camelCaseName()));
} else if (field.type().isRecords()) {
// Records cannot be tagged fields; fail generation instead of emitting code.
throw new RuntimeException("Unsupported attempt to declare field `" +
field.name() + "` with `records` type as a tagged field.");
} else {
// Other types: size is the type's fixed length, then the value itself.
buffer.printf("_writable.writeUnsignedVarint(%d);%n",
field.type().fixedLength().get());
buffer.printf("%s;%n",
primitiveWriteExpression(field.type(), field.camelCaseName()));
}
// Close the non-default-value check opened at the top of this lambda.
if (!field.defaultString().equals("null")) {
buffer.decrementIndent();
buffer.printf("}%n");
}
});
// A null value only needs explicit encoding when the default is not "null":
// tag, size 1, then a single varint 0.
if (!field.defaultString().equals("null")) {
cond.ifNull(() -> {
buffer.printf("_writable.writeUnsignedVarint(%d);%n", field.tag().get());
buffer.printf("_writable.writeUnsignedVarint(1);%n");
buffer.printf("_writable.writeUnsignedVarint(0);%n");
});
}
cond.generate(buffer);
}).
generate(buffer);
prevTag = field.tag().get();
}
// After the last known tag, let the raw writer emit its remaining tags.
if (prevTag < Integer.MAX_VALUE) {
buffer.printf("_rawWriter.writeRawTags(_writable, Integer.MAX_VALUE);%n");
}
}).
generate(buffer);
// Close the generated write() method.
buffer.decrementIndent();
buffer.printf("}%n");
}