in holo-client/src/main/java/com/alibaba/hologres/client/HoloClient.java [659:735]
/**
 * Starts an import by splitting a shard range across several {@link CopyAction}s
 * (created with {@code CopyAction.Mode.IN}) and submitting each one to the pool.
 *
 * <p>The shard range is {@code [startShard, endShard)}. A start/end shard id of
 * {@code -1} on the importer means "use the default": {@code 0} for the start and
 * the table's shard count for the end. The range is cut into {@code threadSize}
 * contiguous slices; when it does not divide evenly, the leading slices each take
 * one extra shard.
 *
 * <p>The effective thread count is the importer's thread size, capped first by
 * {@code config.writeThreadSize - 1} (minimum 1) and then by the number of shards
 * in the range.
 *
 * <p>If the importer supplies no {@link InputStream}, one connected
 * {@link PipedInputStream}/{@link PipedOutputStream} pair is created per slice and
 * the output ends are handed back through the returned {@link ImportContext}.
 * If the importer does supply a stream, it is given to the first slice only and
 * the output-stream array passed to the context contains only {@code null}s.
 *
 * @param importer describes the target schema, shard range, thread count, buffer
 *                 size and (optionally) the input stream to read from
 * @return a context holding the slice-start-shard to worker-index map, each
 *         action's result future and copy context, the output streams and the
 *         table's shard count
 * @throws HoloClientException if the resolved shard range is empty, the thread
 *         count is not positive, the calling thread is interrupted while waiting
 *         for an action to become ready, or an action fails before it is ready
 */
public ImportContext importData(Importer importer) throws HoloClientException {
	ensurePoolOpen();
	tryThrowException();

	int threadSize = importer.getThreadSize();
	int maxThread = this.config.writeThreadSize > 1 ? this.config.writeThreadSize - 1 : 1;
	if (threadSize > maxThread) {
		LOGGER.warn("Thread size is larger than max write thread size of holo client, will be using {}", maxThread);
		threadSize = maxThread;
	}

	int shardCount = Command.getShardCount(this, importer.getSchema());
	int startShard = importer.getStartShardId() == -1 ? 0 : importer.getStartShardId();
	int endShard = importer.getEndShardId() == -1 ? shardCount : importer.getEndShardId();
	int shardRange = endShard - startShard;

	// An empty or inverted range would otherwise drive threadSize to <= 0 below and
	// surface as NegativeArraySizeException / ArithmeticException (division by zero).
	if (shardRange < 1) {
		String msg = String.format("invalid shard range [%d, %d) for import, shard count is %d",
				startShard, endShard, shardCount);
		throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, msg, new IllegalArgumentException(msg));
	}
	if (threadSize > shardRange) {
		threadSize = shardRange;
		LOGGER.warn("Thread size is larger than shard count, will be using thread size {}", threadSize);
	}
	// A non-positive thread size coming from the importer would hit the same failures.
	if (threadSize < 1) {
		String msg = String.format("thread size for import must be positive, but was %d", threadSize);
		throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, msg, new IllegalArgumentException(msg));
	}

	InputStream is = importer.getInputStream();
	InputStream[] istreams = new InputStream[threadSize];
	OutputStream[] ostreams = new OutputStream[threadSize];
	if (is == null) {
		// No caller-provided source: build one pipe per worker so the caller can
		// write into ostreams[t] while the matching action reads from istreams[t].
		int pipeSize = importer.getBufferSize() > 0 ? importer.getBufferSize() : 1024;
		for (int t = 0; t < threadSize; t++) {
			PipedInputStream istream = new PipedInputStream(pipeSize);
			PipedOutputStream ostream = new PipedOutputStream();
			istreams[t] = istream;
			ostreams[t] = ostream;
			try {
				ostream.connect(istream);
			} catch (IOException e) {
				throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "should not happen", e);
			}
		}
	} else {
		// NOTE(review): only slot 0 receives the caller's stream; if threadSize > 1 the
		// remaining actions are created with a null input stream. Presumably Importer
		// guarantees a single thread in this mode - confirm against Importer's builder.
		istreams[0] = is;
	}

	int shardSize = shardRange / threadSize;
	int remain = shardRange % threadSize;
	CopyAction[] actions = new CopyAction[threadSize];
	NavigableMap<Integer, Integer> shardMap = new TreeMap<>();
	for (int t = 0; t < threadSize; t++) {
		// The first `remain` slices absorb the leftover shards, one each.
		int end = startShard + shardSize;
		if (remain > 0) {
			end++;
			remain--;
		}
		CopyAction action = new CopyAction(importer.getSchema(), null, istreams[t], startShard, end, CopyAction.Mode.IN);
		action.setBufferSize(importer.getBufferSize());
		shardMap.put(startShard, t);
		startShard = end;
		actions[t] = action;
		// Retry until the pool reports that it accepted the action.
		while (!pool.submit(action)) {
		}
	}

	try {
		CopyContext[] copyContexts = new CopyContext[threadSize];
		// Generic array creation is unavoidable here; every slot is filled below
		// with a CompletableFuture<Long> obtained from the matching action.
		@SuppressWarnings("unchecked")
		CompletableFuture<Long>[] futures = new CompletableFuture[threadSize];
		for (int t = 0; t < threadSize; t++) {
			// Blocks until the action signals it is ready to start (or fails).
			copyContexts[t] = actions[t].getReadyToStart().get();
			futures[t] = actions[t].getFuture();
		}
		return new ImportContext(shardMap, futures, copyContexts, ostreams, shardCount);
	} catch (InterruptedException e) {
		// Restore the interrupt flag so callers further up the stack can observe it.
		Thread.currentThread().interrupt();
		throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "interrupt", e);
	} catch (ExecutionException e) {
		// NOTE(review): actions already submitted are not cancelled and the piped
		// streams are not closed on this path - verify whether cleanup is needed.
		Throwable cause = e.getCause();
		if (cause instanceof HoloClientException) {
			throw (HoloClientException) cause;
		} else {
			throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "", cause);
		}
	}
}