in flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java [233:367]
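    /**
     * Consumes the input {@link DataStream} and builds the actual sink: a batch sink for
     * bounded input, or a streaming sink for unbounded input.
     */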
private DataStreamSink<?> consume(
ProviderContext providerContext,
DataStream<RowData> dataStream,
boolean isBounded,
DataStructureConverter converter) {
checkAcidTable(catalogTable.getOptions(), identifier.toObjectPath());
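        // Resolve the storage descriptor and table properties: synthesized for
        // INSERT (LOCAL) DIRECTORY, otherwise fetched from the Hive metastore.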
StorageDescriptor sd;
Properties tableProps = new Properties();
Map<String, String> options = catalogTable.getOptions();
boolean isInsertDirectory =
Boolean.parseBoolean(
options.getOrDefault(
CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_INSERT_DIRECTORY,
"false"));
boolean isToLocal = false;
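        // INSERT DIRECTORY writes to a plain directory with no metastore table behind it,
        // so start from an empty storage descriptor and fill it from the query options.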
if (isInsertDirectory) {
isToLocal =
Boolean.parseBoolean(
options.getOrDefault(
CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX
+ IS_TO_LOCAL_DIRECTORY,
"false"));
sd =
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
identifier.getDatabaseName(), identifier.getObjectName())
.getSd();
            HiveConf hiveConf = HiveConfUtils.create(jobConf);
            HiveTableUtil.setDefaultStorageFormatForDirectory(sd, hiveConf);
HiveTableUtil.extractRowFormat(sd, catalogTable.getOptions());
HiveTableUtil.extractStoredAs(sd, catalogTable.getOptions(), hiveConf);
HiveTableUtil.extractLocation(sd, catalogTable.getOptions());
tableProps.putAll(sd.getSerdeInfo().getParameters());
tableProps.putAll(catalogTable.getOptions());
} else {
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
Table table =
client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
sd = table.getSd();
tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
} catch (TException e) {
                throw new CatalogException("Failed to query Hive metastore", e);
}
}
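        // Build the Hive record writer factory from the resolved output format and serde.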
try {
            Class<?> hiveOutputFormatClz =
hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
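            // Honor hive.exec.compress.output when writing the part files.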
boolean isCompressed =
jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
HiveWriterFactory writerFactory =
new HiveWriterFactory(
jobConf,
hiveOutputFormatClz,
sd.getSerdeInfo(),
resolvedSchema,
getPartitionKeyArray(),
tableProps,
hiveShim,
isCompressed);
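            // Derive the part-file extension (if any) from the output format and the
            // compression setting, and randomize the part prefix to avoid name clashes.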
String extension =
Utilities.getFileExtension(
jobConf,
isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID())
.withPartSuffix(extension == null ? "" : extension);
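            // Use the explicitly configured sink parallelism if present; otherwise inherit
            // the parallelism of the input stream.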
final int sinkParallelism =
Optional.ofNullable(configuredSinkParallelism)
.orElse(dataStream.getParallelism());
boolean sinkParallelismConfigured = configuredSinkParallelism != null;
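            // Bounded input is written with a one-shot batch sink; unbounded input with a
            // streaming sink.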
if (isBounded) {
TableMetaStoreFactory msFactory =
isInsertDirectory
? new EmptyMetaStoreFactory(
new org.apache.flink.core.fs.Path(sd.getLocation()))
: msFactory(autoGatherStatistic);
                // By default, use the destination location as the parent directory of the
                // staging directory.
String stagingParentDir = sd.getLocation();
if (isToLocal) {
                    // Writing to the local file system: the destination is a local path, so
                    // stage the data under a scratch directory on the cluster file system
                    // first, then move it to the local path.
Path rootScratchDirPath =
new Path(
HiveConf.getVar(
HiveConfUtils.create(jobConf),
HiveConf.ConfVars.SCRATCHDIR));
// TODO: may append something more meaningful than a timestamp, like query ID
Path scratchDir =
new Path(
rootScratchDirPath, String.valueOf(System.currentTimeMillis()));
stagingParentDir = scratchDir.toUri().toString();
}
return createBatchSink(
dataStream,
converter,
writerFactory,
msFactory,
fileNamingBuilder,
stagingParentDir,
sd,
tableProps,
isToLocal,
overwrite,
sinkParallelism,
sinkParallelismConfigured);
} else {
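                // INSERT OVERWRITE is only supported for bounded (batch) writes.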
if (overwrite) {
                    throw new IllegalStateException(
                            "Streaming mode does not support overwrite.");
}
return createStreamSink(
providerContext,
dataStream,
sd,
tableProps,
writerFactory,
fileNamingBuilder,
sinkParallelism,
sinkParallelismConfigured);
}
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to create staging dir", e);
} catch (ClassNotFoundException e) {
throw new FlinkHiveException("Failed to get output format class", e);
} catch (IllegalAccessException | InstantiationException e) {
throw new FlinkHiveException("Failed to instantiate output format instance", e);
}
}