in adb3client/src/main/java/com/alibaba/cloud/analyticdb/adb3client/impl/ExecutionPool.java [304:344]
public boolean submit(AbstractAction action) throws AdbClientException {
if (!started.get()) {
throw new AdbClientException(ExceptionCode.ALREADY_CLOSE, "submit fail");
}
Semaphore semaphore = null;
int start = -1;
int end = -1;
if (action instanceof PutAction) {
semaphore = writeSemaphore;
start = 0;
end = Math.min(writeThreadSize, workers.length);
} else {
start = 0;
end = workers.length;
}
//如果有信号量,尝试获取信号量,否则返回submit失败
if (semaphore != null) {
try {
boolean acquire = semaphore.tryAcquire(2000L, TimeUnit.MILLISECONDS);
if (!acquire) {
return false;
}
} catch (InterruptedException e) {
throw new AdbClientException(ExceptionCode.INTERRUPTED, "");
}
action.setSemaphore(semaphore);
}
//尝试提交
for (int i = start; i < end; ++i) {
Worker worker = workers[i];
if (worker.offer(action)) {
return true;
}
}
//提交失败则释放,提交成功Worker会负责释放
if (semaphore != null) {
semaphore.release();
}
return false;
}