public List getOperations()

in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java [255:301]


  public List<Operation> getOperations(Event event) throws FlumeException {
    String raw = new String(event.getBody(), charset);
    Matcher m = pattern.matcher(raw);
    boolean match = false;
    Schema schema = table.getSchema();
    List<Operation> ops = Lists.newArrayList();
    while (m.find()) {
      match = true;
      Operation op;
      switch (operation) {
        case UPSERT:
          op = table.newUpsert();
          break;
        case INSERT:
          op = table.newInsert();
          break;
        default:
          throw new FlumeException(
              String.format("Unrecognized operation type '%s' in getOperations(): " +
                  "this should never happen!", operation));
      }
      PartialRow row = op.getRow();
      for (ColumnSchema col : schema.getColumns()) {
        try {
          coerceAndSet(m.group(col.getName()), col.getName(), col.getType(), row);
        } catch (NumberFormatException e) {
          String msg = String.format(
              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
              raw, col.getType(), col.getName());
          logOrThrow(badColumnValuePolicy, msg, e);
        } catch (IllegalArgumentException e) {
          String msg = String.format(
              "Column '%s' has no matching group in '%s'",
              col.getName(), raw);
          logOrThrow(missingColumnPolicy, msg, e);
        } catch (Exception e) {
          throw new FlumeException("Failed to create Kudu operation", e);
        }
      }
      ops.add(op);
    }
    if (!match) {
      String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
      logOrThrow(unmatchedRowPolicy, msg, null);
    }
    return ops;
  }