in java/core/src/java/org/apache/orc/impl/WriterImpl.java [147:242]
/**
 * Creates a writer for {@code path}, configured from {@code opts}.
 *
 * <p>The constructor clones the schema and annotates the clone with the
 * encryption settings, resolves the row index, bloom filter and file version
 * settings, registers this writer with the memory manager, and finally sets
 * up the physical writer (writing the file header) and the tree writer.
 *
 * <p>Unless {@code opts.isEnforceBufferSize()} is set, the buffer size held
 * by {@code opts} is replaced with the result of
 * {@code getEstimatedBufferSize}, so {@code opts} is modified by this call.
 *
 * <p>If any step after the memory manager registration fails, the
 * registration for {@code path} is removed again before the exception is
 * propagated, so a failed construction does not leave a stale entry behind.
 *
 * @param fs the file system passed to the {@code PhysicalFsWriter} that is
 *           created when {@code opts} does not supply a physical writer
 * @param path the file path; also the key used with the memory manager
 * @param opts the writer options
 * @throws IOException if one of the setup steps reports an I/O failure
 * @throws IllegalArgumentException if the row index stride is positive but
 *         smaller than {@code MIN_ROW_INDEX_STRIDE}, or if the requested file
 *         version is {@code OrcFile.Version.FUTURE}
 */
public WriterImpl(FileSystem fs,
                  Path path,
                  OrcFile.WriterOptions opts) throws IOException {
  this.path = path;
  this.conf = opts.getConfiguration();
  // clone it so that we can annotate it with encryption
  this.schema = opts.getSchema().clone();
  int numColumns = schema.getMaximumId() + 1;
  if (!opts.isEnforceBufferSize()) {
    // NOTE: this writes the estimated size back into the caller's options;
    // the physical writer created below receives the same opts object.
    opts.bufferSize(getEstimatedBufferSize(opts.getStripeSize(), numColumns,
        opts.getBufferSize()));
  }

  // Annotate the schema with the column encryption
  schema.annotateEncryption(opts.getEncryption(), opts.getMasks());
  columnEncryption = new WriterEncryptionVariant[numColumns];
  columnMaskDescriptions = new MaskDescriptionImpl[numColumns];
  encryption = setupEncryption(opts.getKeyProvider(), schema,
      opts.getKeyOverrides());
  needKeyFlush = encryption.length > 0;

  this.directEncodingColumns = OrcUtils.includeColumns(
      opts.getDirectEncodingColumns(), opts.getSchema());
  dictionaryKeySizeThreshold =
      OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);

  this.callback = opts.getCallback();
  if (callback != null) {
    callbackContext = () -> WriterImpl.this;
  } else {
    callbackContext = null;
  }

  this.useProlepticGregorian = opts.getProlepticGregorian();
  this.writeTimeZone = hasTimestamp(schema);
  this.useUTCTimeZone = opts.getUseUTCTimestamp();
  this.encodingStrategy = opts.getEncodingStrategy();
  this.compressionStrategy = opts.getCompressionStrategy();

  // ORC-1362: if isBuildIndex=false, then rowIndexStride will be set to 0.
  if (opts.getRowIndexStride() >= 0 && opts.isBuildIndex()) {
    this.rowIndexStride = opts.getRowIndexStride();
  } else {
    this.rowIndexStride = 0;
  }
  // A stride of 0 disables the row index altogether.
  this.buildIndex = rowIndexStride > 0;
  if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
    throw new IllegalArgumentException("Row stride must be at least " +
        MIN_ROW_INDEX_STRIDE);
  }

  this.writerVersion = opts.getWriterVersion();
  this.version = opts.getVersion();
  if (version == OrcFile.Version.FUTURE) {
    throw new IllegalArgumentException("Can not write in a unknown version.");
  } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
    LOG.warn("ORC files written in {} will not be" +
        " readable by other versions of the software. It is only for" +
        " developer testing.", version.getName());
  }

  this.bloomFilterVersion = opts.getBloomFilterVersion();
  this.bloomFilterFpp = opts.getBloomFilterFpp();
  /* do not write bloom filters for ORC v11 */
  if (!buildIndex || version == OrcFile.Version.V_0_11) {
    // All-false array: no column gets a bloom filter.
    this.bloomFilterColumns = new boolean[numColumns];
  } else {
    this.bloomFilterColumns =
        OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
  }

  // ensure that we are able to handle callbacks before we register ourselves
  ROWS_PER_CHECK = Math.min(opts.getStripeRowCountValue(),
      OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf));
  this.stripeRowCount = opts.getStripeRowCountValue();
  this.stripeSize = opts.getStripeSize();
  memoryLimit = stripeSize;
  memoryManager = opts.getMemoryManager();
  memoryManager.addWriter(path, stripeSize, this);

  try {
    // Set up the physical writer
    this.physicalWriter = opts.getPhysicalWriter() == null ?
        new PhysicalFsWriter(fs, path, opts, encryption) :
        opts.getPhysicalWriter();
    physicalWriter.writeHeader();
    unencryptedOptions = physicalWriter.getStreamOptions();
    OutStream.assertBufferSizeValid(unencryptedOptions.getBufferSize());
    treeWriter = TreeWriter.Factory.create(schema, null, new StreamFactory());
  } catch (IOException | RuntimeException e) {
    // This writer is already registered with the memory manager, but the
    // caller will never receive an instance it could close. Undo the
    // registration so the manager does not keep tracking a dead writer.
    try {
      memoryManager.removeWriter(path);
    } catch (IOException | RuntimeException cleanupFailure) {
      // Keep the original failure as the primary exception.
      e.addSuppressed(cleanupFailure);
    }
    throw e;
  }

  LOG.debug("ORC writer created for path: {} with stripeSize: {} options: {}",
      path, stripeSize, unencryptedOptions);
}