private void open()

in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java [214:302]


  private void open() throws IOException, InterruptedException {
    if ((filePath == null) || (writer == null)) {
      throw new IOException("Invalid file settings");
    }

    final Configuration config = new Configuration();
    // disable FileSystem JVM shutdown hook
    config.setBoolean("fs.automatic.close", false);

    // Hadoop is not thread safe when doing certain RPC operations,
    // including getFileSystem(), when running under Kerberos.
    // open() must be called by one thread at a time in the JVM.
    // NOTE: tried synchronizing on the underlying Kerberos principal previously
    // which caused deadlocks. See FLUME-1231.
    synchronized (staticLock) {
      checkAndThrowInterruptedException();

      try {
        long counter = fileExtensionCounter.incrementAndGet();

        String fullFileName = fileName + "." + counter;

        if (fileSuffix != null && fileSuffix.length() > 0) {
          fullFileName += fileSuffix;
        } else if (codeC != null) {
          fullFileName += codeC.getDefaultExtension();
        }

        bucketPath = filePath + "/" + inUsePrefix
          + fullFileName + inUseSuffix;
        targetPath = filePath + "/" + fullFileName;

        LOG.info("Creating " + bucketPath);
        callWithTimeout(new CallRunner<Void>() {
          @Override
          public Void call() throws Exception {
            if (codeC == null) {
              // Need to get reference to FS using above config before underlying
              // writer does in order to avoid shutdown hook &
              // IllegalStateExceptions
              if (!mockFsInjected) {
                fileSystem = new Path(bucketPath).getFileSystem(config);
              }
              writer.open(bucketPath);
            } else {
              // need to get reference to FS before writer does to
              // avoid shutdown hook
              if (!mockFsInjected) {
                fileSystem = new Path(bucketPath).getFileSystem(config);
              }
              writer.open(bucketPath, codeC, compType);
            }
            return null;
          }
        });
      } catch (Exception ex) {
        sinkCounter.incrementConnectionFailedCount();
        if (ex instanceof IOException) {
          throw (IOException) ex;
        } else {
          throw Throwables.propagate(ex);
        }
      }
    }
    isClosedMethod = getRefIsClosed();
    sinkCounter.incrementConnectionCreatedCount();
    resetCounters();

    // if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch (Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }

    isOpen = true;
  }