public PersistableEvent fetchAndDeleteEvent()

in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java [774:1053]


  public PersistableEvent fetchAndDeleteEvent(String channel,
      Connection connection) {
    PersistableEvent.Builder peBuilder = null;
    PreparedStatement baseEventFetchStmt = null;
    PreparedStatement spillEventFetchStmt = null;
    InputStream payloadInputStream = null;
    PreparedStatement baseHeaderFetchStmt = null;
    PreparedStatement nameSpillHeaderStmt = null;
    PreparedStatement valueSpillHeaderStmt = null;
    PreparedStatement deleteSpillEventStmt = null;
    PreparedStatement deleteNameSpillHeaderStmt = null;
    PreparedStatement deleteValueSpillHeaderStmt = null;
    PreparedStatement deleteBaseHeaderStmt = null;
    PreparedStatement deleteBaseEventStmt = null;
    try {
      baseEventFetchStmt = connection.prepareStatement(STMT_FETCH_PAYLOAD_BASE);
      baseEventFetchStmt.setString(1, channel);
      ResultSet rsetBaseEvent = baseEventFetchStmt.executeQuery();

      if (!rsetBaseEvent.next()) {
        // Empty result set
        LOGGER.debug("No events found for channel: " + channel);
        return null;
      }

      // Populate event id, payload
      long eventId = rsetBaseEvent.getLong(1);
      peBuilder = new PersistableEvent.Builder(channel, eventId);
      peBuilder.setBasePayload(rsetBaseEvent.getBytes(2));
      boolean hasSpill = rsetBaseEvent.getBoolean(3);

      if (hasSpill) {
        spillEventFetchStmt =
            connection.prepareStatement(STMT_FETCH_PAYLOAD_SPILL);

        spillEventFetchStmt.setLong(1, eventId);
        ResultSet rsetSpillEvent = spillEventFetchStmt.executeQuery();
        if (!rsetSpillEvent.next()) {
          throw new JdbcChannelException("Payload spill expected but not "
              + "found for event: " + eventId);
        }
        Blob payloadSpillBlob = rsetSpillEvent.getBlob(1);
        payloadInputStream = payloadSpillBlob.getBinaryStream();
        ByteArrayOutputStream spillStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int length = 0;
        while ((length = payloadInputStream.read(buffer)) != -1) {
          spillStream.write(buffer, 0, length);
        }
        peBuilder.setSpillPayload(spillStream.toByteArray());

        // Delete this spill
        deleteSpillEventStmt =
            connection.prepareStatement(STMT_DELETE_EVENT_SPILL);
        deleteSpillEventStmt.setLong(1, eventId);

        int updateCount = deleteSpillEventStmt.executeUpdate();
        if (updateCount != 1) {
          throw new JdbcChannelException("Unexpected row count for spill "
              + "delete: " + updateCount);
        }
      }

      if (rsetBaseEvent.next()) {
        throw new JdbcChannelException("More than expected events retrieved");
      }

      // Populate headers
      List<Long> nameSpillHeaders = null;
      List<Long> valueSpillHeaders = null;
      baseHeaderFetchStmt = connection.prepareStatement(STMT_FETCH_HEADER_BASE);
      baseHeaderFetchStmt.setLong(1, eventId);
      int headerCount = 0; // for later delete validation

      ResultSet rsetBaseHeader = baseHeaderFetchStmt.executeQuery();
      while (rsetBaseHeader.next()) {
        headerCount++;
        long headerId = rsetBaseHeader.getLong(1);
        String baseName = rsetBaseHeader.getString(2);
        String baseValue = rsetBaseHeader.getString(3);
        boolean hasNameSpill = rsetBaseHeader.getBoolean(4);
        boolean hasValueSpill = rsetBaseHeader.getBoolean(5);

        peBuilder.setHeader(headerId, baseName, baseValue);
        if (hasNameSpill) {
          if (nameSpillHeaders == null) {
            nameSpillHeaders = new ArrayList<Long>();
          }
          nameSpillHeaders.add(headerId);
        }

        if (hasValueSpill) {
          if (valueSpillHeaders == null) {
            valueSpillHeaders = new ArrayList<Long>();
          }
          valueSpillHeaders.add(headerId);
        }
      }

      if (nameSpillHeaders != null) {

        nameSpillHeaderStmt =
            connection.prepareStatement(STMT_FETCH_HEADER_NAME_SPILL);

        deleteNameSpillHeaderStmt =
            connection.prepareStatement(STMT_DELETE_HEADER_NAME_SPILL);
        for (long headerId : nameSpillHeaders) {
          nameSpillHeaderStmt.setLong(1, headerId);
          ResultSet rsetHeaderNameSpill = nameSpillHeaderStmt.executeQuery();
          if (!rsetHeaderNameSpill.next()) {
            throw new JdbcChannelException("Name spill was set for header "
                + headerId + " but was not found");
          }
          String nameSpill = rsetHeaderNameSpill.getString(1);

          peBuilder.setHeaderNameSpill(headerId, nameSpill);
          deleteNameSpillHeaderStmt.setLong(1, headerId);
          deleteNameSpillHeaderStmt.addBatch();
        }

        // Delete header name spills
        int[] headerNameSpillDelete = deleteNameSpillHeaderStmt.executeBatch();
        if (headerNameSpillDelete.length != nameSpillHeaders.size()) {
          throw new JdbcChannelException("Unexpected number of header name "
              + "spill deletes: expected " + nameSpillHeaders.size()
              + ", found: " + headerNameSpillDelete.length);
        }

        for (int numRowsAffected : headerNameSpillDelete) {
          if (numRowsAffected != 1) {
            throw new JdbcChannelException("Unexpected number of deleted rows "
                + "for header name spill deletes: " + numRowsAffected);
          }
        }
      }

      if (valueSpillHeaders != null) {
        valueSpillHeaderStmt =
            connection.prepareStatement(STMT_FETCH_HEADER_VALUE_SPILL);

        deleteValueSpillHeaderStmt =
            connection.prepareStatement(STMT_DELETE_HEADER_VALUE_SPILL);
        for (long headerId: valueSpillHeaders) {
          valueSpillHeaderStmt.setLong(1, headerId);
          ResultSet rsetHeaderValueSpill = valueSpillHeaderStmt.executeQuery();
          if (!rsetHeaderValueSpill.next()) {
            throw new JdbcChannelException("Value spill was set for header "
                + headerId + " but was not found");
          }
          String valueSpill = rsetHeaderValueSpill.getString(1);

          peBuilder.setHeaderValueSpill(headerId, valueSpill);
          deleteValueSpillHeaderStmt.setLong(1, headerId);
          deleteValueSpillHeaderStmt.addBatch();
        }
        // Delete header value spills
        int[] headerValueSpillDelete = deleteValueSpillHeaderStmt.executeBatch();
        if (headerValueSpillDelete.length != valueSpillHeaders.size()) {
          throw new JdbcChannelException("Unexpected number of header value "
              + "spill deletes: expected " + valueSpillHeaders.size()
              + ", found: " + headerValueSpillDelete.length);
        }

        for (int numRowsAffected : headerValueSpillDelete) {
          if (numRowsAffected != 1) {
            throw new JdbcChannelException("Unexpected number of deleted rows "
                + "for header value spill deletes: " + numRowsAffected);
          }
        }
      }

      // Now delete Headers
      if (headerCount > 0) {
        deleteBaseHeaderStmt =
            connection.prepareStatement(STMT_DELETE_HEADER_BASE);
        deleteBaseHeaderStmt.setLong(1, eventId);

        int rowCount = deleteBaseHeaderStmt.executeUpdate();
        if (rowCount != headerCount) {
          throw new JdbcChannelException("Unexpected base header delete count: "
              + "expected: " + headerCount + ", found: " + rowCount);
        }
      }

      // Now delete the Event
      deleteBaseEventStmt = connection.prepareStatement(STMT_DELETE_EVENT_BASE);
      deleteBaseEventStmt.setLong(1, eventId);
      int rowCount = deleteBaseEventStmt.executeUpdate();

      if (rowCount != 1) {
        throw new JdbcChannelException("Unexpected row count for delete of "
            + "event-id: " + eventId + ", count: " + rowCount);
      }

    } catch (SQLException ex) {
      throw new JdbcChannelException("Unable to retrieve event", ex);
    } catch (IOException ex) {
      throw new JdbcChannelException("Unable to read data", ex);
    } finally {
      if (payloadInputStream != null) {
        try {
          payloadInputStream.close();
        } catch (IOException ex) {
          LOGGER.error("Unable to close payload spill stream", ex);
        }
      }
      if (baseEventFetchStmt != null) {
        try {
          baseEventFetchStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base event fetch statement", ex);
        }
      }
      if (spillEventFetchStmt != null) {
        try {
          spillEventFetchStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close spill event fetch statment", ex);
        }
      }
      if (deleteSpillEventStmt != null) {
        try {
          deleteSpillEventStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close event spill delete statement", ex);
        }
      }
      if (baseHeaderFetchStmt != null) {
        try {
          baseHeaderFetchStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base header fetch statement", ex);
        }
      }
      if (nameSpillHeaderStmt != null) {
        try {
          nameSpillHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close name spill fetch statement", ex);
        }
      }
      if (valueSpillHeaderStmt != null) {
        try {
          valueSpillHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close value spill fetch statement", ex);
        }
      }
      if (deleteNameSpillHeaderStmt != null) {
        try {
          deleteNameSpillHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close value spill delete statement", ex);
        }
      }
      if (deleteValueSpillHeaderStmt != null) {
        try {
          deleteValueSpillHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close value spill delete statement", ex);
        }
      }
      if (deleteBaseHeaderStmt != null) {
        try {
          deleteBaseHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base header delete statement", ex);
        }
      }
      if (deleteBaseEventStmt != null) {
        try {
          deleteBaseEventStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base event delete statement", ex);
        }
      }
    }

    return peBuilder.build();
  }