in ui/cypress/support/utils/ProcessingElementTestUtils.ts [30:104]
/**
 * Runs an end-to-end check for a single processing element.
 *
 * Steps, in order:
 *  1. Uploads the test's input fixture via `FileManagementUtils.addFile`.
 *  2. Creates and starts a `File_Stream` adapter that reads that file
 *     (format is CSV when the input file name ends with `.csv`, JSON otherwise).
 *  3. Creates a pipeline: adapter source -> processor under test -> `data_lake` sink.
 *  4. Navigates to Connect, clicks the start-adapter button and, after a fixed
 *     wait, compares the data lake content with the `expected.csv` fixture.
 *
 * The lower-cased test name is used both as the adapter name and as the
 * data lake measurement name.
 *
 * @param pipelineElementTest Test description: fixture directory (`dir`),
 *   input file name (`inputFile`), test name (`name`) and the processor
 *   configuration (`processor`).
 */
public static testElement(pipelineElementTest: ProcessorTest) {
    const fixtureDir = `pipelineElement/${pipelineElementTest.dir}`;
    const inputFile = `${fixtureDir}/${pipelineElementTest.inputFile}`;
    const expectedResultFile = `${fixtureDir}/expected.csv`;

    // Derive the format once as an expression instead of assigning inside a
    // ternary used as a statement; the value can only be 'csv' or 'json'.
    const formatType = pipelineElementTest.inputFile.endsWith('.csv')
        ? 'csv'
        : 'json';

    FileManagementUtils.addFile(inputFile);

    const dataLakeIndex = pipelineElementTest.name.toLowerCase();
    const adapterName = pipelineElementTest.name.toLowerCase();

    // Build adapter
    const adapterInputBuilder = AdapterBuilder.create('File_Stream')
        .setName(adapterName)
        .setTimestampProperty('timestamp')
        .setFormat(formatType)
        .setStartAdapter(false)
        .addProtocolInput(
            'radio',
            'speed',
            'fastest_\\(ignore_original_time\\)',
        )
        .addProtocolInput('radio', 'replayonce', 'yes');

    if (formatType === 'csv') {
        // CSV fixtures: ';' as delimiter and the header checkbox checked.
        adapterInputBuilder
            .addFormatInput('input', ConnectBtns.csvDelimiter(), ';')
            .addFormatInput('checkbox', ConnectBtns.csvHeader(), 'check');
    } else {
        // JSON fixtures: select the 'json_options-array' radio option.
        adapterInputBuilder.addFormatInput(
            'radio',
            'json_options-array',
            '',
        );
    }

    const adapterInput = adapterInputBuilder.build();
    ConnectUtils.addAdapter(adapterInput);
    ConnectUtils.startAdapter(adapterInput);

    // Build Pipeline
    const dataLakeSink = PipelineElementBuilder.create('data_lake')
        .addInput('input', 'db_measurement', dataLakeIndex)
        .build();
    const pipelineInput = PipelineBuilder.create(pipelineElementTest.name)
        .addSource(adapterName)
        .addSourceType('set')
        .addProcessingElement(pipelineElementTest.processor)
        .addSink(dataLakeSink)
        .build();
    PipelineUtils.addPipeline(pipelineInput);

    // Start the adapter from the Connect view now that the pipeline exists.
    ConnectUtils.goToConnect();
    ConnectBtns.startAdapter().click();

    // NOTE(review): fixed 3 s delay before checking results, presumably to
    // let the pipeline process the replayed file. Confirm whether a
    // retry-able assertion could replace it.
    cy.wait(3000);

    DataLakeUtils.checkResults(
        dataLakeIndex,
        `cypress/fixtures/${expectedResultFile}`,
        pipelineElementTest.processor.ignoreTimestamp,
    );
}