public static HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig, HoodieFailedWritesCleaningPolicy)

in hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java [86:270]


  public static HoodieWriteConfig createMetadataWriteConfig(
      HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
    String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;

    final long maxLogFileSizeBytes = writeConfig.getMetadataConfig().getMaxLogFileSize();
    // Borrow the cleaner policy from the data table, then scale its retention settings below so the metadata table keeps a slightly longer history
    HoodieCleaningPolicy dataTableCleaningPolicy = writeConfig.getCleanerPolicy();
    HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder()
        .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
        .withAutoClean(false)
        .withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
        .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
        .withCleanerPolicy(dataTableCleaningPolicy);

    if (HoodieCleaningPolicy.KEEP_LATEST_COMMITS.equals(dataTableCleaningPolicy)) {
      int retainCommits = (int) Math.max(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, writeConfig.getCleanerCommitsRetained() * 1.2);
      cleanConfigBuilder.retainCommits(retainCommits);
    } else if (HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.equals(dataTableCleaningPolicy)) {
      int retainFileVersions = (int) Math.ceil(writeConfig.getCleanerFileVersionsRetained() * 1.2);
      cleanConfigBuilder.retainFileVersions(retainFileVersions);
    } else if (HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.equals(dataTableCleaningPolicy)) {
      int numHoursRetained = (int) Math.ceil(writeConfig.getCleanerHoursRetained() * 1.2);
      cleanConfigBuilder.cleanerNumHoursRetained(numHoursRetained);
    }
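
    // Illustrative arithmetic (values assumed): if the data table uses KEEP_LATEST_COMMITS with
    // writeConfig.getCleanerCommitsRetained() == 30, the metadata table retains
    // (int) Math.max(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, 30 * 1.2) = 36 commits,
    // giving it a ~20% longer retention window than the data table.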

    // Create the write config for the metadata table by borrowing options from the main write config.
    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        .withEngineType(writeConfig.getEngineType())
        .withWriteTableVersion(writeConfig.getWriteVersion().versionCode())
        .withMergeAllowDuplicateOnInserts(false)
        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
            .build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
        .withAutoCommit(true)
        .withAvroSchemaValidate(false)
        .withEmbeddedTimelineServerEnabled(false)
        .withMarkersType(MarkerType.DIRECT.name())
        .withRollbackUsingMarkers(false)
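        // Direct markers are required here: the embedded timeline server is disabled above,
        // so timeline-server-based markers would have no server to talk to.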
        .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
        .withSchema(HoodieMetadataRecord.getClassSchema().toString())
        .forTable(tableName)
        // we will trigger cleaning manually, to control the instant times
        .withCleanConfig(cleanConfigBuilder.build())
        // we will trigger archiving manually, to ensure only the regular writer invokes it
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(
                writeConfig.getMinCommitsToKeep() + 1, writeConfig.getMaxCommitsToKeep() + 1)
            .withAutoArchive(false)
            .build())
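        // The +1 margin above is presumably to keep the metadata table's active timeline at least
        // as long as the data table's, so MDT history stays available until the data table archives.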
        // we will trigger compaction manually, to control the instant times
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(false)
            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
            .withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
            // Compaction on the metadata table is used as a barrier for archiving on the main dataset and for validating
            // that deltacommits have corresponding completed commits. Therefore, we need to compact all file slices of all
            // partitions together, which requires UnBoundedCompactionStrategy.
            .withCompactionStrategy(new UnBoundedCompactionStrategy())
            // Enable log compaction if configured; this is needed for tables with a large number of records.
            .withLogCompactionEnabled(writeConfig.isLogCompactionEnabledOnMetadata())
            // The config below is only used when log compaction is enabled.
            .withLogCompactionBlocksThreshold(writeConfig.getMetadataLogCompactBlocksThreshold())
            .build())
        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(MDT_MAX_HFILE_SIZE_BYTES)
            .logFileMaxSize(maxLogFileSizeBytes)
            // Keeping the log blocks as large as the log files themselves reduces the number of HFile blocks
            // to be checked for the presence of keys
            .logFileDataBlockMaxSize(maxLogFileSizeBytes).build())
        .withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
        .withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
        .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
        .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
        .withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
        .withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled())
        .withRecordMergeMode(RecordMergeMode.CUSTOM)
        .withRecordMergeStrategyId(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
            .withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build())
        .withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName())
        .withWriteRecordPositionsEnabled(false);

    // RecordKey properties are needed for the metadata table records
    final Properties properties = new Properties();
    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD_NAME);
    properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD_NAME);
    if (nonEmpty(writeConfig.getMetricReporterMetricsNamePrefix())) {
      properties.put(HoodieMetricsConfig.METRICS_REPORTER_PREFIX.key(),
          writeConfig.getMetricReporterMetricsNamePrefix() + METADATA_TABLE_NAME_SUFFIX);
    }
    builder.withProperties(properties);

    if (writeConfig.isMetricsOn()) {
      // The table name is needed for the metric reporter prefix
      Properties commonProperties = new Properties();
      commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName);

      builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
          .fromProperties(commonProperties)
          .withReporterType(writeConfig.getMetricsReporterType().toString())
          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
          .withMetricsReporterMetricNamePrefix(writeConfig.getMetricReporterMetricsNamePrefix() + "_" + HoodieTableMetaClient.METADATA_STR)
          .on(true).build());
      switch (writeConfig.getMetricsReporterType()) {
        case GRAPHITE:
          builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
              .onGraphitePort(writeConfig.getGraphiteServerPort())
              .toGraphiteHost(writeConfig.getGraphiteServerHost())
              .usePrefix(writeConfig.getGraphiteMetricPrefix()).build());
          break;
        case JMX:
          builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder()
              .onJmxPort(writeConfig.getJmxPort())
              .toJmxHost(writeConfig.getJmxHost())
              .build());
          break;
        case PROMETHEUS_PUSHGATEWAY:
          HoodieMetricsPrometheusConfig pushGatewayConfig = HoodieMetricsPrometheusConfig.newBuilder()
              .withPushgatewayJobname(writeConfig.getPushGatewayJobName())
              .withPushgatewayRandomJobnameSuffix(writeConfig.getPushGatewayRandomJobNameSuffix())
              .withPushgatewayLabels(writeConfig.getPushGatewayLabels())
              .withPushgatewayReportPeriodInSeconds(String.valueOf(writeConfig.getPushGatewayReportPeriodSeconds()))
              .withPushgatewayHostName(writeConfig.getPushGatewayHost())
              .withPushgatewayPortNum(writeConfig.getPushGatewayPort()).build();
          builder.withProperties(pushGatewayConfig.getProps());
          break;
        case M3:
          HoodieMetricsM3Config m3Config = HoodieMetricsM3Config.newBuilder()
              .onM3Port(writeConfig.getM3ServerPort())
              .toM3Host(writeConfig.getM3ServerHost())
              .useM3Tags(writeConfig.getM3Tags())
              .useM3Service(writeConfig.getM3Service())
              .useM3Env(writeConfig.getM3Env()).build();
          builder.withProperties(m3Config.getProps());
          break;
        case DATADOG:
          HoodieMetricsDatadogConfig.Builder datadogConfig = HoodieMetricsDatadogConfig.newBuilder()
                  .withDatadogApiKey(writeConfig.getDatadogApiKey())
                  .withDatadogApiKeySkipValidation(writeConfig.getDatadogApiKeySkipValidation())
                  .withDatadogPrefix(writeConfig.getDatadogMetricPrefix())
                  .withDatadogReportPeriodSeconds(writeConfig.getDatadogReportPeriodSeconds())
                  .withDatadogTags(String.join(",", writeConfig.getDatadogMetricTags()))
                  .withDatadogApiTimeoutSeconds(writeConfig.getDatadogApiTimeoutSeconds());
          if (writeConfig.getDatadogMetricHost() != null) {
            datadogConfig = datadogConfig.withDatadogHost(writeConfig.getDatadogMetricHost());
          }
          if (writeConfig.getDatadogApiSite() != null) {
            datadogConfig = datadogConfig.withDatadogApiSite(writeConfig.getDatadogApiSite().name());
          }

          builder.withProperties(datadogConfig.build().getProps());
          break;
        case PROMETHEUS:
          HoodieMetricsPrometheusConfig prometheusConfig = HoodieMetricsPrometheusConfig.newBuilder()
              .withPushgatewayLabels(writeConfig.getPushGatewayLabels())
              .withPrometheusPortNum(writeConfig.getPrometheusPort()).build();
          builder.withProperties(prometheusConfig.getProps());
          break;
        case CONSOLE:
        case INMEMORY:
        case CLOUDWATCH:
          break;
        default:
          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
      }
    }

    HoodieWriteConfig metadataWriteConfig = builder.build();

    // Auto clean and inline compaction must stay disabled: both are triggered internally, since this table is never exposed outside
    ValidationUtils.checkArgument(!metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
    ValidationUtils.checkArgument(!metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
    // Auto commit is required
    ValidationUtils.checkArgument(metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
    ValidationUtils.checkArgument(metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
        "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
    // The metadata table cannot itself use metadata-based file listing; that would recurse infinitely.
    ValidationUtils.checkArgument(!metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");

    return metadataWriteConfig;
  }
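
A minimal caller-side sketch (not part of the source file) of deriving the metadata table config from a data table config; the base path, table name, and the EAGER failed-writes cleaning policy below are illustrative assumptions:

  import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
  import org.apache.hudi.config.HoodieWriteConfig;
  import org.apache.hudi.metadata.HoodieMetadataWriteUtils;

  public class MetadataWriteConfigExample {
    public static void main(String[] args) {
      // Hypothetical data table config; path and table name are placeholders.
      HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder()
          .withPath("/tmp/warehouse/my_table")
          .forTable("my_table")
          .build();

      // Derive the metadata table config; EAGER is one possible failed-writes policy.
      HoodieWriteConfig mdtConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(
          dataConfig, HoodieFailedWritesCleaningPolicy.EAGER);

      // The derived config points under the data table's base path and carries the
      // METADATA_TABLE_NAME_SUFFIX-suffixed table name.
      System.out.println(mdtConfig.getBasePath());
      System.out.println(mdtConfig.getTableName());
    }
  }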