public void configure()

in flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java [105:211]


  // Reads this sink's settings from the supplied context and stores them in the sink's fields.
  //
  // Required settings: Config.HIVE_METASTORE, Config.HIVE_DATABASE, Config.HIVE_TABLE and
  // Config.SERIALIZER. An IllegalArgumentException is thrown when any of them is missing, when
  // batchSize is zero, or when the round value is out of range for a recognized round unit.
  // Out-of-range numeric settings are otherwise replaced by their defaults with a warning.
  // Finally the serializer is created and configured, and the sink counter is created if it
  // does not exist yet.
  public void configure(Context context) {

    metaStoreUri = context.getString(Config.HIVE_METASTORE);
    if (metaStoreUri == null) {
      throw new IllegalArgumentException(Config.HIVE_METASTORE + " config setting is not " +
              "specified for sink " + getName());
    }
    if (metaStoreUri.equalsIgnoreCase("null")) { // for testing support
      metaStoreUri = null;
    }
    proxyUser = null; // context.getString("hive.proxyUser"); not supported by hive api yet
    database = context.getString(Config.HIVE_DATABASE);
    if (database == null) {
      throw new IllegalArgumentException(Config.HIVE_DATABASE + " config setting is not " +
            "specified for sink " + getName());
    }
    table = context.getString(Config.HIVE_TABLE);
    if (table == null) {
      throw new IllegalArgumentException(Config.HIVE_TABLE + " config setting is not " +
              "specified for sink " + getName());
    }

    // Optional comma-separated list of partition values; left untouched when not configured.
    String partitions = context.getString(Config.HIVE_PARTITION);
    if (partitions != null) {
      partitionVals = Arrays.asList(partitions.split(","));
    }


    txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, DEFAULT_TXNSPERBATCH);
    // Zero is rejected too: the setting has to be a positive number, as the warning states.
    if (txnsPerBatchAsk <= 0) {
      LOG.warn(getName() + ". hive.txnsPerBatchAsk must be  positive number. Defaulting to "
              + DEFAULT_TXNSPERBATCH);
      txnsPerBatchAsk = DEFAULT_TXNSPERBATCH;
    }
    batchSize = context.getInteger(Config.BATCH_SIZE, DEFAULT_BATCHSIZE);
    if (batchSize < 0) {
      LOG.warn(getName() + ". batchSize must be  positive number. Defaulting to "
              + DEFAULT_BATCHSIZE);
      batchSize = DEFAULT_BATCHSIZE;
    }
    // A batch size of zero is not defaulted but rejected; fail here, before any further
    // work (such as instantiating the serializer) is done with an unusable configuration.
    Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");

    // NOTE(review): zero is accepted for the three settings below although the warnings say
    // "positive" - presumably zero has a meaning elsewhere in the sink; confirm before tightening.
    idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, DEFAULT_IDLETIMEOUT);
    if (idleTimeout < 0) {
      LOG.warn(getName() + ". idleTimeout must be  positive number. Defaulting to "
              + DEFAULT_IDLETIMEOUT);
      idleTimeout = DEFAULT_IDLETIMEOUT;
    }
    callTimeout = context.getInteger(Config.CALL_TIMEOUT, DEFAULT_CALLTIMEOUT);
    if (callTimeout < 0) {
      LOG.warn(getName() + ". callTimeout must be  positive number. Defaulting to "
              + DEFAULT_CALLTIMEOUT);
      callTimeout = DEFAULT_CALLTIMEOUT;
    }

    heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, DEFAULT_HEARTBEATINTERVAL);
    if (heartBeatInterval < 0) {
      LOG.warn(getName() + ". heartBeatInterval must be  positive number. Defaulting to "
              + DEFAULT_HEARTBEATINTERVAL);
      heartBeatInterval = DEFAULT_HEARTBEATINTERVAL;
    }
    maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS,
                                            DEFAULT_MAXOPENCONNECTIONS);
    autoCreatePartitions =  context.getBoolean("autoCreatePartitions", true);

    // Timestamp processing
    useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false);

    String tzName = context.getString(Config.TIME_ZONE);
    timeZone = (tzName == null) ? null : TimeZone.getTimeZone(tzName);
    needRounding = context.getBoolean(Config.ROUND, false);

    String unit = context.getString(Config.ROUND_UNIT, Config.MINUTE);
    boolean roundUnitRecognized = true;
    if (unit.equalsIgnoreCase(Config.HOUR)) {
      this.roundUnit = Calendar.HOUR_OF_DAY;
    } else if (unit.equalsIgnoreCase(Config.MINUTE)) {
      this.roundUnit = Calendar.MINUTE;
    } else if (unit.equalsIgnoreCase(Config.SECOND)) {
      this.roundUnit = Calendar.SECOND;
    } else {
      LOG.warn(getName() + ". Rounding unit is not valid, please set one of " +
              "minute, hour or second. Rounding will be disabled");
      needRounding = false;
      roundUnitRecognized = false;
    }
    this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
    // Only range-check the round value against a unit that was actually configured. With an
    // unrecognized unit, roundUnit still holds whatever it held before this call and rounding
    // has just been disabled, so checking against it would reject the config for the wrong reason.
    if (roundUnitRecognized) {
      if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
                "Round value must be > 0 and <= 60");
      } else if (roundUnit == Calendar.HOUR_OF_DAY) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
                "Round value must be > 0 and <= 24");
      }
    }

    // Serializer
    serializerType = context.getString(Config.SERIALIZER, "");
    if (serializerType.isEmpty()) {
      throw new IllegalArgumentException("serializer config setting is not " +
              "specified for sink " + getName());
    }

    serializer = createSerializer(serializerType);
    serializer.configure(context);

    // Keep an existing counter so that repeated configure() calls do not replace it.
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }