in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/SinkOfFlume.java [54:111]
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = null;
Event event = null;
try {
transaction = channel.getTransaction();
transaction.begin();
long processedEvents = 0;
for (; processedEvents < batchSize; processedEvents += 1) {
event = channel.take();
if (event == null) {
// no events available in the channel
break;
}
if (processedEvents == 0) {
result = Status.BACKOFF;
counter.incrementBatchEmptyCount();
} else if (processedEvents < batchSize) {
counter.incrementBatchUnderflowCount();
} else {
counter.incrementBatchCompleteCount();
}
event.getHeaders();
event.getBody();
Map<String, Object> m = new HashMap();
m.put("headers", event.getHeaders());
m.put("body", event.getBody());
records.put(m);
}
transaction.commit();
} catch (Exception ex) {
String errorMsg = "Failed to publish events";
LOG.error("Failed to publish events", ex);
counter.incrementEventWriteOrChannelFail(ex);
result = Status.BACKOFF;
if (transaction != null) {
try {
// If the transaction wasn't committed before we got the exception, we
// need to rollback.
transaction.rollback();
} catch (RuntimeException e) {
LOG.error("Transaction rollback failed: " + e.getLocalizedMessage());
LOG.debug("Exception follows.", e);
} finally {
transaction.close();
transaction = null;
}
}
} finally {
if (transaction != null) {
transaction.close();
}
}
return result;
}