static RecordWriter createRecordWriter()

in hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java [194:469]


  /**
   * Builds the {@link RecordWriter} that writes incoming cells into HFiles below the committer's
   * work path, keeping one writer per table/column-family pair.
   * <p>
   * Per-family compression, bloom type, bloom parameter, block size and data block encoding are
   * read from the job configuration once, up front. When {@code COMPRESSION_OVERRIDE_CONF_KEY} or
   * {@code DATABLOCK_ENCODING_OVERRIDE_CONF_KEY} is set, that value takes precedence over the
   * per-family one. A writer is rolled when the bytes already written plus the incoming cell reach
   * the configured maximum file size, but only when the incoming cell starts a new row.
   * @param context   task attempt context; supplies the job configuration and the task attempt id
   *                  that is recorded in every closed file
   * @param committer output committer; it is cast to {@link PathOutputCommitter} and its work path
   *                  is used as the output directory
   * @return a record writer; calling {@code write(null, null)} on it closes all currently open
   *         files, and {@code close(TaskAttemptContext)} finalizes every remaining file
   * @throws IOException              if resolving the output directory or its file system fails
   * @throws IllegalArgumentException if {@code OUTPUT_TABLE_NAME_CONF_KEY} is unset or empty
   */
  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // Per table+family settings, looked up by the combined table/family key
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of table+family to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      // Last row written per table+family. Keyed exactly like 'writers' so that two tables sharing
      // a family name cannot overwrite each other's entry.
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      // Fixed for single-table jobs; re-derived from every record key for multi-table jobs.
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      /**
       * Appends one cell to the HFile of its table/family, creating or rolling the file as needed.
       * Passing {@code null} for both arguments closes every currently open file.
       */
      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          String qualifiedTableName =
            TableName.valueOf(MultiTableHFileOutputFormat.getTableName(row.get()))
              .getNameWithNamespaceInclAsString();
          if (!allTableNames.contains(qualifiedTableName)) {
            throw new IllegalArgumentException("TableName " + qualifiedTableName + " not expected");
          }
          // Encode with Bytes.toBytes rather than the platform default charset: these bytes are
          // decoded again with Bytes.toString below, and the single-table path encodes the same
          // way, so both directions must agree regardless of the JVM's default charset.
          tableNameBytes = Bytes.toBytes(qualifiedTableName);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new table+family, make sure its output directory exists
        if (wl == null) {
          Path writerPath = getFamilyDir(tableNameBytes, family);
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll only on a row boundary, so the cells of one row are never split across two files
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(tableAndFamily), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = findFavoredNodes(tableNameBytes, rowKey);
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Remember the row so the next call can detect a row transition.
        this.previousRows.put(tableAndFamily, rowKey);
      }

      /**
       * Looks up the region holding {@code rowKey} and returns its server as the single favored
       * node. Returns {@code null} when locality is disabled, the lookup fails or the address
       * does not resolve, in which case the caller builds a default writer.
       */
      private InetSocketAddress[] findFavoredNodes(byte[] tableName, byte[] rowKey) {
        if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
          return null;
        }
        HRegionLocation loc = null;
        String tableNameStr = Bytes.toString(tableName);
        if (tableNameStr != null) {
          try (
            Connection connection =
              ConnectionFactory.createConnection(createRemoteClusterConf(conf));
            RegionLocator locator =
              connection.getRegionLocator(TableName.valueOf(tableNameStr))) {
            loc = locator.getRegionLocation(rowKey);
          } catch (Throwable e) {
            // Best effort: any failure here only means falling back to the default writer.
            LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
              tableNameStr, e);
            loc = null;
          }
        }
        if (null == loc) {
          LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
          return null;
        }
        LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
        InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
        if (initialIsa.isUnresolved()) {
          LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
          return null;
        }
        LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
        return new InetSocketAddress[] { initialIsa };
      }

      /**
       * Splits a table name on ':' and returns the first part as a path, with the second part (if
       * any) nested below it.
       */
      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      /**
       * Returns the directory that holds the files of {@code family}: directly below the output
       * directory for single-table jobs, below the table's relative path for multi-table jobs.
       */
      private Path getFamilyDir(byte[] tableName, byte[] family) {
        String familyName = Bytes.toString(family);
        if (writeMultipleTables) {
          return new Path(outputDir, new Path(getTableRelativePath(tableName), familyName));
        }
        return new Path(outputDir, familyName);
      }

      /** Closes the given writer, or every known writer when {@code writerLength} is null. */
      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      /** Finalizes the open file (if any) of {@code wl} and resets its byte counter. */
      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer={}{}", wl.writer.getPath(),
            (wl.written == 0) ? "" : ", wrote=" + wl.written);
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      /**
       * Returns a copy of {@code conf} for the region lookup. The three remote-cluster ZooKeeper
       * settings are applied only when all of them are present; every other key starting with
       * {@code REMOTE_CLUSTER_CONF_PREFIX} is copied with that prefix stripped.
       */
      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (key.startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = key.substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /**
       * Create a new StoreFile.Writer and register it under the table+family key.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = getFamilyDir(tableName, family);
        WriterLength wl = new WriterLength();
        // Precedence: job-wide override, then per-family setting, then default
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      /** Stamps the bulk-load file info onto {@code w} and closes it; a null writer is ignored. */
      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      /** Finalizes every file that is still open. */
      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }