def ingestData()

in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [83:113]


  /** Ingests `inputDataFrame` into the feature group identified by `featureGroupArn`.
    *
    * The region is resolved from the ARN and used, together with `assumeRoleArn`, to initialize
    * `ClientFactory` before the feature group is described. The description is checked with
    * `checkIfFeatureGroupIsCreated`, and the requested target stores are parsed against it.
    *
    * Two ingestion paths exist:
    *   - when the parsed target stores are `null`, or `shouldIngestInStream` accepts them, only the
    *     schema column names are validated and the data goes through `streamIngestIntoOnlineStore`;
    *   - otherwise the whole DataFrame is validated with `validateInputDataFrame` and handed to
    *     `batchIngestIntoOfflineStore`.
    *
    * @param inputDataFrame  the data to ingest
    * @param featureGroupArn ARN of the destination feature group
    * @param targetStores    requested target stores; defaults to `null`, in which case the decision is
    *                        left to `checkAndParseTargetStore`
    */
  def ingestData(inputDataFrame: DataFrame, featureGroupArn: String, targetStores: List[String] = null): Unit = {

    val arnResolver = new FeatureGroupArnResolver(featureGroupArn)
    val region      = arnResolver.resolveRegion()

    // The full ARN is passed on as the feature group identifier for the describe call and for
    // stream ingestion.
    // NOTE(review): presumably intentional (the service accepts an ARN in place of a name) — confirm.
    val featureGroupName = featureGroupArn

    ClientFactory.initialize(region = region, roleArn = assumeRoleArn)

    val featureGroupDescription = getFeatureGroup(featureGroupName)
    checkIfFeatureGroupIsCreated(featureGroupDescription)

    val resolvedTargetStores = checkAndParseTargetStore(featureGroupDescription, targetStores)

    val eventTimeFeatureName = featureGroupDescription.eventTimeFeatureName()
    val recordIdentifierName = featureGroupDescription.recordIdentifierFeatureName()

    // A null result from parsing is treated the same as an explicit request for stream ingestion.
    val useStreamIngestion =
      Option(resolvedTargetStores).forall(stores => shouldIngestInStream(stores))

    if (useStreamIngestion) {
      validateSchemaNames(
        inputDataFrame.schema.names,
        featureGroupDescription,
        recordIdentifierName,
        eventTimeFeatureName
      )
      streamIngestIntoOnlineStore(featureGroupName, inputDataFrame, resolvedTargetStores, region)
    } else {
      val validatedInputDataFrame = validateInputDataFrame(inputDataFrame, featureGroupDescription)
      batchIngestIntoOfflineStore(validatedInputDataFrame, featureGroupDescription, eventTimeFeatureName, region)
    }
  }