in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [83:113]
/** Ingests a Spark DataFrame into a SageMaker Feature Group.
 *
 *  Resolves region and feature group name from the ARN, validates the input
 *  against the feature group schema, then either stream-ingests into the
 *  online store or batch-ingests into the offline store depending on the
 *  (parsed) target stores.
 *
 *  @param inputDataFrame  rows to ingest; must contain the record identifier
 *                         and event time columns of the feature group
 *  @param featureGroupArn full ARN of the target feature group
 *  @param targetStores    optional list of target stores ("OnlineStore" /
 *                         "OfflineStore"); null (the default, kept for
 *                         interface compatibility) means "use the feature
 *                         group's enabled stores"
 */
def ingestData(inputDataFrame: DataFrame, featureGroupArn: String, targetStores: List[String] = null): Unit = {
  val featureGroupArnResolver = new FeatureGroupArnResolver(featureGroupArn)
  // BUG FIX: the name must be resolved from the ARN — downstream calls
  // (getFeatureGroup, streamIngestIntoOnlineStore) expect the bare name,
  // not the full ARN that was previously passed through unchanged.
  val featureGroupName = featureGroupArnResolver.resolveFeatureGroupName()
  val region           = featureGroupArnResolver.resolveRegion()

  // Clients are (re)initialized per call so the region always matches the ARN.
  ClientFactory.initialize(region = region, roleArn = assumeRoleArn)

  val describeResponse = getFeatureGroup(featureGroupName)
  checkIfFeatureGroupIsCreated(describeResponse)
  val parsedTargetStores = checkAndParseTargetStore(describeResponse, targetStores)

  val eventTimeFeatureName = describeResponse.eventTimeFeatureName()
  val recordIdentifierName = describeResponse.recordIdentifierFeatureName()

  // null parsedTargetStores means "no explicit targets" — default to streaming.
  if (parsedTargetStores == null || shouldIngestInStream(parsedTargetStores)) {
    validateSchemaNames(inputDataFrame.schema.names, describeResponse, recordIdentifierName, eventTimeFeatureName)
    streamIngestIntoOnlineStore(featureGroupName, inputDataFrame, parsedTargetStores, region)
  } else {
    // Offline-only path: full validation, then batch write to the offline store.
    val validatedInputDataFrame = validateInputDataFrame(inputDataFrame, describeResponse)
    batchIngestIntoOfflineStore(
      validatedInputDataFrame,
      describeResponse,
      eventTimeFeatureName,
      region
    )
  }
}