in src/main/java/com/aliyun/openservices/aliyun/log/producer/LogProducer.java [90:153]
public LogProducer(ProducerConfig producerConfig) {
  // Builds every collaborator of this producer and, as the very last step, starts the
  // background threads (mover and the two batch handlers).

  // Fail fast with a descriptive message. Without this check a null config would still raise a
  // NullPointerException, but only after an instance id had already been consumed.
  if (producerConfig == null) {
    throw new NullPointerException("producerConfig must not be null");
  }

  // Identity of this producer instance; name and hash are both derived from the instance id.
  this.instanceId = INSTANCE_ID_GENERATOR.getAndIncrement();
  this.name = LOG_PRODUCER_PREFIX + this.instanceId;
  this.producerHash = Utils.generateProducerHash(this.instanceId);
  this.producerConfig = producerConfig;

  // Semaphore with one permit per configured byte (getTotalSizeInBytes()); it is handed to the
  // accumulator and to both batch handlers below.
  this.memoryController = new Semaphore(producerConfig.getTotalSizeInBytes());
  this.retryQueue = new RetryQueue();

  // Result queues are local: they are only shared between the accumulator, the mover and the
  // corresponding batch handler, never exposed through a field.
  BlockingQueue<ProducerBatch> successQueue = new LinkedBlockingQueue<ProducerBatch>();
  BlockingQueue<ProducerBatch> failureQueue = new LinkedBlockingQueue<ProducerBatch>();

  this.ioThreadPool = new IOThreadPool(producerConfig.getIoThreadCount(), this.name);

  // Fixed-size pool (core == max == ioThreadCount) of daemon threads on an unbounded queue,
  // used by the timeout-aware service client.
  this.timeoutThreadPool =
      new ThreadPoolExecutor(
          producerConfig.getIoThreadCount(),
          producerConfig.getIoThreadCount(),
          0L,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>(),
          new ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat(this.name + TIMEOUT_THREAD_SUFFIX_FORMAT)
              .build());

  this.clientConfiguration = new ClientConfiguration();
  this.clientConfiguration.setRegion(producerConfig.getRegion());
  this.clientConfiguration.setSignatureVersion(producerConfig.getSignVersion());
  this.serviceClient = new TimeoutServiceClient(this.clientConfiguration, this.timeoutThreadPool);

  // NOTE(review): clientPool and batchCount are not assigned in this constructor; presumably
  // they are initialized at their field declarations -- confirm.
  this.accumulator =
      new LogAccumulator(
          this.producerHash,
          producerConfig,
          this.clientPool,
          this.memoryController,
          this.retryQueue,
          successQueue,
          failureQueue,
          this.ioThreadPool,
          this.batchCount);
  this.mover =
      new Mover(
          this.name + MOVER_SUFFIX,
          producerConfig,
          this.clientPool,
          this.accumulator,
          this.retryQueue,
          successQueue,
          failureQueue,
          this.ioThreadPool,
          this.batchCount);
  this.successBatchHandler =
      new BatchHandler(
          this.name + SUCCESS_BATCH_HANDLER_SUFFIX,
          successQueue,
          this.batchCount,
          this.memoryController);
  this.failureBatchHandler =
      new BatchHandler(
          this.name + FAILURE_BATCH_HANDLER_SUFFIX,
          failureQueue,
          this.batchCount,
          this.memoryController);

  // Finish all remaining initialization BEFORE starting any thread: if this constructor call
  // throws, nothing is running yet and no orphaned background thread is left behind.
  this.adjuster = new ShardHashAdjuster(producerConfig.getBuckets());

  // Start the background workers only once the instance is fully constructed.
  this.mover.start();
  this.successBatchHandler.start();
  this.failureBatchHandler.start();
}