public void writeCache()

in driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java [44:151]


    /**
     * Copies every readable byte of {@code buf} into the shared {@code cache}
     * buffer, blocking while the cache has no room.
     * <p>
     * The whole operation runs while holding {@code lock}. When the incoming
     * data does not fit, the cache is first compacted (and, if it had grown
     * past {@code DEFAULT_MAX_BUFFER_SIZE}, possibly shrunk) or expanded up to
     * {@code maxDirectBuffer}. Whatever fits is written; if bytes remain, the
     * method waits on {@code lock} for {@code WAIT_PERIOD} and retries.
     *
     * @param buf source buffer; its readable bytes are consumed
     * @throws InterruptedException if the thread is interrupted while waiting
     *         on {@code lock}
     * @throws IOException if {@code cache} is {@code null}, either on entry or
     *         after waking up from a wait
     */
    public void writeCache(ByteBuf buf) throws InterruptedException, IOException {
        synchronized (lock) {
            while (true) {
                if (null == cache) {
                    throw new IOException("socket is closed !");
                }

                // source buffer is empty.
                if (!buf.isReadable()) {
                    break;
                }

                // When the cache is too small it must be compacted or expanded
                // automatically; otherwise the lack of space shows up as a
                // spurious I/O timeout.
                int length = buf.readableBytes();
                int deltaSize = length - cache.writableBytes();
                if (deltaSize > 0) {
                    // Priority 1: avoid frequent reallocation (expand/shrink).
                    // Priority 2: avoid frequent memory moves (compaction).
                    if (cache.readerIndex() >= deltaSize) { // compaction frees enough room
                        // Reclaim the already-read region and reset the reader/writer indices.
                        cache.discardReadBytes();
                        // If the cache was auto-expanded beyond the limit, try to give memory back.
                        int oldCapacity = cache.capacity();
                        if (oldCapacity > DEFAULT_MAX_BUFFER_SIZE) { // try to shrink
                            int newCapacity = cache.writerIndex();
                            // round up to a multiple of DEFAULT_INIT_BUFFER_SIZE
                            newCapacity = ((newCapacity - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE;
                            int quarter = (newCapacity >> 2); // keep at least a quarter free
                            // round up to a multiple of DEFAULT_INIT_BUFFER_SIZE
                            quarter = ((quarter - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE;
                            newCapacity += quarter; // add the free quarter
                            if (newCapacity < (oldCapacity >> 1)) { // only shrink if it at least halves the capacity
                                try {
                                    cache.capacity(newCapacity);
                                    logger.info("shrink cache capacity: {} - {} = {} bytes",
                                        oldCapacity,
                                        oldCapacity - newCapacity,
                                        newCapacity);
                                } catch (OutOfMemoryError ignore) {
                                    // Never go beyond the current capacity again; log and carry on.
                                    maxDirectBuffer = oldCapacity;
                                    logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
                                }
                            }
                        }
                    } else { // try to expand
                        int oldCapacity = cache.capacity();
                        if (oldCapacity < maxDirectBuffer) {
                            int quarter = (oldCapacity >> 2); // grow by at least a quarter
                            // round up to a multiple of DEFAULT_INIT_BUFFER_SIZE
                            quarter = ((quarter - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE;
                            // round the shortfall up to a multiple of that step
                            deltaSize = ((deltaSize - 1) / quarter + 1) * quarter;
                            int newCapacity = oldCapacity + deltaSize;
                            if (newCapacity > maxDirectBuffer) {
                                newCapacity = maxDirectBuffer;
                            }
                            try {
                                cache.capacity(newCapacity);
                                logger.info("expand cache capacity: {} + {} = {} bytes",
                                    oldCapacity,
                                    newCapacity - oldCapacity,
                                    newCapacity);
                            } catch (OutOfDirectMemoryError e) {
                                // Example message: failed to allocate 885571168 byte(s) of
                                // direct memory (used: 1002946176, max: 1888485376)
                                long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
                                if (maxDirectMemory < 0) {
                                    maxDirectMemory = PlatformDependent.maxDirectMemory();
                                }
                                if (maxDirectBuffer > maxDirectMemory) {
                                    // The configured ceiling exceeds the direct-memory limit:
                                    // lower the ceiling and retry once. The cast is safe because
                                    // maxDirectMemory is below an int value here.
                                    // NOTE(review): if maxDirectMemory were below oldCapacity this
                                    // retry would shrink the cache — confirm that cannot happen.
                                    maxDirectBuffer = (int) maxDirectMemory;
                                    newCapacity = maxDirectBuffer;
                                    logger.warn("resize maxDirectBuffer: {} bytes", maxDirectBuffer, e);
                                    try {
                                        cache.capacity(newCapacity);
                                        logger.info("expand cache capacity: {} + {} = {} bytes",
                                            oldCapacity,
                                            newCapacity - oldCapacity,
                                            newCapacity);
                                    } catch (OutOfMemoryError ignore) {
                                        // Never go beyond the current capacity again; log and carry on.
                                        maxDirectBuffer = oldCapacity;
                                        logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
                                    }
                                } else {
                                    // Never go beyond the current capacity again; log and carry on.
                                    maxDirectBuffer = oldCapacity;
                                    logger.warn("cache OutOfDirectMemoryError: {} bytes", newCapacity, e);
                                }
                            } catch (OutOfMemoryError ignore) {
                                // Never go beyond the current capacity again; log and carry on.
                                maxDirectBuffer = oldCapacity;
                                logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
                            }
                        }
                    }
                    // Recompute the shortfall against the (possibly resized/compacted) cache.
                    deltaSize = length - cache.writableBytes();
                }

                // deltaSize == length means the cache has zero writable bytes: nothing to copy now.
                if (deltaSize != length) {
                    // deltaSize <= 0: everything fits; deltaSize > 0: only part of it fits.
                    if (deltaSize <= 0) {
                        cache.writeBytes(buf, length);
                        break;
                    } else {
                        cache.writeBytes(buf, length - deltaSize);
                    }
                }
                // dest buffer is full.
                lock.wait(WAIT_PERIOD);
                // The monitor is released while waiting, so the cache may have been
                // cleared in the meantime (the loop head re-checks it for the same
                // reason). Fail with the documented IOException rather than an NPE.
                if (null == cache) {
                    throw new IOException("socket is closed !");
                }
                // Reclaim the already-read region and reset the reader/writer indices.
                cache.discardReadBytes();
            }
        }
    }