private void flush(boolean flushAll)

in odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/impl/UpsertStreamImpl.java [285:377]


  /**
   * Sends buffered record packs to the server and blocks until every request dispatched in the
   * current round has completed, repeating the round while the listener asks for a retry.
   *
   * <p>A non-empty pack is sent when {@code flushAll} is true or when its total byte count is
   * greater than {@code slotBufferSize}; every other pack is skipped for this round. After a
   * round, each handler is inspected: a handler without an exception reduces
   * {@code totalBufferSize} by its flushed size (only when {@code flushAll} is false); a handler
   * with an exception is reported to {@code listener.onFlushFail}. If there is no listener, or
   * the listener returns false, the failure is rethrown as a {@link TunnelException} carrying the
   * original request id and error code.
   *
   * @param flushAll when true, every non-empty pack is sent regardless of size and
   *                 {@code totalBufferSize} is reset to 0 once the flush succeeds
   * @throws TunnelException if the session reports a different number of buckets than the cached
   *                         map, if the waiting thread is interrupted, or if a request fails and
   *                         is not retried
   * @throws IOException     propagated from the calls that prepare and build the request
   */
  private void flush(boolean flushAll) throws TunnelException, IOException {
    List<FlushResultHandler> handlers = new ArrayList<>();
    boolean success;
    int retry = 0;

    // update slot map
    Map<Integer, Slot> bucketMap = session.getBuckets();
    if (bucketMap.size() != buckets.size()) {
      throw new TunnelException("session slot map is changed");
    }
    buckets = bucketMap;

    do {
      success = true;
      handlers.clear();
      try {
        checkStatus();
        // One count per buffered pack. Keep a local reference so that callbacks created in this
        // round always count down this round's latch, even after the field is reassigned by a
        // later retry round.
        final CountDownLatch roundLatch = new CountDownLatch(bucketBuffer.size());
        latch = roundLatch;
        for (Map.Entry<Integer, ProtobufRecordPack> entry : bucketBuffer.entrySet()) {
          ProtobufRecordPack pack = entry.getValue();
          boolean shouldSend =
              pack.getSize() > 0 && (pack.getTotalBytes() > slotBufferSize || flushAll);
          if (!shouldSend) {
            // Nothing is dispatched for this pack, so account for it immediately.
            roundLatch.countDown();
            continue;
          }

          int bucketId = entry.getKey();
          long bytes = pack.getTotalBytes();
          pack.checkTransConsistency(false);
          pack.complete();
          // Only the growth in byte count caused by complete() is added to the running total.
          bytes = pack.getTotalBytes() - bytes;
          if (!flushAll) {
            totalBufferSize += bytes;
          }
          Request request = session.buildRequest("PUT", bucketId, buckets.get(bucketId), pack.getTotalBytes(), pack.getSize(), compressOption);
          // NOTE(review): if anything below throws before the write is issued, this channel is
          // never handed back to the pool — confirm whether channelPool needs an explicit release.
          Channel channel = channelPool.acquire();
          FlushResultHandler handler = new FlushResultHandler(pack, roundLatch, listener, retry, bucketId);
          channel.pipeline().addLast(handler);
          handlers.add(handler);
          ChannelFuture
              channelFuture =
              channel.writeAndFlush(buildFullHttpRequest(request, pack.getProtobufStream()));
          channelFuture.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
              // Record the failure BEFORE releasing the waiter: once the latch reaches zero the
              // flushing thread inspects handler.getException(), so it must already be set.
              handler.setException(
                  new TunnelException("Connect : " + future.cause().getMessage(),
                                      future.cause()));
              roundLatch.countDown();
              channelPool.release(future.channel());
              future.channel().close();
            } else {
              // The request is on the wire; bound the wait for the response.
              future.channel().pipeline().addFirst(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
            }
          });
        }
        roundLatch.await();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new TunnelException("flush interrupted", e);
      }

      for (FlushResultHandler handler : handlers) {
        if (handler.getException() == null) {
          if (!flushAll) {
            totalBufferSize -= handler.getFlushResult().flushSize;
          }
          continue;
        }

        success = false;
        if (listener != null && listener.onFlushFail(handler.getException(), retry)) {
          // The listener asked for another attempt; keep scanning the remaining handlers.
          continue;
        }
        if (listener != null) {
          // Only a listener-rejected failure moves the stream into the error state.
          status = Status.ERROR;
        }
        TunnelException e = new TunnelException(handler.getException().getErrorMsg(), handler.getException());
        e.setRequestId(handler.getException().getRequestId());
        e.setErrorCode(handler.getException().getErrorCode());
        throw e;
      }
      ++retry;
    } while (!success);
    if (flushAll) {
      totalBufferSize = 0;
    }
  }