in connector/src/main/scala/com/datastax/spark/connector/writer/WriteConf.scala [184:250]
def fromSparkConf(conf: SparkConf): WriteConf = {
ConfigCheck.checkConfig(conf)
val batchSizeInBytes = conf.getInt(BatchSizeBytesParam.name, BatchSizeBytesParam.default)
val consistencyLevel = DefaultConsistencyLevel.valueOf(
conf.get(ConsistencyLevelParam.name, ConsistencyLevelParam.default.name()))
val batchSizeInRowsStr = conf.get(BatchSizeRowsParam.name, "auto")
val ifNotExists = conf.getBoolean(IfNotExistsParam.name, IfNotExistsParam.default)
val ignoreNulls = conf.getBoolean(IgnoreNullsParam.name, IgnoreNullsParam.default)
val batchSize = {
val Number = "([0-9]+)".r
batchSizeInRowsStr match {
case "auto" => BytesInBatch(batchSizeInBytes)
case Number(x) => RowsInBatch(x.toInt)
case other =>
throw new ConnectorConfigurationException(
s"Invalid value of spark.cassandra.output.batch.size.rows: $other. Number or 'auto' expected")
}
}
val batchBufferSize = conf.getInt(BatchBufferSizeParam.name, BatchBufferSizeParam.default)
val batchGroupingKey = conf.getOption(BatchLevelParam.name)
.map(BatchGroupingKey.apply)
.getOrElse(BatchLevelParam.default)
val parallelismLevel = conf.getInt(ParallelismLevelParam.name, ParallelismLevelParam.default)
val throughputMiBPS = conf.getOption(ThroughputMiBPSParam.name).map(_.toDouble)
val metricsEnabled = conf.getBoolean(TaskMetricsParam.name, TaskMetricsParam.default)
val ttlSeconds = conf.getInt(TTLParam.name, TTLParam.default)
val ttlOption =
if (ttlSeconds == TTLParam.default)
TTLOption.defaultValue
else
TTLOption.constant(ttlSeconds)
val timestampMicros = conf.getLong(TimestampParam.name, TimestampParam.default)
val timestampOption =
if (timestampMicros == TimestampParam.default)
TimestampOption.defaultValue
else
TimestampOption.constant(timestampMicros)
WriteConf(
batchSize = batchSize,
batchGroupingBufferSize = batchBufferSize,
batchGroupingKey = batchGroupingKey,
consistencyLevel = consistencyLevel,
parallelismLevel = parallelismLevel,
throughputMiBPS = throughputMiBPS,
taskMetricsEnabled = metricsEnabled,
ttl = ttlOption,
timestamp = timestampOption,
ignoreNulls = ignoreNulls,
ifNotExists = ifNotExists)
}