in java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java [345:433]
/**
 * Reads {@code inputFile} line by line as UTF-8, groups the data lines into chunks and passes
 * each chunk to {@code submitChunk} together with the given executor.
 *
 * <p>The first line of the file is validated before any chunk is submitted: if {@code
 * schema.timeColumnIndex} is {@code -1}, or the number of fields obtained by splitting the first
 * line on {@code schema.separator} differs from {@code schema.csvColumns.size()}, an error is
 * logged, {@code cpFile} is called with the file path and {@code failedDirectoryStr}, and
 * processing of this file stops. When {@code schema.hasHeader} is set, the first line is kept as
 * the header and passed to every chunk instead of being treated as data.
 *
 * <p>Chunk size is measured as the sum of the UTF-8 byte lengths of the lines (line terminators
 * are not counted). A chunk is flushed before a line that would push it over {@code
 * CHUNK_SIZE_BYTE}. A single line that is larger than the limit is submitted as a chunk of its
 * own, exactly once. The {@code isSingleFile} flag passed to {@code submitChunk} is {@code true}
 * only if the size limit was never reached while reading the file.
 *
 * <p>An {@link IOException} while reading is logged and not rethrown.
 *
 * @param inputFile the CSV file to read
 * @param executor executor forwarded to {@code submitChunk} for each chunk
 */
private static void processFile(File inputFile, ExecutorService executor) {
  // Chunk numbers start at 1 and are only advanced on this (reading) thread.
  int nextChunkNumber = 1;
  String fileName = FilenameUtils.getBaseName(inputFile.getName());
  String fileAbsolutePath = inputFile.getAbsolutePath();
  try (BufferedReader reader =
      new BufferedReader(
          new InputStreamReader(
              Files.newInputStream(inputFile.toPath()), StandardCharsets.UTF_8))) {
    String line;
    long currentChunkSize = 0;
    boolean firstLine = true;
    List<String> lineList = new ArrayList<>();
    boolean isSingleFile = true;
    String headerLine = null;
    while ((line = reader.readLine()) != null) {
      if (firstLine) {
        firstLine = false;
        if (schema.timeColumnIndex == -1) {
          LOGGER.error(fileAbsolutePath + " not found:" + schema.timeColumn);
          cpFile(fileAbsolutePath, failedDirectoryStr);
          return;
        }
        // NOTE(review): String.split treats the separator as a regex and drops trailing empty
        // fields; confirm this matches how the rows are parsed downstream.
        String[] csvColumns = line.split(schema.separator);
        if (csvColumns.length != schema.csvColumns.size()) {
          LOGGER.error(
              "The number of columns defined in the schema file is not equal to the number of columns in the csv file("
                  + fileAbsolutePath
                  + ").");
          cpFile(fileAbsolutePath, failedDirectoryStr);
          return;
        }
        if (schema.hasHeader) {
          // The header is not data; it is handed to every chunk separately.
          headerLine = line;
          continue;
        }
      }

      long lineSize = line.getBytes(StandardCharsets.UTF_8).length;
      if (currentChunkSize + lineSize > CHUNK_SIZE_BYTE) {
        isSingleFile = false;
        if (lineList.isEmpty()) {
          // The line alone exceeds the limit: submit it as its own chunk and move on.
          // Without the 'continue' the same line would be added again to the next chunk.
          lineList.add(line);
          submitChunk(
              headerLine,
              lineList,
              nextChunkNumber++,
              executor,
              fileName,
              isSingleFile,
              fileAbsolutePath);
          lineList = new ArrayList<>();
          currentChunkSize = 0;
          continue;
        }
        // Flush what has been collected so far; the current line starts the next chunk.
        submitChunk(
            headerLine,
            lineList,
            nextChunkNumber++,
            executor,
            fileName,
            isSingleFile,
            fileAbsolutePath);
        // A fresh list is required because the submitted one is now owned by the chunk task.
        lineList = new ArrayList<>();
        currentChunkSize = 0;
      }
      lineList.add(line);
      currentChunkSize += lineSize;
    }
    // Submit the trailing, partially filled chunk (if any).
    if (!lineList.isEmpty()) {
      submitChunk(
          headerLine,
          lineList,
          nextChunkNumber++,
          executor,
          fileName,
          isSingleFile,
          fileAbsolutePath);
    }
  } catch (IOException e) {
    LOGGER.error("Error reading file", e);
  }
}