From 2117cd41b93e4753567bcf59a58375562d5b0fb6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 31 May 2026 14:41:00 +0000 Subject: [PATCH 1/2] Implement LTI 1.3 support --- README.md | 20 +- controllers/auth.py | 56 ++++- controllers/helpers.py | 3 + controllers/pylti/common.py | 8 +- controllers/pylti/flask.py | 115 +++++++--- controllers/pylti/lti13.py | 390 ++++++++++++++++++++++++++++++++ controllers/pylti/post_grade.py | 4 +- requirements.txt | 1 + tests/controllers/test_lti13.py | 165 ++++++++++++++ 9 files changed, 718 insertions(+), 44 deletions(-) create mode 100644 controllers/pylti/lti13.py create mode 100644 tests/controllers/test_lti13.py diff --git a/README.md b/README.md index 53db9b860..5898f98ae 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,25 @@ just want to use the general CORGIS Github instance. No further work is required Your `CONSUMER_KEY` uniquely identifies you to Canvas, `CONSUMER_SECRET` is to be shared with instructors using your BlockPy instance. The goal is to keep it relatively secretive. You can choose anything you want for your Key and Secret. +For LTI 1.3 launches and LTI Advantage AGS grade passback, configure one or more platforms with `LTI13_PLATFORMS` in +your instance configuration. Each entry should provide an `issuer`, `client_id`, `auth_login_url`, `auth_token_url`, +`jwks_url`, and either `client_secret` or a `private_key` / `private_key_file` (plus optional `kid`). You can also set +`deployment_ids` and `service` when you need to scope a platform more narrowly or keep Canvas-specific behavior. + +```python +LTI13_PLATFORMS = [{ + "issuer": "https://canvas.instructure.com", + "client_id": "YOUR_DEVELOPER_KEY_ID", + "deployment_ids": ["YOUR_DEPLOYMENT_ID"], + "auth_login_url": "https://canvas.instructure.com/api/lti/authorize_redirect", + "auth_token_url": "https://canvas.instructure.com/login/oauth2/token", + "jwks_url": "https://canvas.instructure.com/api/lti/security/jwks", + "private_key_file": "/full/path/lti13_private_key.pem", + "kid": "OPTIONAL_JWK_KEY_ID", + "service": "canvas" +}] +``` + ## Database setup You're going to need to create a new Postgres database and prepopulate some schemas. Our database is named `blockpydb`. @@ -719,4 +738,3 @@ CC106 --> SSC @enduml ``` - diff --git a/controllers/auth.py b/controllers/auth.py index 365a05365..3199774ca 100644 --- a/controllers/auth.py +++ b/controllers/auth.py @@ -14,7 +14,7 @@ import json from functools import wraps -from flask import current_app, g, jsonify, make_response, request, abort +from flask import current_app, g, jsonify, make_response, request, abort, redirect from werkzeug.datastructures import ImmutableMultiDict from werkzeug.wrappers import Request from flask_jwt_extended import create_access_token, get_jwt_identity, verify_jwt_in_request, \ @@ -28,6 +28,12 @@ from flask_security.core import current_user import flask_security from controllers.pylti.flask import LTI_SESSION_KEY, LTI, LTIException +from controllers.pylti.lti13 import ( + build_login_redirect_url, + find_platform, + is_lti13_launch, + is_lti13_login_initiation, +) from flask_jwt_extended import create_access_token from controllers.services import ValidUserPermissionLayer, InvalidUserPermissionLayer @@ -130,6 +136,8 @@ def login_user_if_able(): # During the login process, we will let the user be anonymous return make_user_anonymous(request.remote_addr) # If LTI parameters are available, let's try setting that up + if is_lti13_login_request(request): + return try_lti13_login_initiation() if is_lti_launch_request(request): return try_lti_login_initial() # If a stored LTI session was provided, we can try that instead @@ -164,6 +172,12 @@ def get_consumer_secrets(app=None): } +def get_lti13_platforms(app=None): + if app is None: + app = current_app + return app.config.get('LTI13_PLATFORMS', []) + + def load_logged_in_user(): """ If a current_user is available, then logs them in as the current user. @@ -173,7 +187,7 @@ def load_logged_in_user(): g.user = current_user g.safely = ValidUserPermissionLayer(g.user) if session.get(LTI_SESSION_KEY, False): - g.lti = LTI(get_consumer_secrets()) + g.lti = LTI(get_consumer_secrets(), lti13_platforms=get_lti13_platforms()) if 'lti_course_id' in session and g.user: g.course = Course.by_id(session['lti_course_id']) g.roles = g.user.get_course_roles(g.course.id) @@ -192,7 +206,7 @@ def get_user() -> (User, int): def try_lti_login_initial(): - g.lti = LTI(get_consumer_secrets()) + g.lti = LTI(get_consumer_secrets(), lti13_platforms=get_lti13_platforms()) g.lti.verify_request() # TODO: Provide any other LTI information that we need load_lti_user() @@ -208,17 +222,18 @@ def load_lti_user(): Those fields will not be updated if they are found in the session. :return: """ + lti_service = session.get('lti_service', "canvas") # 1) check whether the user needs to be updated old_user = g.user if 'user' in g else None - g.user = User.from_lti("canvas", + g.user = User.from_lti(lti_service, session["pylti_user_id"], session.get("lis_person_contact_email_primary", ""), session.get("lis_person_name_given", "Canvas"), session.get("lis_person_name_family", "User")) g.safely = ValidUserPermissionLayer(g.user) # 2) Check the course - new_outcome_url = request.form.get('lis_outcome_service_url', "") - g.course = Course.from_lti("canvas", + new_outcome_url = request.form.get('lis_outcome_service_url', session.get('lis_outcome_service_url', "")) + g.course = Course.from_lti(lti_service, session["context_id"], session.get("context_title", ""), g.user.id, @@ -232,7 +247,9 @@ def load_lti_user(): # 4) Generally update the LTI status session['is_lti_active'] = True # Keep track of the chosen oauth_consumer_key - g.oauth_consumer_key = request.form.get('oauth_consumer_key', "") + g.oauth_consumer_key = request.form.get('oauth_consumer_key', + session.get('oauth_consumer_key', + session.get('lti_client_id', ""))) # 5) If the user changed, then log them in again handle_login_change(old_user) @@ -284,7 +301,27 @@ def is_lti_launch_request(request) -> bool: Determines if the request is an LTI launch request. Does NOT check that the request is a *valid* LTI launch request, just that it has the potential to be one. """ - return request.method == 'POST' and request.form.get('lti_message_type') == 'basic-lti-launch-request' + return request.method == 'POST' and ( + request.form.get('lti_message_type') == 'basic-lti-launch-request' + or is_lti13_launch(request.form) + ) + + +def is_lti13_login_request(request) -> bool: + return request.method in ('GET', 'POST') and is_lti13_login_initiation(request.values) + + +def try_lti13_login_initiation(): + platform = find_platform(get_lti13_platforms(), request.values.get('iss'), + request.values.get('client_id'), + request.values.get('lti_deployment_id')) + if platform is None: + raise LTIException("Unknown LTI 1.3 login initiation platform") + session['lti13_state'] = uuid.uuid4().hex + session['lti13_nonce'] = uuid.uuid4().hex + return redirect(build_login_redirect_url(platform, request.values, + session['lti13_state'], + session['lti13_nonce'])) def is_stored_lti_launch_request(request) -> bool: @@ -318,7 +355,8 @@ def try_lti_login_stored(): try: verify_jwt_in_request() user_id = get_jwt_identity() - g.lti = LTI(get_consumer_secrets(), use_request=get_jwt()) + g.lti = LTI(get_consumer_secrets(), use_request=get_jwt(), + lti13_platforms=get_lti13_platforms()) load_jwt_user(user_id, get_jwt()) g.access_token = create_user_token() return True diff --git a/controllers/helpers.py b/controllers/helpers.py index 0c05de90b..d4a93ff77 100644 --- a/controllers/helpers.py +++ b/controllers/helpers.py @@ -193,6 +193,9 @@ def parse_assignment_load(assignment_id_or_url=None): course_id = int(g.course.id) if 'course' in g and g.course else None # LTI submission URL new_submission_url = request.form.get('lis_result_sourcedid', None) + if (new_submission_url is None and request.method == 'POST' + and request.form.get('id_token') and session.get('lti_version') == 'LTI-1p3'): + new_submission_url = session.get('lis_result_sourcedid', None) new_due_date = from_canvas_isotime(request.form.get('custom_canvas_assignment_dueat', None)) new_lock_date = from_canvas_isotime(request.form.get('custom_canvas_assignment_lockat', None)) # Embedded? diff --git a/controllers/pylti/common.py b/controllers/pylti/common.py index e770b9887..a3f3c0c98 100644 --- a/controllers/pylti/common.py +++ b/controllers/pylti/common.py @@ -212,10 +212,9 @@ def my_normalize(self, headers): http._normalize_headers = monkey_patch_function log.debug("key %s", lti_key) - log.debug("secret %s", secret) log.debug("url %s", url) log.debug("response %s", response) - log.debug("content %s", format(content)) + log.debug("content length %s", len(content) if content is not None else 0) return response, content @@ -285,11 +284,10 @@ def verify_request_common(consumers, url, method, headers, params): :return: is request valid """ - log.debug("consumers %s", consumers) log.debug("url %s", url) log.debug("method %s", method) - log.debug("headers %s", headers) - log.debug("params %s", params) + log.debug("header keys %s", list(headers.keys()) if hasattr(headers, 'keys') else []) + log.debug("param keys %s", list(params.keys()) if hasattr(params, 'keys') else []) oauth_server = LTIOAuthServer(consumers) diff --git a/controllers/pylti/flask.py b/controllers/pylti/flask.py index 886233fe3..7d5bfe738 100644 --- a/controllers/pylti/flask.py +++ b/controllers/pylti/flask.py @@ -8,6 +8,7 @@ from flask import session as flask_session, current_app, Flask, g from flask import request as flask_request +from models.enums import clean_role from .common import ( LTI_SESSION_KEY, @@ -22,9 +23,16 @@ LTINotInSessionException, LTIPostMessageException, parse_read_result ) +from .lti13 import ( + parse_lti13_endpoint, + post_ags_score, + read_ags_result, + verify_lti13_launch, +) log = logging.getLogger('pylti.flask') # pylint: disable=invalid-name +LTI_SESSION_PROPERTY_LIST_KEY = 'lti_session_property_list' class LTI: @@ -36,8 +44,10 @@ class LTI: created requests (taken from an access token). """ - def __init__(self, consumer_data, use_session=False, use_request=False): + def __init__(self, consumer_data=None, use_session=False, use_request=False, + lti13_platforms=None): self.consumer_data = consumer_data + self.lti13_platforms = lti13_platforms or [] self.use_session = bool(use_session) self._internal_session = use_session @@ -105,6 +115,41 @@ def _consumers(self): """ return self.consumer_data + def _platforms(self): + return self.lti13_platforms + + def _stored_lti_properties(self): + return self.session.get(LTI_SESSION_PROPERTY_LIST_KEY, []) + + def _store_lti_properties(self, values): + stored_keys = [] + for key, value in values.items(): + if value is None: + continue + self.session[key] = value + stored_keys.append(key) + self.session[LTI_SESSION_PROPERTY_LIST_KEY] = stored_keys + + def _clear_lti_properties(self): + for prop in self._stored_lti_properties(): + if prop in self.session: + del self.session[prop] + for prop in LTI_PROPERTY_LIST: + if prop in self.session: + del self.session[prop] + if self.session.get('pylti_user_id', None): + del self.session['pylti_user_id'] + self.session.pop(LTI_SESSION_PROPERTY_LIST_KEY, None) + + def _is_lti13_endpoint(self, endpoint=None): + if parse_lti13_endpoint(endpoint): + return True + if endpoint is None and parse_lti13_endpoint(self.session.get('lis_outcome_service_url')): + return True + if endpoint is None and parse_lti13_endpoint(self.session.get('lis_result_sourcedid')): + return True + return False + @property def key(self): # pylint: disable=no-self-use """ @@ -151,9 +196,14 @@ def is_role(self, role): log.debug("is_role %s", role) roles = self.session['roles'].split(',') if role in LTI_ROLES: - role_list = LTI_ROLES[role] + role_list = {value.lower() for value in LTI_ROLES[role]} + cleaned_roles = set() + for raw_role in roles: + cleaned_role, _ = clean_role(raw_role.strip()) + cleaned_roles.add(cleaned_role.lower()) + cleaned_roles.add(raw_role.strip().lower()) # find the intersection of the roles - roles = set(role_list) & set(roles) + roles = role_list & cleaned_roles is_user_role_there = len(roles) >= 1 log.debug( "is_role roles_list=%s role=%s in list=%s", role_list, @@ -199,32 +249,33 @@ def verify_request(self): log.debug('verify_request?') #print(self._consumers(), url, method, headers, params) try: - verify_request_common(self._consumers(), url, method, headers, params) + if params.get('id_token', None): + launch_data = verify_lti13_launch(self._platforms(), params, self.session) + else: + verify_request_common(self._consumers(), url, method, headers, params) + launch_data = {prop: params[prop] for prop in LTI_PROPERTY_LIST + if params.get(prop, None) is not None} + if params.get('user_id', None): + launch_data['pylti_user_id'] = params['user_id'] log.debug('verify_request success') # All good to go, store all of the LTI params into a # session dict for use in views - for prop in LTI_PROPERTY_LIST: - if params.get(prop, None) is not None: - log.debug("params %s=%s", prop, params.get(prop, None)) - self.session[prop] = params[prop] - if params.get('user_id', None): - self.session['pylti_user_id'] = params['user_id'] - + self._store_lti_properties(launch_data) # Set logged in session key self.session[LTI_SESSION_KEY] = True return True except LTIException as e: - for prop in LTI_PROPERTY_LIST: - if self.session.get(prop, None): - del self.session[prop] - if self.session.get('pylti_user_id', None): - del self.session['pylti_user_id'] - + self._clear_lti_properties() self.session[LTI_SESSION_KEY] = False raise def get_grade(self, endpoint=None): + if self._is_lti13_endpoint(endpoint): + course_endpoint = parse_lti13_endpoint(self.response_url) + submission_endpoint = parse_lti13_endpoint(endpoint or self.lis_result_sourcedid) + if course_endpoint and submission_endpoint: + return read_ags_result(self._platforms(), course_endpoint, submission_endpoint) message_identifier_id = self.message_identifier_id() operation = 'readResult' if endpoint is None: @@ -248,14 +299,24 @@ def post_grade(self, grade, message='', endpoint=None, url=False, :return: True if post successful and grade valid :exception: LTIPostMessageException if call failed """ + score = float(grade) if grade is not None else None + if self._is_lti13_endpoint(endpoint): + course_endpoint = parse_lti13_endpoint(self.response_url) + submission_endpoint = parse_lti13_endpoint(endpoint or self.lis_result_sourcedid) + if score is None or not (0 <= score <= 1.0): + return False + if not course_endpoint or not submission_endpoint: + raise LTIPostMessageException("Invalid LTI 1.3 AGS configuration") + post_ags_score(self._platforms(), course_endpoint, submission_endpoint, score, + comment=message, needs_review=needs_review, + when_submitted_at=when_submitted_at) + return True message_identifier_id = self.message_identifier_id() operation = 'replaceResult' if endpoint is None: lis_result_sourcedid = self.lis_result_sourcedid else: lis_result_sourcedid = endpoint - # # edX devbox fix - score = float(grade) if grade is not None else None if score is None or 0 <= score <= 1.0: xml = generate_request_xml( message_identifier_id, operation, lis_result_sourcedid, @@ -308,18 +369,18 @@ def close_session(self): """ Invalidates session """ - for prop in LTI_PROPERTY_LIST: - if self.session.get(prop, None): - del self.session[prop] - if self.session.get('pylti_user_id', None): - del self.session['pylti_user_id'] + self._clear_lti_properties() self.session[LTI_SESSION_KEY] = False def to_json(self): frozen_session = {} - for prop in LTI_PROPERTY_LIST: + for prop in self._stored_lti_properties(): if self.session.get(prop, None) is not None: frozen_session[prop] = self.session[prop] - if self.session.get('pylti_user_id', None): - frozen_session['user_id'] = self.session['pylti_user_id'] + if not frozen_session: + for prop in LTI_PROPERTY_LIST: + if self.session.get(prop, None) is not None: + frozen_session[prop] = self.session[prop] + if self.session.get('pylti_user_id', None): + frozen_session['user_id'] = self.session['pylti_user_id'] return frozen_session diff --git a/controllers/pylti/lti13.py b/controllers/pylti/lti13.py new file mode 100644 index 000000000..62b4af3c5 --- /dev/null +++ b/controllers/pylti/lti13.py @@ -0,0 +1,390 @@ +import base64 +import json +import logging +import os +import time +import urllib.parse +import urllib.request +import uuid + +import jwt + +from .common import LTIException, LTIPostMessageException + +log = logging.getLogger('pylti.lti13') # pylint: disable=invalid-name + +CLAIM_MESSAGE_TYPE = 'https://purl.imsglobal.org/spec/lti/claim/message_type' +CLAIM_VERSION = 'https://purl.imsglobal.org/spec/lti/claim/version' +CLAIM_DEPLOYMENT_ID = 'https://purl.imsglobal.org/spec/lti/claim/deployment_id' +CLAIM_TARGET_LINK_URI = 'https://purl.imsglobal.org/spec/lti/claim/target_link_uri' +CLAIM_RESOURCE_LINK = 'https://purl.imsglobal.org/spec/lti/claim/resource_link' +CLAIM_CONTEXT = 'https://purl.imsglobal.org/spec/lti/claim/context' +CLAIM_ROLES = 'https://purl.imsglobal.org/spec/lti/claim/roles' +CLAIM_LAUNCH_PRESENTATION = 'https://purl.imsglobal.org/spec/lti/claim/launch_presentation' +CLAIM_CUSTOM = 'https://purl.imsglobal.org/spec/lti/claim/custom' +CLAIM_TOOL_PLATFORM = 'https://purl.imsglobal.org/spec/lti/claim/tool_platform' +CLAIM_AGS_ENDPOINT = 'https://purl.imsglobal.org/spec/lti-ags/claim/endpoint' + +LTI13_VERSION = 'LTI-1p3' +RESULT_READ_SCOPE = 'https://purl.imsglobal.org/spec/lti-ags/scope/result.readonly' +SCORE_SCOPE = 'https://purl.imsglobal.org/spec/lti-ags/scope/score' +LINEITEM_SCOPE = 'https://purl.imsglobal.org/spec/lti-ags/scope/lineitem' + + +def _as_list(value): + if value is None: + return [] + if isinstance(value, list): + return value + if isinstance(value, tuple): + return list(value) + return [value] + + +def normalize_platforms(platforms): + if not platforms: + return [] + if isinstance(platforms, dict): + if 'issuer' in platforms: + return [platforms] + return [dict(config, platform_key=key) for key, config in platforms.items()] + return list(platforms) + + +def find_platform(platforms, issuer, client_id=None, deployment_id=None): + for platform in normalize_platforms(platforms): + if platform.get('issuer') != issuer: + continue + configured_client_id = str(platform.get('client_id', '')) + if client_id is not None and configured_client_id and configured_client_id != str(client_id): + continue + deployment_ids = _as_list(platform.get('deployment_ids') or platform.get('deployment_id')) + if deployment_id is not None and deployment_ids and deployment_id not in deployment_ids: + continue + return platform + return None + + +def is_lti13_launch(params): + return bool(params.get('id_token')) + + +def is_lti13_login_initiation(params): + return (not params.get('id_token') + and bool(params.get('iss')) + and bool(params.get('login_hint')) + and bool(params.get('target_link_uri'))) + + +def build_login_redirect_url(platform, params, state, nonce): + auth_login_url = platform.get('auth_login_url') or platform.get('authorization_endpoint') + if not auth_login_url: + raise LTIException('Missing LTI 1.3 authorization endpoint configuration.') + client_id = platform.get('client_id') + if not client_id: + raise LTIException('Missing LTI 1.3 client_id configuration.') + query = { + 'scope': 'openid', + 'response_type': 'id_token', + 'response_mode': 'form_post', + 'prompt': 'none', + 'client_id': client_id, + 'redirect_uri': params['target_link_uri'], + 'login_hint': params['login_hint'], + 'state': state, + 'nonce': nonce, + } + if params.get('lti_message_hint'): + query['lti_message_hint'] = params['lti_message_hint'] + if params.get('lti_deployment_id'): + query['lti_deployment_id'] = params['lti_deployment_id'] + return auth_login_url + ('&' if '?' in auth_login_url else '?') + urllib.parse.urlencode(query) + + +def fetch_json(url, headers=None): + request = urllib.request.Request(url, headers=headers or {}) + with urllib.request.urlopen(request) as response: + return json.loads(response.read().decode('utf-8')) + + +def request_json(url, method='GET', data=None, headers=None): + payload = None + if data is not None: + if isinstance(data, (bytes, bytearray)): + payload = data + else: + payload = json.dumps(data).encode('utf-8') + request_headers = dict(headers or {}) + if payload is not None and 'Content-Type' not in request_headers: + request_headers['Content-Type'] = 'application/json' + request = urllib.request.Request(url, data=payload, headers=request_headers, method=method) + with urllib.request.urlopen(request) as response: + body = response.read().decode('utf-8') + if not body: + return response.status, None + return response.status, json.loads(body) + + +def post_form(url, data, headers=None): + payload = urllib.parse.urlencode(data).encode('utf-8') + request_headers = {'Content-Type': 'application/x-www-form-urlencoded'} + request_headers.update(headers or {}) + request = urllib.request.Request(url, data=payload, headers=request_headers, method='POST') + with urllib.request.urlopen(request) as response: + body = response.read().decode('utf-8') + return response.status, json.loads(body) + + +def _load_public_key(jwks_url, kid=None): + if not jwks_url: + raise LTIException('Missing LTI 1.3 JWKS configuration.') + keyset = fetch_json(jwks_url) + keys = keyset.get('keys', []) + if kid is not None: + keys = [key for key in keys if key.get('kid') == kid] + if len(keys) != 1: + raise LTIException('Unable to identify LTI 1.3 signing key.') + return jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(keys[0])) + + +def _decode_unverified_claims(token): + return jwt.decode(token, options={"verify_signature": False, "verify_aud": False}) + + +def parse_lti13_endpoint(endpoint): + if not endpoint or not isinstance(endpoint, str): + return None + endpoint = endpoint.strip() + if not endpoint.startswith('{'): + return None + try: + data = json.loads(endpoint) + except json.JSONDecodeError: + return None + if data.get('lti_version') != LTI13_VERSION: + return None + return data + + +def serialize_course_endpoint(platform, claims): + ags = claims.get(CLAIM_AGS_ENDPOINT, {}) + return json.dumps({ + 'lti_version': LTI13_VERSION, + 'issuer': claims.get('iss', ''), + 'client_id': str(platform.get('client_id', '')), + 'deployment_id': claims.get(CLAIM_DEPLOYMENT_ID, ''), + 'auth_token_url': platform.get('auth_token_url') or platform.get('token_url') or '', + 'lineitems': ags.get('lineitems', ''), + 'scope': ags.get('scope', []) or [], + }, sort_keys=True) + + +def serialize_submission_endpoint(claims): + ags = claims.get(CLAIM_AGS_ENDPOINT, {}) + resource_link = claims.get(CLAIM_RESOURCE_LINK, {}) + return json.dumps({ + 'lti_version': LTI13_VERSION, + 'lineitem': ags.get('lineitem', ''), + 'lineitems': ags.get('lineitems', ''), + 'resource_link_id': resource_link.get('id', ''), + 'user_id': claims.get('sub', ''), + }, sort_keys=True) + + +def normalize_launch_claims(claims, platform): + context = claims.get(CLAIM_CONTEXT, {}) + resource_link = claims.get(CLAIM_RESOURCE_LINK, {}) + launch_presentation = claims.get(CLAIM_LAUNCH_PRESENTATION, {}) + tool_platform = claims.get(CLAIM_TOOL_PLATFORM, {}) + roles = claims.get(CLAIM_ROLES, []) or [] + custom = claims.get(CLAIM_CUSTOM, {}) or {} + user_id = claims.get('sub', '') + launch_data = { + 'oauth_consumer_key': str(platform.get('client_id', '')), + 'user_id': user_id, + 'pylti_user_id': user_id, + 'roles': ','.join(roles), + 'lti_version': claims.get(CLAIM_VERSION, LTI13_VERSION), + 'lti_message_type': claims.get(CLAIM_MESSAGE_TYPE, ''), + 'launch_presentation_return_url': launch_presentation.get('return_url', ''), + 'launch_presentation_document_target': launch_presentation.get('document_target', ''), + 'launch_presentation_width': str(launch_presentation.get('width', '')), + 'launch_presentation_height': str(launch_presentation.get('height', '')), + 'context_id': context.get('id', ''), + 'context_label': context.get('label', ''), + 'context_title': context.get('title', ''), + 'resource_link_id': resource_link.get('id', ''), + 'resource_link_title': resource_link.get('title', ''), + 'lis_person_contact_email_primary': claims.get('email', ''), + 'lis_person_contact_emailprimary': claims.get('email', ''), + 'lis_person_name_full': claims.get('name', ''), + 'lis_person_name_family': claims.get('family_name', ''), + 'lis_person_name_given': claims.get('given_name', ''), + 'lis_person_sourcedid': user_id, + 'lti_issuer': claims.get('iss', ''), + 'lti_client_id': str(platform.get('client_id', '')), + 'lti_deployment_id': claims.get(CLAIM_DEPLOYMENT_ID, ''), + 'lti_target_link_uri': claims.get(CLAIM_TARGET_LINK_URI, ''), + 'lti_tool_platform_guid': tool_platform.get('guid', ''), + 'lti_launch_claims': json.dumps(claims, sort_keys=True), + 'lti_service': platform.get('service', 'canvas'), + } + if claims.get(CLAIM_AGS_ENDPOINT): + launch_data['lis_outcome_service_url'] = serialize_course_endpoint(platform, claims) + launch_data['lis_result_sourcedid'] = serialize_submission_endpoint(claims) + for key, value in custom.items(): + custom_key = key if key.startswith('custom_') else f'custom_{key}' + if isinstance(value, (dict, list)): + value = json.dumps(value, sort_keys=True) + launch_data[custom_key] = str(value) + return launch_data + + +def verify_lti13_launch(platforms, params, session): + if 'id_token' not in params: + raise LTIException('Missing id_token in LTI 1.3 launch.') + unverified_claims = _decode_unverified_claims(params['id_token']) + deployment_id = unverified_claims.get(CLAIM_DEPLOYMENT_ID) + audience = unverified_claims.get('aud') + audience_options = audience if isinstance(audience, list) else [audience] + platform = None + for audience_option in audience_options: + platform = find_platform(platforms, unverified_claims.get('iss'), audience_option, deployment_id) + if platform is not None: + break + if platform is None and unverified_claims.get('azp'): + platform = find_platform(platforms, unverified_claims.get('iss'), + unverified_claims.get('azp'), deployment_id) + if platform is None: + raise LTIException('Unknown LTI 1.3 platform configuration.') + expected_state = session.get('lti13_state') + if expected_state and params.get('state') != expected_state: + raise LTIException('LTI 1.3 state mismatch.') + unverified_header = jwt.get_unverified_header(params['id_token']) + signing_key = _load_public_key(platform.get('jwks_url') or platform.get('key_set_url'), + kid=unverified_header.get('kid')) + claims = jwt.decode( + params['id_token'], + key=signing_key, + algorithms=[unverified_header.get('alg', 'RS256')], + audience=str(platform.get('client_id')), + issuer=platform.get('issuer'), + ) + expected_nonce = session.get('lti13_nonce') + if expected_nonce and claims.get('nonce') != expected_nonce: + raise LTIException('LTI 1.3 nonce mismatch.') + message_type = claims.get(CLAIM_MESSAGE_TYPE) + if not message_type: + raise LTIException('Missing LTI 1.3 message_type claim.') + session.pop('lti13_nonce', None) + session.pop('lti13_state', None) + return normalize_launch_claims(claims, platform) + + +def _load_private_key(platform): + if platform.get('private_key'): + return platform['private_key'] + private_key_file = platform.get('private_key_file') + if private_key_file: + with open(os.path.expanduser(private_key_file), 'r', encoding='utf-8') as private_key: + return private_key.read() + raise LTIPostMessageException('Missing LTI 1.3 private key configuration.') + + +def _get_access_token(platforms, course_endpoint): + platform = find_platform( + platforms, + course_endpoint.get('issuer'), + course_endpoint.get('client_id'), + course_endpoint.get('deployment_id') + ) + if platform is None: + raise LTIPostMessageException('Unknown LTI 1.3 platform for AGS request.') + token_url = course_endpoint.get('auth_token_url') or platform.get('auth_token_url') or platform.get('token_url') + if not token_url: + raise LTIPostMessageException('Missing LTI 1.3 token URL configuration.') + scopes = course_endpoint.get('scope', []) or [] + scope_string = ' '.join(scopes) + if platform.get('client_secret'): + credentials = f"{platform['client_id']}:{platform['client_secret']}".encode('utf-8') + headers = { + 'Authorization': 'Basic ' + base64.b64encode(credentials).decode('ascii') + } + _, body = post_form(token_url, { + 'grant_type': 'client_credentials', + 'scope': scope_string, + }, headers=headers) + else: + now = int(time.time()) + assertion = jwt.encode({ + 'iss': str(platform['client_id']), + 'sub': str(platform['client_id']), + 'aud': token_url, + 'iat': now, + 'exp': now + 300, + 'jti': str(uuid.uuid4()), + }, _load_private_key(platform), algorithm=platform.get('algorithm', 'RS256'), + headers={'kid': platform.get('kid')} if platform.get('kid') else None) + _, body = post_form(token_url, { + 'grant_type': 'client_credentials', + 'scope': scope_string, + 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'client_assertion': assertion, + }) + access_token = body.get('access_token') + if not access_token: + raise LTIPostMessageException('Failed to obtain LTI 1.3 AGS access token.') + return access_token, scopes + + +def read_ags_result(platforms, course_endpoint, submission_endpoint): + if RESULT_READ_SCOPE not in (course_endpoint.get('scope', []) or []): + return None + lineitem_url = submission_endpoint.get('lineitem') + if not lineitem_url: + return None + access_token, _ = _get_access_token(platforms, course_endpoint) + status, body = request_json( + f"{lineitem_url.rstrip('/')}/results?{urllib.parse.urlencode({'user_id': submission_endpoint.get('user_id', '')})}", + headers={'Authorization': 'Bearer ' + access_token} + ) + if status != 200 or not body: + return None + first_result = body[0] if isinstance(body, list) and body else None + if first_result is None: + return None + return first_result.get('resultScore') + + +def post_ags_score(platforms, course_endpoint, submission_endpoint, score, comment='', needs_review=False, + when_submitted_at=None): + lineitem_url = submission_endpoint.get('lineitem') + if not lineitem_url: + raise LTIPostMessageException('Missing LTI 1.3 AGS lineitem URL for grade passback.') + access_token, scopes = _get_access_token(platforms, course_endpoint) + if SCORE_SCOPE not in scopes: + raise LTIPostMessageException('LTI 1.3 AGS score scope is not available for this launch.') + body = { + 'timestamp': when_submitted_at or time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), + 'scoreGiven': score, + 'scoreMaximum': 1, + 'activityProgress': 'Completed', + 'gradingProgress': 'PendingManual' if needs_review else 'FullyGraded', + 'userId': submission_endpoint.get('user_id', ''), + } + if comment: + body['comment'] = comment + status, response_body = request_json( + f"{lineitem_url.rstrip('/')}/scores", + method='POST', + data=body, + headers={ + 'Authorization': 'Bearer ' + access_token, + 'Accept': 'application/vnd.ims.lis.v1.score+json', + 'Content-Type': 'application/vnd.ims.lis.v1.score+json', + } + ) + log.debug("AGS post score response %s %s", status, response_body) + if status not in (200, 201, 202): + raise LTIPostMessageException('LTI 1.3 AGS score post failed.') + return True diff --git a/controllers/pylti/post_grade.py b/controllers/pylti/post_grade.py index 30549bc11..36028fcd4 100644 --- a/controllers/pylti/post_grade.py +++ b/controllers/pylti/post_grade.py @@ -3,7 +3,7 @@ send_from_directory, current_app from common.urls import normalize_url from common.filesystem import ensure_dirs -from controllers.auth import get_user, get_consumer_secrets +from controllers.auth import get_user, get_consumer_secrets, get_lti13_platforms #from controllers.endpoints.blockpy import TransmissionStatuses from controllers.helpers import (ajax_failure, parse_assignment_load, require_request_parameters, get_course_id, maybe_int, check_resource_exists, ajax_success, @@ -42,7 +42,7 @@ class GradePost: overwrite_human_grades: bool def submit(self): - lti = LTI(get_consumer_secrets(current_app)) + lti = LTI(get_consumer_secrets(current_app), lti13_platforms=get_lti13_platforms(current_app)) session['lis_outcome_service_url'] = self.lis_outcome_service_url if lti: existing_grade = lti.get_grade(endpoint=self.lis_result_sourcedid) diff --git a/requirements.txt b/requirements.txt index b38f7f7ad..aa43f8474 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ flask-mail flask-wtf flask-assets Flask-JWT-Extended +PyJWT[crypto] flask-migrate flask_cors flask-debugtoolbar diff --git a/tests/controllers/test_lti13.py b/tests/controllers/test_lti13.py new file mode 100644 index 000000000..4c27f945a --- /dev/null +++ b/tests/controllers/test_lti13.py @@ -0,0 +1,165 @@ +import tempfile +import time + +import jwt +from flask import session + +from main import create_app + + +STATIC_SETTINGS = { + 'TESTING': True, + 'HOST': 'localhost', + 'SERVER_NAME': 'localhost', + 'PORT': 5001, + 'SITE_URL': 'localhost:5001', + 'TASK_QUEUE_STYLE': 'sqlite', + 'WTF_CSRF_ENABLED': False, + 'SECRET_KEY': 'test-secret-key', + 'SESSION_COOKIE_DOMAIN': None, + 'SESSION_COOKIE_SECURE': False, + 'COOKIE_SAMESITE': 'Lax', + 'SESSION_COOKIE_HTTPONLY': False, + 'SESSION_COOKIE_SAMESITE': 'Lax', +} + + +def create_test_app(): + tempdir = tempfile.TemporaryDirectory() + db_path = f'{tempdir.name}/test.db' + task_db_path = f'{tempdir.name}/task.db' + app = create_app('testing', { + **STATIC_SETTINGS, + 'SQLALCHEMY_DATABASE_URI': f'sqlite:///{db_path}', + 'SQLALCHEMY_DATABASE_URI_ALEMBIC': f'sqlite:///{db_path}', + 'TASK_DB_URI': task_db_path, + }) + return tempdir, app + + +def make_claims(claim_message_type, claim_version, claim_deployment_id, claim_roles, + claim_context, claim_resource_link, claim_launch_presentation, + claim_custom, claim_ags_endpoint): + return { + 'iss': 'https://canvas.example.edu', + 'aud': 'client-123', + 'sub': 'user-42', + 'nonce': 'expected-nonce', + 'exp': int(time.time()) + 600, + claim_message_type: 'LtiResourceLinkRequest', + claim_version: 'LTI-1p3', + claim_deployment_id: 'deployment-9', + claim_roles: ['http://purl.imsglobal.org/vocab/lis/v2/membership#Learner'], + claim_context: { + 'id': 'course-5', + 'label': 'CS101', + 'title': 'Intro CS', + }, + claim_resource_link: { + 'id': 'resource-7', + 'title': 'Assignment 1', + }, + claim_launch_presentation: { + 'return_url': 'https://canvas.example.edu/return', + 'document_target': 'iframe', + }, + claim_custom: { + 'canvas_course_id': '1234', + }, + claim_ags_endpoint: { + 'lineitem': 'https://canvas.example.edu/api/lti/courses/5/line_items/7', + 'lineitems': 'https://canvas.example.edu/api/lti/courses/5/line_items', + 'scope': [ + 'https://purl.imsglobal.org/spec/lti-ags/scope/score', + 'https://purl.imsglobal.org/spec/lti-ags/scope/result.readonly', + ] + }, + 'email': 'student@example.edu', + 'name': 'Student Example', + 'given_name': 'Student', + 'family_name': 'Example', + } + + +def test_verify_lti13_launch_normalizes_claims(monkeypatch): + tempdir, app = create_test_app() + try: + from controllers.pylti import lti13 + + token = jwt.encode(make_claims( + lti13.CLAIM_MESSAGE_TYPE, + lti13.CLAIM_VERSION, + lti13.CLAIM_DEPLOYMENT_ID, + lti13.CLAIM_ROLES, + lti13.CLAIM_CONTEXT, + lti13.CLAIM_RESOURCE_LINK, + lti13.CLAIM_LAUNCH_PRESENTATION, + lti13.CLAIM_CUSTOM, + lti13.CLAIM_AGS_ENDPOINT, + ), 'shared-secret', algorithm='HS256', headers={'kid': 'kid-1'}) + with app.test_request_context('/', method='POST'): + session['lti13_state'] = 'expected-state' + session['lti13_nonce'] = 'expected-nonce' + monkeypatch.setattr(lti13, '_load_public_key', lambda *args, **kwargs: 'shared-secret') + launch_data = lti13.verify_lti13_launch([{ + 'issuer': 'https://canvas.example.edu', + 'client_id': 'client-123', + 'jwks_url': 'https://canvas.example.edu/jwks', + 'auth_token_url': 'https://canvas.example.edu/token', + 'service': 'canvas', + }], {'id_token': token, 'state': 'expected-state'}, session) + assert launch_data['pylti_user_id'] == 'user-42' + assert launch_data['context_id'] == 'course-5' + assert launch_data['custom_canvas_course_id'] == '1234' + course_endpoint = lti13.parse_lti13_endpoint(launch_data['lis_outcome_service_url']) + submission_endpoint = lti13.parse_lti13_endpoint(launch_data['lis_result_sourcedid']) + assert course_endpoint['client_id'] == 'client-123' + assert submission_endpoint['lineitem'].endswith('/7') + assert 'lti13_state' not in session + assert 'lti13_nonce' not in session + finally: + tempdir.cleanup() + + +def test_lti_post_grade_uses_ags(monkeypatch): + tempdir, app = create_test_app() + try: + from controllers.pylti.flask import LTI + from controllers.pylti import lti13 + + with app.test_request_context('/', method='POST'): + session['lis_outcome_service_url'] = ('{"auth_token_url":"https://canvas.example.edu/token",' + '"client_id":"client-123","deployment_id":"deployment-9",' + '"issuer":"https://canvas.example.edu","lineitems":"https://canvas.example.edu/api/lti/courses/5/line_items",' + '"lti_version":"LTI-1p3","scope":["https://purl.imsglobal.org/spec/lti-ags/scope/score"]}') + session['lis_result_sourcedid'] = ('{"lineitem":"https://canvas.example.edu/api/lti/courses/5/line_items/7",' + '"lineitems":"https://canvas.example.edu/api/lti/courses/5/line_items",' + '"lti_version":"LTI-1p3","resource_link_id":"resource-7","user_id":"user-42"}') + captured = {} + + def fake_post_ags_score(platforms, course_endpoint, submission_endpoint, score, comment='', + needs_review=False, when_submitted_at=None): + captured['platforms'] = platforms + captured['course_endpoint'] = course_endpoint + captured['submission_endpoint'] = submission_endpoint + captured['score'] = score + captured['comment'] = comment + captured['needs_review'] = needs_review + captured['when_submitted_at'] = when_submitted_at + return True + + monkeypatch.setattr('controllers.pylti.flask.post_ags_score', fake_post_ags_score) + lti = LTI({}, use_session=session, lti13_platforms=[{ + 'issuer': 'https://canvas.example.edu', + 'client_id': 'client-123', + 'auth_token_url': 'https://canvas.example.edu/token', + }]) + assert lti.post_grade(0.75, 'Great work', needs_review=True, + when_submitted_at='2026-05-31T00:00:00Z') is True + assert captured['score'] == 0.75 + assert captured['comment'] == 'Great work' + assert captured['needs_review'] is True + assert captured['submission_endpoint']['user_id'] == 'user-42' + assert lti13.parse_lti13_endpoint(session['lis_outcome_service_url'])['issuer'] == 'https://canvas.example.edu' + finally: + tempdir.cleanup() From 579d45a55a38513ea09be42b9120eaf397b55da4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 31 May 2026 14:42:37 +0000 Subject: [PATCH 2/2] Finalize LTI 1.3 support --- controllers/pylti/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/pylti/common.py b/controllers/pylti/common.py index a3f3c0c98..14a64ed37 100644 --- a/controllers/pylti/common.py +++ b/controllers/pylti/common.py @@ -183,7 +183,7 @@ def _post_patched_request(consumers, lti_key, body, if lti_cert: client.add_certificate(key=key_cert, cert=lti_cert, domain='') - log.debug("cert %s", lti_cert) + log.debug("client certificate configured") import httplib2 http = httplib2.Http