in client/commands/persistent.py [0:0]
def parse_subscription_response(response: str) -> SubscriptionResponse:
try:
response_json = json.loads(response)
# The response JSON is expected to have the following forms:
# `{"name": "foo", "body": ["TypeErrors", [error_json, ...]]}`
# `{"name": "foo", "body": ["StatusUpdate", ["message_kind", ...]]}`
if isinstance(response_json, dict):
name = response_json.get("name", None)
body = response_json.get("body", None)
if (
name is not None
and body is not None
and isinstance(body, list)
and len(body) > 1
):
tag = body[0]
if tag == "TypeErrors":
return SubscriptionResponse(
name=name, body=_parse_type_error_subscription(body[1])
)
elif tag == "StatusUpdate":
return SubscriptionResponse(
name=name, body=_parse_status_update_subscription(body[1])
)
raise incremental.InvalidServerResponse(
f"Unexpected JSON subscription from server: {response_json}"
)
except json.JSONDecodeError as decode_error:
message = f"Cannot parse subscription as JSON: {decode_error}"
raise incremental.InvalidServerResponse(message) from decode_error