public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state)

in gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java [124:248]


  /**
   * Creates a writer that either fans records out to per-partition {@link DataWriter}s (when
   * {@link ConfigurationKeys#WRITER_PARTITIONER_CLASS} is set in {@code state}) or delegates to a single
   * non-partitioned writer stored under {@code NON_PARTITIONED_WRITER_KEY}.
   *
   * <p>Per-partition writers live in a loading cache that expires entries after the configured TTL
   * ({@code PARTITIONED_WRITER_CACHE_TTL_SECONDS}). When an entry is removed, its record/byte counts are
   * added to the evicted-writer totals and the writer is closed. New writers are created on a
   * single-thread executor and must be ready within the write timeout
   * ({@code PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS}). That timeout is capped at two thirds of the cache TTL.
   *
   * @param builder builder used to create the underlying writer(s); must be a
   *        {@link PartitionAwareDataWriterBuilder} when a partitioner class is configured
   * @param state task state providing configuration; this constructor also writes
   *        {@code WRITER_LATEST_SCHEMA} (when the builder has a schema) into it
   * @throws IOException if the configured partitioner class cannot be loaded or instantiated reflectively
   * @throws IllegalArgumentException if a partitioner is configured but the builder does not support
   *         partitioning, or the builder rejects the partitioner's schema
   */
  public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state)
      throws IOException {
    this.state = state;
    this.branchId = builder.branch;

    this.isSpeculativeAttemptSafe = true;
    this.isWatermarkCapable = true;
    this.baseWriterId = builder.getWriterId();
    this.createWriterPool = Executors.newSingleThreadExecutor();
    this.closer = Closer.create();
    this.writerBuilder = builder;
    this.controlMessageHandler = new PartitionDataWriterMessageHandler();
    if (builder.schema != null) {
      this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
    }
    long cacheExpiryInterval = this.state.getPropAsLong(PARTITIONED_WRITER_CACHE_TTL_SECONDS,
        DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS);
    this.writeTimeoutInterval = this.state.getPropAsLong(PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS,
        DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS);
    // Bound the timeout value to avoid data loss when slow write happening.
    // NOTE(review): integer arithmetic makes this bound 0 for a TTL below 3 seconds, in which case writer
    // creation would time out immediately -- confirm such TTLs are never configured.
    this.writeTimeoutInterval = Math.min(this.writeTimeoutInterval, cacheExpiryInterval / 3 * 2);
    log.debug("PartitionedDataWriter: Setting cache expiry interval to {} seconds", cacheExpiryInterval);

    this.partitionWriters = CacheBuilder.newBuilder()
        .expireAfterAccess(cacheExpiryInterval, TimeUnit.SECONDS)
        .removalListener(new RemovalListener<GenericRecord, DataWriter<D>>() {
          @Override
          public void onRemoval(RemovalNotification<GenericRecord, DataWriter<D>> notification) {
            // Guard the evicted-writer totals, which are updated from whichever thread triggers eviction.
            synchronized (PartitionedDataWriter.this) {
              DataWriter<D> writer = notification.getValue();
              if (writer != null) {
                try {
                  totalRecordsFromEvictedWriters += writer.recordsWritten();
                  totalBytesFromEvictedWriters += writer.bytesWritten();
                  writer.close();
                } catch (IOException e) {
                  // The exception is the trailing argument with no placeholder, so SLF4J logs its stack trace.
                  log.error("Exception encountered when closing data writer on cache eviction", e);
                  // Should propagate the exception to avoid committing/publishing corrupt files.
                  throw new RuntimeException(e);
                }
              }
            }
          }
        }).build(new CacheLoader<GenericRecord, DataWriter<D>>() {
          @Override
          public DataWriter<D> load(final GenericRecord key)
              throws Exception {
            /* wrap the data writer to allow the option to close the writer on flush */
            return new InstrumentedPartitionedDataWriterDecorator<>(
                new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() {
                  @Override
                  public DataWriter<D> get() {
                    try {
                      log.info("Adding one more writer to loading cache of existing writer with size = {}",
                          partitionWriters.size());
                      // Create the writer on the dedicated pool so a slow creation can be bounded by a timeout.
                      Future<DataWriter<D>> future = createWriterPool.submit(() -> createPartitionWriter(key));
                      state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1);
                      return future.get(writeTimeoutInterval, TimeUnit.SECONDS);
                    } catch (ExecutionException e) {
                      throw new RuntimeException("Error creating writer", e);
                    } catch (InterruptedException e) {
                      // Restore the interrupt status so code further up the stack can still observe it.
                      Thread.currentThread().interrupt();
                      throw new RuntimeException("Error creating writer", e);
                    } catch (TimeoutException e) {
                      throw new RuntimeException(String.format(
                          "Failed to create writer due to timeout. The operation timed out after %s seconds.",
                          writeTimeoutInterval), e);
                    }
                  }
                }, state), state, key);
          }
        });

    if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
      Preconditions.checkArgument(builder instanceof PartitionAwareDataWriterBuilder, String
          .format("%s was specified but the writer %s does not support partitioning.",
              ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));

      try {
        this.shouldPartition = true;
        this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
        // The partitioner is instantiated reflectively with (state, branches, branch).
        this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils
            .invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
                builder.getBranches(), builder.getBranch())));
        // Report the partitioner's own class, not the Optional wrapper's class.
        Preconditions
            .checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String
                .format("Writer %s does not support schema from partitioner %s",
                    builder.getClass().getCanonicalName(), this.partitioner.get().getClass().getCanonicalName()));
      } catch (ReflectiveOperationException roe) {
        throw new IOException(roe);
      }
    } else {
      this.shouldPartition = false;
      // Support configuration to close the DataWriter on flush to allow publishing intermediate results in a task
      CloseOnFlushWriterWrapper<D> closeOnFlushWriterWrapper =
          new CloseOnFlushWriterWrapper<D>(new Supplier<DataWriter<D>>() {
            @Override
            public DataWriter<D> get() {
              try {
                // Each (re)created writer gets a unique id: baseWriterId + "_" + an incrementing suffix.
                return builder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_"
                    + PartitionedDataWriter.this.writerIdSuffix++).build();
              } catch (IOException e) {
                throw new RuntimeException("Error creating writer", e);
              }
            }
          }, state);
      // getDecoratedObject() is not typed with D, so the cast to DataWriter<D> is unchecked.
      @SuppressWarnings("unchecked")
      DataWriter<D> dataWriter = (DataWriter<D>) closeOnFlushWriterWrapper.getDecoratedObject();

      InstrumentedDataWriterDecorator<D> writer =
          this.closer.register(new InstrumentedDataWriterDecorator<>(closeOnFlushWriterWrapper, state));

      this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter);
      this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter);
      this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer);
      this.partitioner = Optional.absent();
      this.builder = Optional.absent();
    }

    // Schedule a DataWriter cache clean up operation, since LoadingCache may keep the object
    // in memory even after it has been evicted from the cache.
    // This is done last so that a failure in the validation/construction above does not leave behind a
    // scheduled executor (with a started worker thread) that nothing would ever shut down.
    if (cacheExpiryInterval < Long.MAX_VALUE) {
      this.cacheCleanUpExecutor = Executors.newSingleThreadScheduledExecutor(
          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("CacheCleanupExecutor")));
      this.cacheCleanUpExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
          PartitionedDataWriter.this.partitionWriters.cleanUp();
        }
      }, 0, cacheExpiryInterval, TimeUnit.SECONDS);
    }
  }