in gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java [124:248]
public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state)
throws IOException {
this.state = state;
this.branchId = builder.branch;
this.isSpeculativeAttemptSafe = true;
this.isWatermarkCapable = true;
this.baseWriterId = builder.getWriterId();
this.createWriterPool = Executors.newSingleThreadExecutor();
this.closer = Closer.create();
this.writerBuilder = builder;
this.controlMessageHandler = new PartitionDataWriterMessageHandler();
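// Record the latest writer schema in the task state (WRITER_LATEST_SCHEMA) so it is readable from the state downstream.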
if (builder.schema != null) {
this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
}
long cacheExpiryInterval = this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS, DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
this.writeTimeoutInterval = this.state.getPropAsLong(PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS,
DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS);
// Bound the write timeout to at most two-thirds of the cache TTL (integer arithmetic), so a slow
// write cannot outlive its cache entry and lose data on eviction
this.writeTimeoutInterval = Math.min(this.writeTimeoutInterval, cacheExpiryInterval / 3 * 2);
log.debug("PartitionedDataWriter: Setting cache expiry interval to {} seconds", cacheExpiryInterval);
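// Cache of per-partition writers: an entry expires after the TTL above, and the removal listener below
// closes the evicted writer, folding its record/byte counts into the running totals.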
this.partitionWriters = CacheBuilder.newBuilder()
.expireAfterAccess(cacheExpiryInterval, TimeUnit.SECONDS)
.removalListener(new RemovalListener<GenericRecord, DataWriter<D>>() {
@Override
public void onRemoval(RemovalNotification<GenericRecord, DataWriter<D>> notification) {
synchronized (PartitionedDataWriter.this) {
if (notification.getValue() != null) {
try {
DataWriter<D> writer = notification.getValue();
totalRecordsFromEvictedWriters += writer.recordsWritten();
totalBytesFromEvictedWriters += writer.bytesWritten();
writer.close();
} catch (IOException e) {
log.error("Exception {} encountered when closing data writer on cache eviction", e);
//Should propagate the exception to avoid committing/publishing corrupt files.
throw new RuntimeException(e);
}
}
}
}
}).build(new CacheLoader<GenericRecord, DataWriter<D>>() {
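// Lazily create (and instrument) a writer the first time a given partition key is seen.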
@Override
public DataWriter<D> load(final GenericRecord key)
throws Exception {
/* Wrap the data writer so it can optionally be closed on flush */
return new InstrumentedPartitionedDataWriterDecorator<>(
new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() {
@Override
public DataWriter<D> get() {
try {
log.info(String.format("Adding a new partition writer to the loading cache; current size = %d",
partitionWriters.size()));
Future<DataWriter<D>> future = createWriterPool.submit(() -> createPartitionWriter(key));
state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1);
return future.get(writeTimeoutInterval, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error creating writer", e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format("Failed to create writer due to timeout. The operation timed out after %s seconds.", writeTimeoutInterval), e);
}
}
}, state), state, key);
}
});
// Schedule a periodic clean-up of the DataWriter cache: Guava's LoadingCache performs eviction
// maintenance lazily (mostly on access), so expired writers may otherwise linger in memory.
if (cacheExpiryInterval < Long.MAX_VALUE) {
this.cacheCleanUpExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("CacheCleanupExecutor")));
this.cacheCleanUpExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
PartitionedDataWriter.this.partitionWriters.cleanUp();
}
}, 0, cacheExpiryInterval, TimeUnit.SECONDS);
}
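// Partitioned path: a WriterPartitioner is configured, so the builder must be partition-aware and
// the partitioner's schema must be one the builder can handle.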
if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
Preconditions.checkArgument(builder instanceof PartitionAwareDataWriterBuilder, String
.format("%s was specified but the writer %s does not support partitioning.",
ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));
try {
this.shouldPartition = true;
this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils
.invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
builder.getBranches(), builder.getBranch())));
Preconditions
.checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String
.format("Writer %s does not support schema from partitioner %s",
builder.getClass().getCanonicalName(), this.partitioner.get().getClass().getCanonicalName()));
} catch (ReflectiveOperationException roe) {
throw new IOException(roe);
}
} else {
this.shouldPartition = false;
// Optionally close the DataWriter on flush so a task can publish intermediate results
CloseOnFlushWriterWrapper<D> closeOnFlushWriterWrapper =
new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() {
@Override
public DataWriter<D> get() {
try {
return builder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_"
+ PartitionedDataWriter.this.writerIdSuffix++).build();
} catch (IOException e) {
throw new RuntimeException("Error creating writer", e);
}
}
}, state);
DataWriter<D> dataWriter = (DataWriter<D>) closeOnFlushWriterWrapper.getDecoratedObject();
InstrumentedDataWriterDecorator<D> writer =
this.closer.register(new InstrumentedDataWriterDecorator<>(closeOnFlushWriterWrapper, state));
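// Probe the undecorated writer (not its wrappers) for speculative-attempt safety and watermark support.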
this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter);
this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter);
this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer);
this.partitioner = Optional.absent();
this.builder = Optional.absent();
}
}
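
For reference, a minimal sketch of a custom WriterPartitioner that a job could plug in via ConfigurationKeys.WRITER_PARTITIONER_CLASS. The (State, numBranches, branchId) constructor mirrors the reflective ConstructorUtils.invokeConstructor call above; the class name and partition field here are illustrative, not part of Gobblin:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.writer.partitioner.WriterPartitioner;

// Hypothetical partitioner: groups records by the length of their string form.
public class RecordLengthPartitioner implements WriterPartitioner<Object> {

  private static final Schema SCHEMA = SchemaBuilder.record("Partition").namespace("example")
      .fields()
      .name("length").type().intType().noDefault()
      .endRecord();

  // Must match the reflective invocation above: (state, numBranches, branchId).
  public RecordLengthPartitioner(State state, int numBranches, int branchId) {
  }

  @Override
  public Schema partitionSchema() {
    return SCHEMA;
  }

  @Override
  public GenericRecord partitionForRecord(Object record) {
    GenericRecord partition = new GenericData.Record(SCHEMA);
    partition.put("length", record.toString().length());
    return partition;
  }
}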