public void open()

in flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java [76:117]


  /**
   * Opens the file at {@code filePath} for writing through the given
   * compression codec and attaches an event serializer to the compressed
   * stream.
   *
   * <p>The file is opened for append when the {@code hdfs.append.support}
   * setting of a freshly constructed {@link Configuration} is true and the
   * path already exists as a file; otherwise it is created. When appending,
   * the serializer must report {@code supportsReopen()}; if it does not, the
   * compressed stream is closed and an {@link IOException} is thrown.
   *
   * <p>If obtaining the compressor, wrapping the file stream, or building the
   * serializer fails, the freshly opened file stream is closed before the
   * failure propagates, so a failed open does not leave it dangling.
   *
   * @param filePath destination path of the file to write
   * @param codec codec used to obtain a compressor (if none is cached on this
   *     instance yet) and to wrap the raw file stream
   * @param cType compression type; not used by this method
   * @throws IOException if the file system, codec or serializer fails, or if
   *     an append is attempted with a serializer that cannot be reopened
   */
  public void open(String filePath, CompressionCodec codec,
      CompressionType cType) throws IOException {
    Configuration conf = new Configuration();
    Path dstPath = new Path(filePath);
    FileSystem hdfs = dstPath.getFileSystem(conf);
    if (useRawLocalFileSystem) {
      if (hdfs instanceof LocalFileSystem) {
        // Swap the local file system for its underlying raw implementation.
        hdfs = ((LocalFileSystem)hdfs).getRaw();
      } else {
        logger.warn("useRawLocalFileSystem is set to true but file system " +
            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
      }
    }

    boolean appending = false;
    if (conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath)) {
      fsOut = hdfs.append(dstPath);
      appending = true;
    } else {
      fsOut = hdfs.create(dstPath);
    }

    // From here on fsOut is open; make sure it is released if the
    // compression/serialization layers cannot be stacked on top of it.
    boolean layersReady = false;
    try {
      // The compressor is cached on the instance and only fetched once.
      if (compressor == null) {
        compressor = CodecPool.getCompressor(codec, conf);
      }
      cmpOut = codec.createOutputStream(fsOut, compressor);
      serializer = EventSerializerFactory.getInstance(serializerType,
          serializerContext, cmpOut);
      layersReady = true;
    } finally {
      if (!layersReady) {
        try {
          fsOut.close();
        } catch (IOException closeFailure) {
          // Log instead of throwing so the original failure is not masked.
          logger.warn("Unable to close " + dstPath + " after failed open",
              closeFailure);
        }
      }
    }

    if (appending && !serializer.supportsReopen()) {
      cmpOut.close();
      serializer = null;
      throw new IOException("serializer (" + serializerType
          + ") does not support append");
    }

    registerCurrentStream(fsOut, hdfs, dstPath);

    // Let the serializer run its reopen or create hook, depending on how
    // the file was opened.
    if (appending) {
      serializer.afterReopen();
    } else {
      serializer.afterCreate();
    }
    isFinished = false;
  }