public LogProducer()

in src/main/java/com/aliyun/openservices/aliyun/log/producer/LogProducer.java [90:153]


  public LogProducer(ProducerConfig producerConfig) {
    this.instanceId = INSTANCE_ID_GENERATOR.getAndIncrement();
    this.name = LOG_PRODUCER_PREFIX + this.instanceId;
    this.producerHash = Utils.generateProducerHash(this.instanceId);
    this.producerConfig = producerConfig;
    this.memoryController = new Semaphore(producerConfig.getTotalSizeInBytes());
    this.retryQueue = new RetryQueue();
    BlockingQueue<ProducerBatch> successQueue = new LinkedBlockingQueue<ProducerBatch>();
    BlockingQueue<ProducerBatch> failureQueue = new LinkedBlockingQueue<ProducerBatch>();
    this.ioThreadPool = new IOThreadPool(producerConfig.getIoThreadCount(), this.name);
    this.timeoutThreadPool =
        new ThreadPoolExecutor(
            producerConfig.getIoThreadCount(),
            producerConfig.getIoThreadCount(),
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(this.name + TIMEOUT_THREAD_SUFFIX_FORMAT)
                .build());
    this.clientConfiguration = new ClientConfiguration();
    clientConfiguration.setRegion(producerConfig.getRegion());
    clientConfiguration.setSignatureVersion(producerConfig.getSignVersion());
    this.serviceClient = new TimeoutServiceClient(clientConfiguration, this.timeoutThreadPool);
    this.accumulator =
        new LogAccumulator(
            this.producerHash,
            producerConfig,
            this.clientPool,
            this.memoryController,
            this.retryQueue,
            successQueue,
            failureQueue,
            this.ioThreadPool,
            this.batchCount);
    this.mover =
        new Mover(
            this.name + MOVER_SUFFIX,
            producerConfig,
            this.clientPool,
            this.accumulator,
            this.retryQueue,
            successQueue,
            failureQueue,
            this.ioThreadPool,
            this.batchCount);
    this.successBatchHandler =
        new BatchHandler(
            this.name + SUCCESS_BATCH_HANDLER_SUFFIX,
            successQueue,
            this.batchCount,
            this.memoryController);
    this.failureBatchHandler =
        new BatchHandler(
            this.name + FAILURE_BATCH_HANDLER_SUFFIX,
            failureQueue,
            this.batchCount,
            this.memoryController);
    this.mover.start();
    this.successBatchHandler.start();
    this.failureBatchHandler.start();
    this.adjuster = new ShardHashAdjuster(producerConfig.getBuckets());
  }