-
Notifications
You must be signed in to change notification settings - Fork 2
WIP: Schoology & Canva Integration #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,19 @@ def google_id_to_user_id(google_id): | |
| except ValueError: | ||
| debug_log("Error handling:", google_id) | ||
| raise | ||
|
|
||
| def canvas_id_to_user_id(google_id): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is exactly the same as the Most of the student events should be getting recorded with a Google ID. After signing into either LMS, we need to be able to fetch to Google ID somehow. |
||
| ''' | ||
| Convert a Google ID like: | ||
| `72635729500910017892163494291` | ||
| to: | ||
| `gc-72635729500910017892163494291` | ||
| ''' | ||
| try: | ||
| return "gc-" + str(int(google_id)) | ||
| except ValueError: | ||
| debug_log("Error handling:", google_id) | ||
| raise | ||
|
|
||
|
|
||
| def fernet_key(secret_string): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,308 @@ | ||
| import os | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay now I'm a bit more confused. Is the |
||
| import json | ||
| import string | ||
| import recordclass | ||
| import configparser | ||
| import aiohttp | ||
| import aiohttp.web | ||
|
|
||
| import learning_observer.settings as settings | ||
| import learning_observer.log_event | ||
| import learning_observer.util | ||
| import learning_observer.auth | ||
| import learning_observer.runtime | ||
|
|
||
| cache = None | ||
|
|
||
| class Endpoint(recordclass.make_dataclass("Endpoint", ["name", "remote_url", "doc", "cleaners"], defaults=["", None])): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What of this can be abstracted out? We use a similar approach for both Google and Schoology. We don't want to repeat code 3 times. |
||
| def arguments(self): | ||
| return extract_parameters_from_format_string(self.remote_url) | ||
|
|
||
| def _local_url(self): | ||
| parameters = "}/{".join(self.arguments()) | ||
| base_url = f"/canvas/{self.name}" | ||
| if len(parameters) == 0: | ||
| return base_url | ||
| else: | ||
| return base_url + "/{" + parameters + "}" | ||
|
|
||
| def _add_cleaner(self, name, cleaner): | ||
| if self.cleaners is None: | ||
| self.cleaners = dict() | ||
| self.cleaners[name] = cleaner | ||
| if 'local_url' not in cleaner: | ||
| cleaner['local_url'] = self._local_url + "/" + name | ||
|
|
||
| def _cleaners(self): | ||
| if self.cleaners is None: | ||
| return [] | ||
| else: | ||
| return self.cleaners | ||
|
|
||
| ENDPOINTS = list(map(lambda x: Endpoint(*x), [ | ||
| ("course_list", "/courses"), | ||
| ("course_roster", "/courses/{courseId}/students"), | ||
| ("course_work", "/courses/{courseId}/assignments"), | ||
| ("coursework_submissions", "/courses/{courseId}/assignments/{assignmentId}/submissions"), | ||
| ])) | ||
|
|
||
| def extract_parameters_from_format_string(format_string): | ||
| ''' | ||
| Extracts parameters from a format string. E.g. | ||
| >>> ("hello {hi} my {bye}")] | ||
| ['hi', 'bye'] | ||
| ''' | ||
| # The parse returns a lot of context, which we discard. In particular, the | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is copied straight from the |
||
| # last item is often about the suffix after the last parameter and may be | ||
| # `None` | ||
| return [f[1] for f in string.Formatter().parse(format_string) if f[1] is not None] | ||
|
|
||
| class Canvas: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is a whole class needed here? Google gets by without it. If the user logs in with Canvas the request object should contain their auth information to query the Canvas APIs. |
||
| def __init__(self, config_path='./config.ini'): | ||
| script_dir = os.path.dirname(os.path.abspath(__file__)) | ||
| config_path = os.path.join(script_dir, config_path) | ||
|
|
||
| self.config = configparser.ConfigParser() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PMSS |
||
| self.config.read(config_path) | ||
|
|
||
| # Check if 'SCHOOLOGY_CONFIG' section is present | ||
| if 'CANVAS_CONFIG' not in self.config: | ||
| raise KeyError("The configuration file does not contain 'CANVAS_CONFIG' section") | ||
|
|
||
| try: | ||
| self.defaultServer = self.config['CANVAS_CONFIG']['DEFAULT_SERVER'] | ||
| self.access_token = self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] | ||
| self.refresh_token = self.config['CANVAS_CONFIG']['REFRESH_TOKEN'] | ||
| self.client_id = self.config['CANVAS_CONFIG']['CLIENT_ID'] | ||
| self.client_secret = self.config['CANVAS_CONFIG']['CLIENT_SECRET'] | ||
| except KeyError as e: | ||
| raise KeyError(f"Missing required configuration key: {e}") | ||
|
|
||
| self.default_version = 'v1' | ||
| self.defaultPerPage = 10000 | ||
| self.base_url = f'https://{self.defaultServer}/api/{self.default_version}' | ||
|
|
||
| def update_access_tokens(self, access_token): | ||
| self.config['CANVAS_CONFIG']['ACCESS_TOKEN'] = access_token | ||
| self.access_token = access_token | ||
| with open('./config.ini', 'w') as configfile: | ||
| self.config.write(configfile) | ||
|
|
||
| async def api_call(self, method, endpoint, params=None, data=None, absolute_url=False, retry=True, **kwargs): | ||
| if absolute_url: | ||
| url = endpoint | ||
| else: | ||
| url = self.base_url + endpoint | ||
| if params: | ||
| url += '?' + '&'.join(f"{k}={v}" for k, v in params.items()) | ||
|
|
||
| url = url.format(**kwargs) | ||
|
|
||
| headers = { | ||
| 'Authorization': f'Bearer {self.access_token}', | ||
| 'Content-Type': 'application/json' | ||
| } | ||
|
|
||
| async with aiohttp.ClientSession() as client: | ||
| response_func = getattr(client, method.lower()) | ||
| async with response_func(url, headers=headers, params=params, json=data) as response: | ||
| if response.status == 401 and retry: | ||
| new_tokens = self.refresh_tokens() | ||
| if 'access_token' in new_tokens: | ||
| self.update_access_tokens(new_tokens['access_token']) | ||
| return await self.api_call(method, endpoint, params, data, absolute_url, retry=False, **kwargs) | ||
|
|
||
| if response.status != 200: | ||
| response.raise_for_status() | ||
|
|
||
| return await response.json() | ||
|
|
||
| async def refresh_tokens(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When does this come up? I understand that we are refreshing the user's token, but I'd like to know when this occurs (docstring). |
||
| url = f'https://{self.defaultServer}/login/oauth2/token' | ||
| params = { | ||
| "grant_type": "refresh_token", | ||
| "client_id": self.client_id, | ||
| "client_secret": self.client_secret, | ||
| "refresh_token": self.refresh_token | ||
| } | ||
| return await self.api_call('POST', url, params=params, absolute_url=True) | ||
|
|
||
| async def raw_canvas_ajax(runtime, target_url, retry=False, **kwargs): | ||
| ''' | ||
| Make an AJAX call to Canvas, managing auth + auth. | ||
| * runtime is a Runtime class containing request information. | ||
| * target_url is typically grabbed from ENDPOINTS | ||
| * ... and we pass the named parameters | ||
| ''' | ||
| canvas = Canvas() | ||
|
|
||
| params = {k: v for k, v in kwargs.items() if v is not None} | ||
| try: | ||
| response = await canvas.api_call('GET', target_url, params=params, **kwargs) | ||
| response["kwargs"] = kwargs | ||
| except aiohttp.ClientResponseError as e: | ||
| if e.status == 401 and retry: | ||
| new_tokens = await canvas.refresh_tokens() | ||
| if 'access_token' in new_tokens: | ||
| canvas.update_access_tokens(new_tokens['access_token']) | ||
| return await raw_canvas_ajax(runtime, target_url, retry=False, **kwargs) | ||
| raise | ||
|
|
||
| return response | ||
|
|
||
| def raw_access_partial(remote_url, name=None): | ||
| ''' | ||
| This is a helper which allows us to create a function which calls specific | ||
| Canvas APIs. | ||
| ''' | ||
| async def caller(request, **kwargs): | ||
| ''' | ||
| Make an AJAX request to Canvas | ||
| ''' | ||
| return await raw_canvas_ajax(request, remote_url, **kwargs) | ||
| setattr(caller, "__qualname__", name) | ||
|
|
||
| return caller | ||
|
|
||
|
|
||
| def initialize_and_register_routes(app): | ||
| ''' | ||
| This is a big 'ol function which might be broken into smaller ones at some | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a big 'ol function that was copied from |
||
| point. We: | ||
| - Created debug routes to pass through AJAX requests to Google | ||
| - Created production APIs to have access to cleaned versions of said data | ||
| - Create local function calls to call from other pieces of code | ||
| within process | ||
| We probably don't need all of this in production, but a lot of this is | ||
| very important for debugging. Having APIs is more useful than it looks, since | ||
| making use of Google APIs requires a lot of infrastructure (registering | ||
| apps, auth/auth, etc.) which we already have in place on dev / debug servers. | ||
| ''' | ||
| app.add_routes([ | ||
| aiohttp.web.get("/canvas", api_docs_handler) | ||
| ]) | ||
|
|
||
| def make_ajax_raw_handler(remote_url): | ||
| async def ajax_passthrough(request): | ||
| runtime = learning_observer.runtime.Runtime(request) | ||
| response = await raw_canvas_ajax(runtime, remote_url, retry=True, **request.match_info) | ||
| return aiohttp.web.json_response(response) | ||
| return ajax_passthrough | ||
|
|
||
| def make_cleaner_handler(raw_function, cleaner_function, name=None): | ||
| async def cleaner_handler(request): | ||
| response = cleaner_function(await raw_function(request, **request.match_info)) | ||
| if isinstance(response, dict) or isinstance(response, list): | ||
| return aiohttp.web.json_response(response) | ||
| elif isinstance(response, str): | ||
| return aiohttp.web.Response(text=response) | ||
| else: | ||
| raise AttributeError(f"Invalid response type: {type(response)}") | ||
| if name is not None: | ||
| setattr(cleaner_handler, "__qualname__", name + "_handler") | ||
|
|
||
| return cleaner_handler | ||
|
|
||
| def make_cleaner_function(raw_function, cleaner_function, name=None): | ||
| async def cleaner_local(request, **kwargs): | ||
| canvas_response = await raw_function(request, **kwargs) | ||
| clean = cleaner_function(canvas_response) | ||
| return clean | ||
| if name is not None: | ||
| setattr(cleaner_local, "__qualname__", name) | ||
| return cleaner_local | ||
|
|
||
| for e in ENDPOINTS: | ||
| function_name = f"raw_{e.name}" | ||
| raw_function = raw_access_partial(remote_url=e.remote_url, name=e.name) | ||
| globals()[function_name] = raw_function | ||
| cleaners = e._cleaners() | ||
| for c in cleaners: | ||
| app.add_routes([ | ||
| aiohttp.web.get( | ||
| cleaners[c]['local_url'], | ||
| make_cleaner_handler( | ||
| raw_function, | ||
| cleaners[c]['function'], | ||
| name=cleaners[c]['name'] | ||
| ) | ||
| ) | ||
| ]) | ||
| globals()[cleaners[c]['name']] = make_cleaner_function( | ||
| raw_function, | ||
| cleaners[c]['function'], | ||
| name=cleaners[c]['name'] | ||
| ) | ||
| app.add_routes([ | ||
| aiohttp.web.get(e._local_url(), make_ajax_raw_handler(e.remote_url)) | ||
| ]) | ||
|
|
||
| def api_docs_handler(request): | ||
| response = "URL Endpoints:\n\n" | ||
| for endpoint in ENDPOINTS: | ||
| response += f"{endpoint._local_url()}\n" | ||
| cleaners = endpoint._cleaners() | ||
| for c in cleaners: | ||
| response += f" {cleaners[c]['local_url']}\n" | ||
| response += "\n\n Globals:" | ||
| return aiohttp.web.Response(text=response) | ||
|
|
||
| def register_cleaner(data_source, cleaner_name): | ||
| def decorator(f): | ||
| found = False | ||
| for endpoint in ENDPOINTS: | ||
| if endpoint.name == data_source: | ||
| found = True | ||
| endpoint._add_cleaner( | ||
| cleaner_name, | ||
| { | ||
| 'function': f, | ||
| 'local_url': f'{endpoint._local_url()}/{cleaner_name}', | ||
| 'name': cleaner_name | ||
| } | ||
| ) | ||
| if not found: | ||
| raise AttributeError(f"Data source {data_source} invalid; not found in endpoints.") | ||
| return f | ||
| return decorator | ||
|
|
||
| @register_cleaner("course_roster", "roster") | ||
| def clean_course_roster(canvas_json): | ||
| students = canvas_json | ||
| students_updated = [] | ||
| #students.sort(key=lambda x: x.get('name', {}).get('fullName', 'ZZ')) | ||
| for student_json in students: | ||
| canvas_id = student_json['id'] | ||
| student = { | ||
| "course_id": "65166371789", | ||
| "user_id": canvas_id, | ||
| "profile": { | ||
| "id": canvas_id, | ||
| "name": { | ||
| "given_name": student_json['name'], | ||
| "family_name": student_json['name'], | ||
| "full_name": student_json['name'] | ||
| } | ||
| } | ||
| } | ||
| #local_id = learning_observer.auth.canvas_id_to_user_id(canvas_id) | ||
| #student_json['user_id'] = local_id | ||
| if 'external_ids' not in student: | ||
| student['external_ids'] = [] | ||
| student['external_ids'].append({"source": "canvas", "id": canvas_id}) | ||
| students_updated.append(student) | ||
| return students_updated | ||
|
|
||
| @register_cleaner("course_list", "courses") | ||
| def clean_course_list(canvas_json): | ||
| courses = canvas_json | ||
| courses.sort(key=lambda x: x.get('name', 'ZZ')) | ||
| return courses | ||
|
|
||
| if __name__ == '__main__': | ||
| output = clean_course_roster({}) | ||
| print(json.dumps(output, indent=2)) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| Get your Consumer Key & Consumer Secret by: | ||
| Going to https://(DISTRICT NAME HERE).schoology.com/api | ||
|
|
||
|
|
||
| Get your user ID by: | ||
| Going to your profile. | ||
| Look at the search bar. | ||
| https://(DISTRICT NAME HERE).schoology.com/user/(THE NUMBER HERE IS YOUR USER ID)/info | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens next? Is our key shown to us? Where do we put it? |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All settings should be defined using PMSS. I would assume this code currently don't have PMSS anywhere in it. ArgLab is a bit behind and ought to catch up.