in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [408:537]
/**
 * Emits the source text of the generated {@code read(Readable, short)} method
 * for one struct into {@code buffer}.
 *
 * <p>The emitted method is produced in three parts, in this order:
 * <ol>
 *   <li>a version guard, built from {@code parentVersions} and
 *       {@code struct.versions()}, whose not-member clause throws
 *       {@code UnsupportedVersionException};</li>
 *   <li>one assignment per field: a read when the version is in the field's
 *       versions minus its tagged versions, otherwise the field default;</li>
 *   <li>for flexible versions, a loop over the tagged-field section that
 *       switches on the tag and hands unmatched tags to
 *       {@code readUnknownTaggedField}.</li>
 * </ol>
 *
 * @param className      name interpolated into the unsupported-version message
 * @param struct         the struct whose fields the emitted method reads
 * @param parentVersions versions of the enclosing context; intersected with
 *                       {@code struct.versions()} to bound the per-field conditionals
 * @throws RuntimeException if a field's tagged versions are not a subset of its
 *                          flexible versions, or if a field has tagged versions
 *                          but no tag
 */
private void generateClassReader(String className, StructSpec struct,
                                 Versions parentVersions) {
    headerGenerator.addImport(MessageGenerator.READABLE_CLASS);
    buffer.printf("@Override%n");
    buffer.printf("public final void read(Readable _readable, short _version) {%n");
    buffer.incrementIndent();

    // Part 1: version guard.
    ClauseGenerator rejectVersion = unused -> {
        headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
        buffer.printf("throw new UnsupportedVersionException(\"Can't read version \" + " +
            "_version + \" of %s\");%n", className);
    };
    VersionConditional.forVersions(parentVersions, struct.versions()).
        allowMembershipCheckAlwaysFalse(false).
        ifNotMember(rejectVersion).
        generate(buffer);

    // Part 2: every field is assigned, either by reading it or from its default.
    Versions structVersions = parentVersions.intersect(struct.versions());
    for (FieldSpec spec : struct.fields()) {
        Versions flexible = fieldFlexibleVersions(spec);
        Versions tagged = spec.taggedVersions();
        boolean taggedWithinFlexible = tagged.intersect(flexible).equals(tagged);
        if (!taggedWithinFlexible) {
            throw new RuntimeException("Field " + spec.name() + " specifies tagged versions " +
                tagged + " that are not a subset of the flexible versions " + flexible);
        }
        Versions mandatory = spec.versions().subtract(tagged);

        // Field is absent or tagged for this version: fall back to its default.
        ClauseGenerator assignDefault = unused -> {
            buffer.printf("this.%s = %s;%n", spec.camelCaseName(),
                spec.fieldDefault(headerGenerator, structRegistry));
        };

        // Field is present and untagged for this version: read it inline.
        ClauseGenerator readUntagged = presentVersions -> {
            if (spec.type().isStruct()) {
                generateStructReader(spec, presentVersions, false);
            } else if (spec.type().isVariableLength()) {
                ClauseGenerator readVariableLength = versions ->
                    generateVariableLengthReader(fieldFlexibleVersions(spec),
                        spec.camelCaseName(),
                        spec.type(),
                        versions,
                        spec.nullableVersions(),
                        String.format("this.%s = ", spec.camelCaseName()),
                        String.format(";%n"),
                        structRegistry.isStructArrayWithKeys(spec),
                        spec.zeroCopy());
                // When an array's element type serializes differently in flexible
                // versions, the flexible-version check is hoisted outside the array
                // reader, which may yield two separate 'for' loops in the output:
                // one for flexible versions and one for regular versions.
                boolean splitByFlexibility = spec.type().isArray() &&
                    ((FieldType.ArrayType) spec.type()).elementType().
                        serializationIsDifferentInFlexibleVersions();
                if (!splitByFlexibility) {
                    readVariableLength.generate(presentVersions);
                } else {
                    VersionConditional.forVersions(fieldFlexibleVersions(spec),
                            presentVersions).
                        ifMember(readVariableLength).
                        ifNotMember(readVariableLength).
                        generate(buffer);
                }
            } else {
                buffer.printf("this.%s = %s;%n", spec.camelCaseName(),
                    primitiveReadExpression(spec.type()));
            }
        };

        VersionConditional.forVersions(mandatory, structVersions).
            alwaysEmitBlockScope(spec.type().isVariableLength()).
            ifNotMember(assignDefault).
            ifMember(readUntagged).
            generate(buffer);
    }

    // Part 3: tagged-field section, emitted only for flexible versions.
    buffer.printf("this._unknownTaggedFields = null;%n");
    VersionConditional.forVersions(messageFlexibleVersions, structVersions).
        ifMember(flexibleVersions -> {
            buffer.printf("int _numTaggedFields = _readable.readUnsignedVarint();%n");
            buffer.printf("for (int _i = 0; _i < _numTaggedFields; _i++) {%n");
            buffer.incrementIndent();
            buffer.printf("int _tag = _readable.readUnsignedVarint();%n");
            buffer.printf("int _size = _readable.readUnsignedVarint();%n");
            buffer.printf("switch (_tag) {%n");
            buffer.incrementIndent();
            for (FieldSpec taggedSpec : struct.fields()) {
                Versions taggedVersions =
                    taggedSpec.versions().intersect(taggedSpec.taggedVersions());
                if (taggedVersions.empty()) {
                    // Never tagged in any version it exists in: no 'case' needed.
                    continue;
                }
                if (taggedSpec.tag().isEmpty()) {
                    throw new RuntimeException("Field " + taggedSpec.name() +
                        " has tagged versions, but no tag.");
                }
                buffer.printf("case %d: {%n", taggedSpec.tag().get());
                buffer.incrementIndent();
                VersionConditional.forVersions(taggedVersions, flexibleVersions).
                    ifMember(presentVersions -> {
                        if (taggedSpec.type().isStruct()) {
                            generateStructReader(taggedSpec, presentVersions, true);
                        } else if (taggedSpec.type().isVariableLength()) {
                            // Tagged fields always use the new-style flexible-versions
                            // serialization.
                            generateVariableLengthReader(fieldFlexibleVersions(taggedSpec),
                                taggedSpec.camelCaseName(),
                                taggedSpec.type(),
                                presentVersions,
                                taggedSpec.nullableVersions(),
                                String.format("this.%s = ", taggedSpec.camelCaseName()),
                                String.format(";%n"),
                                structRegistry.isStructArrayWithKeys(taggedSpec),
                                taggedSpec.zeroCopy());
                        } else {
                            buffer.printf("this.%s = %s;%n", taggedSpec.camelCaseName(),
                                primitiveReadExpression(taggedSpec.type()));
                        }
                        buffer.printf("break;%n");
                    }).
                    ifNotMember(unused -> {
                        buffer.printf("throw new RuntimeException(\"Tag %d is not valid " +
                            "for version \" + _version);%n", taggedSpec.tag().get());
                    }).
                    generate(buffer);
                buffer.decrementIndent();
                buffer.printf("}%n");
            }
            // Tags with no matching 'case' are accumulated as unknown tagged fields.
            buffer.printf("default:%n");
            buffer.incrementIndent();
            buffer.printf("this._unknownTaggedFields = " +
                "_readable.readUnknownTaggedField(this._unknownTaggedFields, _tag, _size);%n");
            buffer.printf("break;%n");
            buffer.decrementIndent();
            // Close the emitted 'switch', then the emitted 'for'.
            buffer.decrementIndent();
            buffer.printf("}%n");
            buffer.decrementIndent();
            buffer.printf("}%n");
        }).
        generate(buffer);

    // Close the emitted read() method.
    buffer.decrementIndent();
    buffer.printf("}%n");
}