in driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java [44:151]
public void writeCache(ByteBuf buf) throws InterruptedException, IOException {
synchronized (lock) {
while (true) {
if (null == cache) {
throw new IOException("socket is closed !");
}
// source buffer is empty.
if (!buf.isReadable()) {
break;
}
// 默认缓存大小不够用时需自动清理或扩充,否则将因缓存空间不足而造成I/O超时假象
int length = buf.readableBytes();
int deltaSize = length - cache.writableBytes();
if (deltaSize > 0) {
// 首先避免频繁分配内存(扩容/收缩),其次避免频繁移动内存(清理)
if (cache.readerIndex() >= deltaSize) { // 可以清理
// 回收已读空间,重置读写指针
cache.discardReadBytes();
// 恢复自动扩充的过大缓存到默认初始缓存大小,释放空间
int oldCapacity = cache.capacity();
if (oldCapacity > DEFAULT_MAX_BUFFER_SIZE) { // 尝试收缩
int newCapacity = cache.writerIndex();
newCapacity = ((newCapacity - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE; // 对齐
int quarter = (newCapacity >> 2); // 至少留空四分之一
quarter = ((quarter - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE; // 对齐
newCapacity += quarter; // 留空四分之一
if (newCapacity < (oldCapacity >> 1)) { // 至少收缩二分之一
try {
cache.capacity(newCapacity);
logger.info("shrink cache capacity: {} - {} = {} bytes",
oldCapacity,
oldCapacity - newCapacity,
newCapacity);
} catch (OutOfMemoryError ignore) {
maxDirectBuffer = oldCapacity; // 未来不再超过当前容量,记录日志后继续
logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
}
}
}
} else { // 尝试扩容
int oldCapacity = cache.capacity();
if (oldCapacity < maxDirectBuffer) {
int quarter = (oldCapacity >> 2); // 至少扩容四分之一
quarter = ((quarter - 1) / DEFAULT_INIT_BUFFER_SIZE + 1) * DEFAULT_INIT_BUFFER_SIZE; // 对齐
deltaSize = ((deltaSize - 1) / quarter + 1) * quarter; // 对齐
int newCapacity = oldCapacity + deltaSize;
if (newCapacity > maxDirectBuffer) {
newCapacity = maxDirectBuffer;
}
try {
cache.capacity(newCapacity);
logger.info("expand cache capacity: {} + {} = {} bytes",
oldCapacity,
newCapacity - oldCapacity,
newCapacity);
} catch (OutOfDirectMemoryError e) {
// failed to allocate 885571168 byte(s) of
// direct memory (used: 1002946176, max:
// 1888485376)
long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
if (maxDirectMemory < 0) {
maxDirectMemory = PlatformDependent.maxDirectMemory();
}
if (maxDirectBuffer > maxDirectMemory) {
maxDirectBuffer = (int) maxDirectMemory;
newCapacity = maxDirectBuffer;
logger.warn("resize maxDirectBuffer: {} bytes", maxDirectBuffer, e);
try {
cache.capacity(newCapacity);
logger.info("expand cache capacity: {} + {} = {} bytes",
oldCapacity,
newCapacity - oldCapacity,
newCapacity);
} catch (OutOfMemoryError ignore) {
maxDirectBuffer = oldCapacity; // 未来不再超过当前容量,记录日志后继续
logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
}
} else {
maxDirectBuffer = oldCapacity; // 未来不再超过当前容量,记录日志后继续
logger.warn("cache OutOfDirectMemoryError: {} bytes", newCapacity, e);
}
} catch (OutOfMemoryError ignore) {
maxDirectBuffer = oldCapacity; // 未来不再超过当前容量,记录日志后继续
logger.warn("cache OutOfMemoryError: {} bytes", newCapacity, ignore);
}
}
}
deltaSize = length - cache.writableBytes();
}
if (deltaSize != length) {
// deltaSize <= 0 可全部写入,deltaSize > 0 只能部分写入
if (deltaSize <= 0) {
cache.writeBytes(buf, length);
break;
} else {
cache.writeBytes(buf, length - deltaSize);
}
}
// dest buffer is full.
lock.wait(WAIT_PERIOD);
// 回收已读空间,重置读写指针
cache.discardReadBytes();
}
}
}