in src/main/java/com/amazonaws/mskdatagen/producer/GenerateSourceRecord.java [28:58]
/**
 * Advances the generator and turns its current result into source records.
 *
 * <p>An {@code ApplyGenerator} is run against {@code context} through
 * {@code advanceUntilSuccess()}, and the generated configuration is then read back from the
 * context. When that configuration reports success, one {@code SourceRecord} is built from its
 * topic, key results and value results. Source partition and source offset are empty maps and
 * the Kafka partition is left {@code null}.
 *
 * @return a mutable list holding the single record that was built, or an immutable empty list
 *         when the generated state equals {@code "drained"}
 * @throws IllegalStateException if generation did not succeed and the state is anything other
 *         than {@code "drained"}
 */
public List<SourceRecord> generateSourceRecord() {
    new ApplyGenerator(context).advanceUntilSuccess();
    GeneratedConfig generated = context.getGenerated();

    // Deal with the non-success outcomes first so the record-building path stays flat.
    if (!generated.isSuccess()) {
        if ("drained".equals(generated.getState())) {
            log.debug("Generation result is empty.");
            return Collections.emptyList();
        }
        log.error("State machine returned an unusable status: {}", generated.getState());
        throw new IllegalStateException("State machine returned an unusable status: " + generated.getState());
    }

    String topic = generated.getTopic();

    // Each builder yields a (schema, payload) pair; the key is resolved before the value.
    Map<List<String>, String> keyResults = generated.getKeyResults();
    Pair<Schema, Object> keyPair = SchemaValueResultBuilder.create(keyResults).build();
    Map<List<String>, String> valueResults = generated.getValResults();
    Pair<Schema, Object> valuePair = SchemaValueResultBuilder.create(valueResults).build();

    Schema keySchema = keyPair.getKey();
    Object keyPayload = keyPair.getValue();
    Schema valueSchema = valuePair.getKey();
    Object valuePayload = valuePair.getValue();

    SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), topic, null,
            keySchema, keyPayload, valueSchema, valuePayload);

    List<SourceRecord> records = new ArrayList<>();
    records.add(record);
    return records;
}