private void setupOp()

in flume-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java [197:252]


  private void setupOp(Operation op, GenericRecord record) {
    PartialRow row = op.getRow();
    for (ColumnSchema col : table.getSchema().getColumns()) {
      String name = col.getName();
      Object value = record.get(name);
      if (value == null) {
        // Set null if nullable, otherwise leave unset for possible Kudu default.
        if (col.isNullable()) {
          row.setNull(name);
        }
      } else {
        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
        // a larger type.
        try {
          switch (col.getType()) {
            case BOOL:
              row.addBoolean(name, (boolean) value);
              break;
            case INT8:
              row.addByte(name, (byte) value);
              break;
            case INT16:
              row.addShort(name, (short) value);
              break;
            case INT32:
              row.addInt(name, (int) value);
              break;
            case INT64: // Fall through
            case UNIXTIME_MICROS:
              row.addLong(name, (long) value);
              break;
            case FLOAT:
              row.addFloat(name, (float) value);
              break;
            case DOUBLE:
              row.addDouble(name, (double) value);
              break;
            case STRING:
              row.addString(name, value.toString());
              break;
            case BINARY:
              row.addBinary(name, (byte[]) value);
              break;
            default:
              throw new FlumeException(String.format(
                  "Unrecognized type %s for column %s", col.getType().toString(), name));
          }
        } catch (ClassCastException e) {
          throw new FlumeException(
              String.format("Failed to coerce value for column '%s' to type %s",
              col.getName(),
              col.getType()), e);
        }
      }
    }
  }