in services/handlers/fetch_resources/main.py [0:0]
def _validate_task_data(self, task_data):
"""
Validates the incoming task data against required fields and types.
"""
required_fields = {
"scope": str,
"type": str,
"nextPageToken": (str, type(None)),
"is_transferred": bool,
"createdAt": str,
}
missing_fields = [
field for field in required_fields if field not in task_data
]
if missing_fields:
raise ValidationError(
f"Missing required fields: {', '.join(missing_fields)}"
)
for field, expected_types in required_fields.items():
if isinstance(expected_types, tuple):
if not isinstance(task_data[field], expected_types):
expected_type_names = ", ".join(
t.__name__ for t in expected_types
)
raise ValidationError(
f"Invalid type for '{field}', "
f"expected one of: {expected_type_names}"
)
else:
if not isinstance(task_data[field], expected_types):
raise ValidationError(
f"Invalid type for '{field}', "
f"expected {expected_types.__name__}"
)
try:
self._resource_manager_client.get_project_number(self.project)
except HttpError as e:
raise ValidationError(
f"Not enough permissions or {self.project} doesn't exists"
) from e
if task_data["type"] not in ["entry_group", "tag_template"]:
raise ValidationError("Invalid entry type for 'type'")
date_format = "%Y-%m-%d"
try:
datetime.strptime(task_data["createdAt"], date_format)
except ValueError as exc:
raise ValidationError(
"Invalid date format for 'createdAt'. "
f"Expected format: {date_format}"
) from exc
if task_data["type"] == "tag_template":
if "is_public" not in task_data:
raise ValidationError(
"Missing required field: 'is_public' for tag_template"
)
if not isinstance(task_data["is_public"], bool):
raise ValidationError(
"Invalid type for 'is_public', expected bool"
)