in source/aws_lambda/shared/personalize_service.py [0:0]
def _validate_schema(self, name: str, schema: Optional[Dict]) -> None:
if not schema:
return # nothing to validate - schema wasn't provided
avro_schema = schema.get("schema", {})
avro_schema_name = schema.get("name")
# check for schema name
if not avro_schema_name:
self._configuration_errors.append(f"The {name} schema name is missing")
# check for schema
if not avro_schema:
self._configuration_errors.append(f"The {name} schema is missing")
else:
try:
avro.schema.parse(json.dumps(avro_schema))
except avro.schema.SchemaParseException as exc:
self._configuration_errors.append(
f"The {name} schema is not valid: {exc}"
)
self._validate_resource(
Schema(),
{
"schema": json.dumps(avro_schema),
"name": avro_schema_name,
},
)