in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [205:248]
/** Ingests one Spark partition into the online feature store via PutRecord.
 *
 *  For every row, builds a feature record from the named columns (null-valued
 *  cells are skipped so the corresponding feature is simply absent) and issues
 *  a PutRecord call. Per-row failures do not abort the partition: the failure
 *  message is appended as an extra trailing column, which is null on success.
 *
 *  @param featureGroupName target feature group for PutRecord
 *  @param columns          column names to serialize as feature values
 *  @param targetStores     optional store routing; may be null, in which case
 *                          the service default is used — TODO confirm callers
 *                          rely on null (vs empty list) to mean "default"
 *  @param runTimeClient    SageMaker FeatureStore runtime client
 *  @return rows with one extra trailing column: the error message, or null
 */
private def putOnlineRecordsForPartition(
    partition: Iterator[Row],
    featureGroupName: String,
    columns: Array[String],
    targetStores: List[TargetStore],
    runTimeClient: SageMakerFeatureStoreRuntimeClient
): Iterator[Row] = {
  partition.map { row =>
    val record = ListBuffer[FeatureValue]()
    columns.foreach { columnName =>
      try {
        // fieldIndex throws if the column is missing from the row schema;
        // that (and any other non-fatal failure) is surfaced as RuntimeException.
        if (!row.isNullAt(row.fieldIndex(columnName))) {
          val featureValue = row.getAs[Any](columnName)
          record += FeatureValue
            .builder()
            .featureName(columnName)
            .valueAsString(featureValue.toString)
            .build()
        }
      } catch {
        // NonFatal instead of Throwable: let OutOfMemoryError, stack overflow,
        // interrupts, etc. propagate instead of being wrapped and carried on.
        case scala.util.control.NonFatal(e) => throw new RuntimeException(e)
      }
    }

    // One PutRecord per row; capture the failure message (if any) rather than
    // failing the whole partition.
    val errorMessage = Try {
      val putRecordRequestBuilder = PutRecordRequest
        .builder()
        .featureGroupName(featureGroupName)
        .record(record.asJava)
      if (targetStores != null) {
        putRecordRequestBuilder.targetStores(targetStores.asJava)
      }
      runTimeClient.putRecord(putRecordRequestBuilder.build())
    } match {
      case Success(_)  => null // null in the appended column means success
      case Failure(ex) => ex.getMessage
    }
    // row.toSeq already yields a Seq; the original's extra .toList was a
    // needless copy before the append.
    Row.fromSeq(row.toSeq :+ errorMessage)
  }
}