in microservices/lti/src/services/lti_token.py [0:0]
def generate_token_claims(lti_request_type, client_id, login_hint,
lti_message_hint, nonce, redirect_uri):
""" Generate claims for the token """
tool = Tool.find_by_client_id(client_id)
tool_info = tool.get_fields(reformat_datetime=True)
if redirect_uri not in tool_info.get("redirect_uris"):
raise Exception(f"Unknown redirect_uri {redirect_uri}")
get_user_url = f"http://user-management/user-management/api/v1/user/{login_hint}"
user_res = get_method(url=get_user_url, use_bot_account=True)
if user_res.status_code == 200:
user = user_res.json().get("data")
else:
raise Exception(
f"Internal error from get user API with status code - {user_res.status_code}"
)
token_claims = {
"iss": LTI_ISSUER_DOMAIN,
"aud": client_id,
"nonce": nonce,
"iat": int(datetime.now().timestamp()),
"exp": int(datetime.now().timestamp()) + TOKEN_TTL,
"sub": login_hint,
"given_name": user.get("first_name"),
"family_name": user.get("last_name"),
"name": user.get("first_name") + " " + user.get("last_name"),
"email": user.get("email"),
"picture": user.get("photo_url"),
lti_claim_field("claim", "tool_platform"): {
"guid": LTI_PLATFORM_UNIQUE_ID,
"name": LTI_PLATFORM_NAME
}
}
context_id = lti_message_hint.get("context_id")
context_type = lti_message_hint.get("context_type")
get_context_url = f"http://classroom-shim/classroom-shim/api/v1/contexts/{context_id}"
context_res = get_method(url=get_context_url, use_bot_account=True)
if context_res.status_code == 200:
context_data = context_res.json().get("data")
else:
Logger.error(
f"Error 1009 response: Status code: {context_res.status_code}; Response: {context_res.text}"
)
raise Exception("Request failed with error code 1009")
if context_type.lower() == "course_template":
lti_context_type = "http://purl.imsglobal.org/vocab/lis/v2/course#CourseTemplate"
else:
lti_context_type = "http://purl.imsglobal.org/vocab/lis/v2/course#CourseSection"
lti_context_id = context_data.get("id")
token_claims[lti_claim_field("claim", "context")] = {
"id": lti_context_id,
"label": context_data.get("description"),
"title": context_data.get("name"),
"type": [lti_context_type]
}
if lti_request_type == "deep_link":
token_claims[lti_claim_field("claim", "deep_linking_settings", "dl")] = {
"accept_types": ["link", "file", "html", "ltiResourceLink", "image"],
"accept_presentation_document_targets": ["iframe", "window", "embed"],
"accept_multiple":
False,
"auto_create":
False,
"title":
"",
"text":
"",
"deep_link_return_url":
f"{LTI_ISSUER_DOMAIN}/lti/api/v1/content-item-return?context_id={lti_context_id}"
}
token_claims[lti_claim_field("claim",
"message_type")] = "LtiDeepLinkingRequest"
if lti_request_type == "resource_link":
token_claims[lti_claim_field("claim",
"message_type")] = "LtiResourceLinkRequest"
lti_content_item_id = lti_message_hint.get("lti_content_item_id")
lti_content_item = LTIContentItem.find_by_id(lti_content_item_id)
# process content item info claims required for launch
content_item_info = lti_content_item.content_item_info
if content_item_info:
if content_item_info.get("url"):
token_claims[lti_claim_field(
"claim", "target_link_uri")] = content_item_info.get("url")
else:
token_claims[lti_claim_field("claim",
"target_link_uri")] = tool_info["tool_url"]
# process custom parameters
final_custom_claims = {**content_item_info.get("custom", {})}
custom_params = lti_message_hint.get("custom_params_for_substitution")
if tool_info.get("custom_params", None) is not None:
cpm = tool_info.get("custom_params")
# process custom parameter from tool registration
# separate params string using ";"
cpm = re.split(";", cpm)
final_cpm = {}
for i in cpm:
# separate params string using "="
if i:
i = i.replace(" ", "")
single_cpm = re.split("=", i)
final_cpm[single_cpm[0]] = single_cpm[1]
for key, value in final_cpm.items():
if custom_params.get(value) is not None:
final_custom_claims[key] = custom_params.get(value)
if "custom" in content_item_info.keys():
# process custom parameters from content_item
for key, value in content_item_info.get("custom").items():
if isinstance(value, str) and value.startswith(
"$") and custom_params.get(value) is not None:
final_custom_claims[key] = custom_params.get(value)
token_claims[lti_claim_field("claim", "custom")] = final_custom_claims
# process grade sync functionality
if tool_info.get("enable_grade_sync"):
token_claims[lti_claim_field("claim", "endpoint", "ags")] = {
"scope": [
"https://purl.imsglobal.org/spec/lti-ags/scope/lineitem",
"https://purl.imsglobal.org/spec/lti-ags/scope/result.readonly",
"https://purl.imsglobal.org/spec/lti-ags/scope/score"
],
"lineitems":
f"{LTI_ISSUER_DOMAIN}/lti/api/v1/{lti_context_id}/line_items",
}
# process line_item claims
if "lineItem" in content_item_info.keys():
line_item = LineItem.find_by_resource_link_id(lti_content_item.id)
token_claims[lti_claim_field("claim", "endpoint", "ags")] = {
"scope": [
lti_claim_field("scope", "lineitem", "ags"),
lti_claim_field("scope", "result.readonly", "ags"),
lti_claim_field("scope", "score", "ags")
],
"lineitems":
f"{LTI_ISSUER_DOMAIN}/lti/api/v1/{lti_context_id}/line_items",
"lineitem":
f"{LTI_ISSUER_DOMAIN}/lti/api/v1/{lti_context_id}/line_items/{line_item.id}"
}
if tool_info.get("enable_nrps"):
token_claims[lti_claim_field("claim", "namesroleservice", "nrps")] = {
"context_memberships_url":
f"{LTI_ISSUER_DOMAIN}/lti/api/v1/{lti_context_id}/memberships",
"service_versions": ["2.0"]
}
if not content_item_info.get("title") or content_item_info.get(
"title") == tool_info.get("name"):
assignment = LTIAssignment.collection.filter("lti_content_item_id", "==",
lti_content_item_id).get()
if assignment:
resource_link_title = assignment.lti_assignment_title
resource_link_description = ""
else:
resource_link_title = content_item_info.get("title")
resource_link_description = content_item_info.get("text")
else:
resource_link_title = content_item_info.get("title")
resource_link_description = content_item_info.get("text")
resource_link_claim_info = {
"id": lti_content_item.id,
"title": resource_link_title,
"description": resource_link_description
}
token_claims[lti_claim_field("claim",
"resource_link")] = resource_link_claim_info
token_claims[lti_claim_field("claim", "version")] = "1.3.0"
token_claims[lti_claim_field("claim",
"deployment_id")] = tool_info["deployment_id"]
if user.get("user_type") == "learner":
token_claims[lti_claim_field("claim", "roles")] = [
"http://purl.imsglobal.org/vocab/lis/v2/membership#Learner",
"http://purl.imsglobal.org/vocab/lis/v2/institution/person#Learner",
"http://purl.imsglobal.org/vocab/lis/v2/institution/person#Student"
]
elif user.get("user_type") == "faculty":
token_claims[lti_claim_field("claim", "roles")] = [
"http://purl.imsglobal.org/vocab/lis/v2/membership#Instructor",
"http://purl.imsglobal.org/vocab/lis/v2/institution/person#Faculty",
"http://purl.imsglobal.org/vocab/lis/v2/institution/person#Instructor"
]
# Role claims for Admin -
elif user.get("user_type") == "admin":
token_claims[lti_claim_field("claim", "roles")] = [
"http://purl.imsglobal.org/vocab/lis/v2/membership#Administrator",
"http://purl.imsglobal.org/vocab/lis/v2/system/person#Administrator",
"http://purl.imsglobal.org/vocab/lis/v2/institution/person#Administrator"
]
return token_claims