public void storeEvent()

in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java [577:771]


  public void storeEvent(PersistableEvent pe, Connection connection) {
    // First populate the main event table
    byte[] basePayload = pe.getBasePayload();
    byte[] spillPayload = pe.getSpillPayload();
    boolean hasSpillPayload = (spillPayload != null);
    String channelName = pe.getChannelName();

    LOGGER.debug("Preparing insert event: " + pe);

    PreparedStatement baseEventStmt = null;
    PreparedStatement spillEventStmt = null;
    PreparedStatement baseHeaderStmt = null;
    PreparedStatement headerNameSpillStmt = null;
    PreparedStatement headerValueSpillStmt = null;
    try {
      baseEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_BASE,
                          Statement.RETURN_GENERATED_KEYS);
      baseEventStmt.setBytes(1, basePayload);
      baseEventStmt.setString(2, channelName);
      baseEventStmt.setBoolean(3, hasSpillPayload);

      int baseEventCount = baseEventStmt.executeUpdate();
      if (baseEventCount != 1) {
        throw new JdbcChannelException("Invalid update count on base "
            + "event insert: " + baseEventCount);
      }
      // Extract event ID and set it
      ResultSet eventIdResult = baseEventStmt.getGeneratedKeys();

      if (!eventIdResult.next()) {
        throw new JdbcChannelException("Unable to retrieive inserted event-id");
      }

      long eventId = eventIdResult.getLong(1);
      pe.setEventId(eventId);

      // Persist the payload spill
      if (hasSpillPayload) {
        spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
        spillEventStmt.setLong(1, eventId);
        spillEventStmt.setBinaryStream(2,
            new ByteArrayInputStream(spillPayload), spillPayload.length);
        int spillEventCount = spillEventStmt.executeUpdate();
        if (spillEventCount != 1) {
          throw new JdbcChannelException("Invalid update count on spill "
              + "event insert: " + spillEventCount);
        }
      }

      // Persist the headers
      List<HeaderEntry> headers = pe.getHeaderEntries();
      if (headers != null && headers.size() > 0) {
        List<HeaderEntry> headerWithNameSpill = new ArrayList<HeaderEntry>();
        List<HeaderEntry> headerWithValueSpill = new ArrayList<HeaderEntry>();

        baseHeaderStmt = connection.prepareStatement(STMT_INSERT_HEADER_BASE,
                                Statement.RETURN_GENERATED_KEYS);
        Iterator<HeaderEntry> it = headers.iterator();
        while (it.hasNext()) {
          HeaderEntry entry = it.next();
          SpillableString name = entry.getName();
          SpillableString value = entry.getValue();
          baseHeaderStmt.setLong(1, eventId);
          baseHeaderStmt.setString(2, name.getBase());
          baseHeaderStmt.setString(3, value.getBase());
          baseHeaderStmt.setBoolean(4, name.hasSpill());
          baseHeaderStmt.setBoolean(5, value.hasSpill());

          int updateCount = baseHeaderStmt.executeUpdate();
          if (updateCount != 1) {
            throw new JdbcChannelException("Unexpected update header count: " + updateCount);
          }
          ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys();
          if (!headerIdResultSet.next()) {
            throw new JdbcChannelException(
                "Unable to retrieve inserted header id");
          }
          long headerId = headerIdResultSet.getLong(1);
          entry.setId(headerId);

          if (name.hasSpill()) {
            headerWithNameSpill.add(entry);
          }

          if (value.hasSpill()) {
            headerWithValueSpill.add(entry);
          }
        }

        // Persist header name spills
        if (headerWithNameSpill.size() > 0) {
          LOGGER.debug("Number of headers with name spill: "
                  + headerWithNameSpill.size());

          headerNameSpillStmt =
              connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL);

          for (HeaderEntry entry : headerWithNameSpill) {
            String nameSpill = entry.getName().getSpill();

            headerNameSpillStmt.setLong(1, entry.getId());
            headerNameSpillStmt.setString(2, nameSpill);
            headerNameSpillStmt.addBatch();
          }

          int[] nameSpillUpdateCount = headerNameSpillStmt.executeBatch();
          if (nameSpillUpdateCount.length != headerWithNameSpill.size()) {
            throw new JdbcChannelException("Unexpected update count for header "
                + "name spills: expected " + headerWithNameSpill.size() + ", "
                + "found " + nameSpillUpdateCount.length);
          }

          for (int i = 0; i < nameSpillUpdateCount.length; i++) {
            if (nameSpillUpdateCount[i] != 1) {
              throw new JdbcChannelException("Unexpected update count for "
                  + "header name spill at position " + i + ", value: "
                  + nameSpillUpdateCount[i]);
            }
          }
        }

        // Persist header value spills
        if (headerWithValueSpill.size() > 0) {
          LOGGER.debug("Number of headers with value spill: "
              + headerWithValueSpill.size());

          headerValueSpillStmt =
              connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);

          for (HeaderEntry entry : headerWithValueSpill) {
            String valueSpill = entry.getValue().getSpill();

            headerValueSpillStmt.setLong(1, entry.getId());
            headerValueSpillStmt.setString(2, valueSpill);
            headerValueSpillStmt.addBatch();
          }

          int[] valueSpillUpdateCount = headerValueSpillStmt.executeBatch();
          if (valueSpillUpdateCount.length != headerWithValueSpill.size()) {
            throw new JdbcChannelException("Unexpected update count for header "
                + "value spills: expected " + headerWithValueSpill.size() + ", "
                + "found " + valueSpillUpdateCount.length);
          }

          for (int i = 0; i < valueSpillUpdateCount.length; i++) {
            if (valueSpillUpdateCount[i] != 1) {
              throw new JdbcChannelException("Unexpected update count for "
                  + "header value spill at position " + i + ", value: "
                  + valueSpillUpdateCount[i]);
            }
          }
        }
      }
    } catch (SQLException ex) {
      throw new JdbcChannelException("Unable to persist event: " + pe, ex);
    } finally {
      if (baseEventStmt != null) {
        try {
          baseEventStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base event statement", ex);
        }
      }
      if (spillEventStmt != null) {
        try {
          spillEventStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close spill event statement", ex);
        }
      }
      if (baseHeaderStmt != null) {
        try {
          baseHeaderStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close base header statement", ex);
        }
      }
      if (headerNameSpillStmt != null) {
        try {
          headerNameSpillStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close header name spill statement", ex);
        }
      }
      if (headerValueSpillStmt != null) {
        try {
          headerValueSpillStmt.close();
        } catch (SQLException ex) {
          LOGGER.error("Unable to close header value spill statement", ex);
        }
      }
    }

    LOGGER.debug("Event persisted: " + pe);
  }