def ingest_to_feature_store()

in processing/batch_ingest_sm_pyspark.py [0:0]


def ingest_to_feature_store(args: argparse.Namespace, rows) -> None:
    """Write every row to a SageMaker Feature Store feature group.

    Each row is converted with ``transform_row`` and sent with one
    ``put_record`` call to the feature group named by
    ``args.feature_group_name``. Rows are written sequentially; ingestion
    stops at the first response that does not report success.

    Args:
        args: Parsed arguments; only ``feature_group_name`` is read here.
        rows: Iterable of rows to ingest. It is materialized into a list so
            the row count can be logged before ingestion starts.

    Raises:
        RuntimeError: If a ``put_record`` response reports an HTTP status
            code other than 200.
    """
    feature_group_name = args.feature_group_name
    session = boto3.session.Session()
    featurestore_runtime_client = session.client(service_name='sagemaker-featurestore-runtime')
    rows = list(rows)
    logger.info(f'Ingesting {len(rows)} rows into feature group: {feature_group_name}')
    for index, row in enumerate(rows):
        record = transform_row(row)
        response = featurestore_runtime_client.put_record(
            FeatureGroupName=feature_group_name,
            Record=record,
        )
        status_code = response['ResponseMetadata']['HTTPStatusCode']
        # Raise explicitly instead of using `assert`: assert statements are
        # removed when Python runs with -O, which would let a failed write
        # pass unnoticed.
        if status_code != 200:
            raise RuntimeError(
                f'put_record failed for row {index} in feature group '
                f'{feature_group_name}: HTTP status {status_code}'
            )