private def putOnlineRecordsForPartition()

in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [205:248]


  /** Sends every row of a partition to a feature group through `PutRecord` and reports the
    * outcome of each call on the row itself.
    *
    * For each row, one `FeatureValue` is built per entry of `columns` whose value is non-null
    * (null columns are omitted from the record), using the value's `toString` as the feature
    * value. The resulting record is submitted with `runTimeClient.putRecord`.
    *
    * The returned iterator is produced with `Iterator.map`, so it is lazy: no request is issued
    * until the caller consumes the returned rows.
    *
    * @param partition        rows to ingest
    * @param featureGroupName name of the feature group set on every request
    * @param columns          names of the row columns to send as features
    * @param targetStores     stores to set on the request; when `null` the request is built
    *                         without an explicit target-store list
    * @param runTimeClient    client used to issue the `PutRecord` calls
    * @return the input rows, each with one extra trailing column: `null` when the put succeeded,
    *         otherwise a non-null description of the failure
    * @throws RuntimeException wrapping any non-fatal error raised while reading a column from a
    *                          row or building its `FeatureValue`
    */
  private def putOnlineRecordsForPartition(
      partition: Iterator[Row],
      featureGroupName: String,
      columns: Array[String],
      targetStores: List[TargetStore],
      runTimeClient: SageMakerFeatureStoreRuntimeClient
  ): Iterator[Row] = {
    partition.map { row =>
      // One FeatureValue per non-null column; null columns are left out of the record.
      val record: List[FeatureValue] = columns.toList.flatMap { columnName =>
        try {
          if (row.isNullAt(row.fieldIndex(columnName))) {
            None
          } else {
            Some(
              FeatureValue
                .builder()
                .featureName(columnName)
                .valueAsString(row.getAs[Any](columnName).toString)
                .build()
            )
          }
        } catch {
          // Only non-fatal errors are wrapped; fatal JVM errors (e.g. OutOfMemoryError)
          // propagate untouched instead of being disguised as a RuntimeException.
          case scala.util.control.NonFatal(e) => throw new RuntimeException(e)
        }
      }

      val putResult = Try {
        val putRecordRequestBuilder = PutRecordRequest
          .builder()
          .featureGroupName(featureGroupName)
          .record(record.asJava)

        // targetStores may legitimately be null, meaning "do not set it on the request".
        Option(targetStores).foreach(stores => putRecordRequestBuilder.targetStores(stores.asJava))

        runTimeClient.putRecord(putRecordRequestBuilder.build())
      }

      // A null error column marks success, so a failure must never yield null: exceptions
      // without a message fall back to their toString (class name).
      val errorMessage: String = putResult match {
        case Success(_)  => null
        case Failure(ex) => Option(ex.getMessage).getOrElse(ex.toString)
      }

      Row.fromSeq(row.toSeq :+ errorMessage)
    }
  }