private int drainOneBatch()

in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [279:330]


  /**
   * Takes up to {@code batchSize} events from the given channel, writes each
   * one through the {@link HiveWriter} associated with its end point, updates
   * the sink counters and finally flushes every writer used in this batch.
   *
   * @param channel channel to take events from; a {@code null} event returned
   *                by {@code take()} ends the batch early
   * @return the number of events taken from the channel and written
   * @throws HiveWriter.Failure if handling the batch fails; before the
   *         exception is rethrown it is logged and all writers are aborted
   *         and closed
   * @throws InterruptedException declared by this method and not handled
   *         here, so it propagates to the caller
   */
  private int drainOneBatch(Channel channel)
          throws HiveWriter.Failure, InterruptedException {
    int drained = 0;
    try {
      // Writers touched while processing this batch, keyed by end point.
      final Map<HiveEndPoint,HiveWriter> batchWriters = Maps.newHashMap();

      while (drained < batchSize) {
        // Pull the next event; null means the channel has nothing more to give.
        final Event next = channel.take();
        if (next == null) {
          break;
        }

        // Resolve the end point for this event from its headers.
        final HiveEndPoint target = makeEndPoint(metaStoreUri, database, table,
                partitionVals, next.getHeaders(), timeZone,
                needRounding, roundUnit, roundValue, useLocalTime);

        // Look up (or lazily create) the writer for that end point.
        final HiveWriter targetWriter = getOrCreateWriter(batchWriters, target);

        LOG.debug("{} : Writing event to {}", getName(), target);
        targetWriter.write(next);

        // Count the event only once it has been handed to the writer.
        drained++;
      }

      // Classify the batch for metrics: empty, partial (underflow) or full.
      if (drained == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (drained != batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }
      sinkCounter.addToEventDrainAttemptCount(drained);

      // Flush every writer used by this batch.
      for (HiveWriter used : batchWriters.values()) {
        used.flush(true);
      }

      // Success is recorded only after all flushes completed without failure.
      sinkCounter.addToEventDrainSuccessCount(drained);
      return drained;
    } catch (HiveWriter.Failure failure) {
      // On error, abort and close every writer so the next attempt starts clean.
      LOG.warn(getName() + " : " + failure.getMessage(), failure);
      abortAllWriters();
      closeAllWriters();
      throw failure;
    }
  }