# Source code for canvas_data.api

import functools
import gzip
import logging
import os
import shutil
import time

import requests
from requests.exceptions import ConnectionError, RequestException

from .exceptions import (APIConnectionError, CanvasDataAPIError,
                         MissingCredentialsError)
from .hmac_auth import API_ROOT, CanvasDataHMACAuth

logger = logging.getLogger(__name__)


def retry(func):
    """Retry decorator for the HTTP GET helper used for API calls and downloads.

    The wrapped callable must return a ``requests`` response. A non-200
    response, or a ``requests`` ``ConnectionError``, triggers a retry after a
    2 second pause. Up to ``MAX_TRIES`` retries are made, i.e.
    ``MAX_TRIES + 1`` attempts in total.

    When the retries are exhausted, the last non-200 response is returned so
    the caller can inspect ``status_code``. A ``ConnectionError`` is re-raised
    instead.
    """
    @functools.wraps(func)
    def retried_func(*args, **kwargs):
        MAX_TRIES = 3
        tries = 0
        while True:
            try:
                resp = func(*args, **kwargs)
            except ConnectionError:
                if tries < MAX_TRIES:
                    tries += 1
                    logger.exception("ConnectionError - %d/%d tries", tries, MAX_TRIES)
                    time.sleep(2)
                    continue
                logger.exception("ConnectionError - reached the retry limit")
                raise

            logger.debug(resp.request.headers)
            if resp.status_code == 200 or tries >= MAX_TRIES:
                return resp

            logger.warning("Got a non-200 response ({}) - going to retry.".format(resp.status_code))
            # Release the connection held by the response we are discarding.
            # For stream=True requests the body is never read, so the
            # connection would otherwise stay checked out of the pool.
            resp.close()
            tries += 1
            time.sleep(2)
    return retried_func
@retry
def _get_with_retries(*args, **kwargs):
    """Call ``requests.get`` with the retry behaviour of :func:`retry`.

    If the caller does not pass ``timeout``, a default of 10 s to connect and
    300 s between received bytes is used. Without it, a stalled server would
    block the client indefinitely. A connect timeout is a ``ConnectionError``
    and is therefore retried. A read timeout propagates as a ``requests``
    exception.
    """
    kwargs.setdefault('timeout', (10, 300))
    return requests.get(*args, **kwargs)
class CanvasDataAPI(object):
    """Client for the Canvas Data API.

    Wraps the schema, dump and file-listing endpoints. Also provides helpers
    to download dump files and to decompress and concatenate them per table.
    """

    def __init__(self, api_key, api_secret, download_chunk_size=1024*1024):
        """
        Args:
            api_key: Canvas Data API key.
            api_secret: Canvas Data API secret.
            download_chunk_size: number of bytes per chunk when streaming a
                downloaded file to disk.

        Raises:
            MissingCredentialsError: if either credential is missing or empty.
        """
        if not api_key or not api_secret:
            raise MissingCredentialsError(self)
        self.api_key = api_key
        self.api_secret = api_secret
        # Schema cache, keyed on '<version>/<key_on_tablenames>'.
        self.schema = {}
        # Cached result of get_schema_versions().
        self.schema_versions = None
        self.download_chunk_size = download_chunk_size

    # ----------------------------------------------------------------------
    # Internal helpers
    # ----------------------------------------------------------------------

    def _auth(self):
        """Return an HMAC auth object built from this client's credentials."""
        return CanvasDataHMACAuth(self.api_key, self.api_secret)

    @staticmethod
    def _error_message(response):
        """Extract an error message from a failed API response.

        Use the ``message`` field of the JSON body when there is one. Fall back
        to the raw response text when the body is not JSON or has no
        ``message`` key.
        """
        try:
            return response.json()['message']
        except (ValueError, KeyError, TypeError):
            return response.text

    @staticmethod
    def _discard_partial_file(path):
        """Best-effort removal of a partially written file.

        A leftover partial file would be treated as complete by later calls
        that skip existing files, so it is removed. Failure to remove it is
        ignored so the original error is the one that propagates.
        """
        try:
            os.remove(path)
        except OSError:
            pass

    def _api_get(self, url, params=None):
        """GET an API URL, with retries, and return the decoded JSON body.

        Raises:
            APIConnectionError: if a connection error persists through retries.
            CanvasDataAPIError: on a non-200 response, or on any other
                ``requests`` error.
        """
        try:
            response = _get_with_retries(url, params=params, auth=self._auth())
            if response.status_code == 200:
                return response.json()
        except ConnectionError as e:
            raise APIConnectionError("A connection error occurred", e)
        except RequestException as e:
            raise CanvasDataAPIError("A generic requests error occurred", e)
        raise CanvasDataAPIError(self._error_message(response))

    # ----------------------------------------------------------------------
    # API endpoints
    # ----------------------------------------------------------------------

    def get_schema_versions(self):
        """Get the list of all available schema versions.

        The result is cached on the instance after the first non-empty
        response.
        """
        if not self.schema_versions:
            url = '{}/api/schema'.format(API_ROOT)
            self.schema_versions = self._api_get(url)
        return self.schema_versions

    def get_schema(self, version='latest', key_on_tablenames=False):
        """
        Get a particular version of the schema.

        Note that the keys in the returned data structure are usually, but not
        always, the same as the table names. If you'd rather have the keys in
        the returned data structure always exactly match the table names, set
        `key_on_tablenames=True`.

        Results are cached per (version, key_on_tablenames) pair.
        """
        cache_key = '{}/{}'.format(version, key_on_tablenames)
        if cache_key not in self.schema:
            url = '{}/api/schema/{}'.format(API_ROOT, version)
            schema = self._api_get(url)['schema']
            if key_on_tablenames:
                schema = {entry['tableName']: entry for entry in schema.values()}
            self.schema[cache_key] = schema
        return self.schema[cache_key]

    def get_dumps(self, account_id='self', limit=100, after_sequence=None):
        """Get a list of dumps.

        Args:
            account_id: account to query ('self' for the credentials' account).
            limit: maximum number of dumps to return.
            after_sequence: if given, only list dumps after this sequence number.
        """
        url = '{}/api/account/{}/dump'.format(API_ROOT, account_id)
        params = {'limit': limit}
        if after_sequence:
            params['after'] = after_sequence
        return self._api_get(url, params=params)

    def get_file_urls(self, account_id='self', **kwargs):
        """Get a list of file URLs, either by dump_id (or 'latest') or by table_name.

        Keyword Args:
            dump_id: a dump ID, or 'latest'. Takes precedence over table_name.
            table_name: list files for this table across dumps.

        Raises:
            CanvasDataAPIError: if neither dump_id nor table_name is given, or
                if the request fails.
        """
        dump_id = kwargs.get('dump_id')
        table_name = kwargs.get('table_name')
        base_url = '{}/api/account/{}/file'.format(API_ROOT, account_id)
        if dump_id:
            if dump_id == 'latest':
                url = '{}/latest'.format(base_url)
            else:
                url = '{}/byDump/{}'.format(base_url, dump_id)
        elif table_name:
            url = '{}/byTable/{}'.format(base_url, table_name)
        else:
            raise CanvasDataAPIError("Must pass either dump_id or table_name")
        return self._api_get(url)

    def get_sync_file_urls(self, account_id='self'):
        """Get a list of file URLs that constitute a complete snapshot of the current data."""
        url = '{}/api/account/{}/file/sync'.format(API_ROOT, account_id)
        return self._api_get(url)

    # ----------------------------------------------------------------------
    # Downloading and preparing data
    # ----------------------------------------------------------------------

    def download_files(self, account_id='self', dump_id=None, table_name=None,
                       download_directory='./downloads', include_requests=True,
                       force=False):
        """Download the files for a dump, for a table, or for a table within a dump.

        With a dump_id, every table in that dump is downloaded. If table_name
        is also given, only that table is downloaded. The 'requests' table is
        skipped when include_requests is false. With only a table_name, the
        table's files from every dump in its history are downloaded.

        Returns:
            list of local file paths, in download order.

        Raises:
            CanvasDataAPIError: if neither dump_id nor table_name is given.
        """
        if dump_id:
            dump_files = self.get_file_urls(account_id=account_id, dump_id=dump_id)
            file_groups = [
                artifacts['files']
                for dump_table_name, artifacts in dump_files['artifactsByTable'].items()
                if (not table_name or table_name == dump_table_name)
                and (include_requests or dump_table_name != 'requests')
            ]
        elif table_name:
            # No dump ID was specified, so get all of the files for the table.
            dump_files = self.get_file_urls(account_id=account_id, table_name=table_name)
            file_groups = [dump['files'] for dump in dump_files['history']]
        else:
            raise CanvasDataAPIError("Neither dump_id or table_name was specified; must specify at least one.")

        local_files = []
        for files in file_groups:
            for file_info in files:
                local_files.append(self.get_file(file=file_info,
                                                 download_directory=download_directory,
                                                 force=force))
        return local_files

    def get_file(self, file, download_directory='./downloads', force=False):
        """Download a single file into download_directory and return its local path.

        Args:
            file: a file entry from the file-listing endpoints. Its 'filename'
                and 'url' keys are used.
            download_directory: created if it does not exist.
            force: re-download even if the target file already exists.

        Raises:
            CanvasDataAPIError: if the download does not return HTTP 200. In
                that case nothing is written to disk.
        """
        if not os.path.exists(download_directory):
            os.makedirs(download_directory)
        target_file = os.path.join(download_directory, file['filename'])
        if os.path.isfile(target_file) and not force:
            logger.debug("Not downloading %s because it already exists.", target_file)
            return target_file

        logger.debug("Downloading %s because it doesn't exist yet.", target_file)
        response = _get_with_retries(file['url'], stream=True)
        try:
            if response.status_code != 200:
                # Don't store an error body as if it were the data file.
                raise CanvasDataAPIError('Failed to download {} (HTTP {})'.format(
                    file['filename'], response.status_code))
            try:
                with open(target_file, 'wb') as fd:
                    for chunk in response.iter_content(chunk_size=self.download_chunk_size):
                        fd.write(chunk)
            except BaseException:
                # Includes KeyboardInterrupt: a truncated file must not be
                # mistaken for a complete download on the next call.
                self._discard_partial_file(target_file)
                raise
        finally:
            response.close()
        return target_file

    def get_data_for_table(self, table_name, account_id='self', dump_id='latest',
                           data_directory='./data', download_directory='./downloads',
                           force=False):
        """
        Decompress and concatenate the dump files for one table into a text file.

        The output is written to '<data_directory>/<table_name>.txt'. If that
        file already exists and force is false, its path is returned without
        downloading anything. force only regenerates the output file. Dump
        files that are already downloaded are reused.

        Raises:
            CanvasDataAPIError: if a downloaded file cannot be decompressed. A
                partially written output file is removed on any failure.
        """
        # Make sure that the data directory exists.
        if not os.path.exists(data_directory):
            os.makedirs(data_directory)
        outfilename = os.path.join(data_directory, '{}.txt'.format(table_name))
        if os.path.isfile(outfilename) and not force:
            logger.debug("Not overwriting %s because it already exists.", outfilename)
            return outfilename

        # Get the raw (gzipped) data files.
        files = self.download_files(account_id=account_id, dump_id=dump_id,
                                    table_name=table_name,
                                    download_directory=download_directory)
        try:
            with open(outfilename, 'wb') as outfile:
                for infilename in files:
                    with gzip.open(infilename, 'rb') as infile:
                        try:
                            # Stream in chunks rather than loading the whole
                            # decompressed file into memory.
                            shutil.copyfileobj(infile, outfile)
                        except (IOError, EOFError):
                            # EOFError covers a truncated gzip stream.
                            msg = 'Error preparing data for table {}. Input file: {} Output file: {}'.format(table_name, infilename, outfilename)
                            raise CanvasDataAPIError(msg)
        except BaseException:
            # A truncated output would otherwise be reused by later calls
            # made without force=True.
            self._discard_partial_file(outfilename)
            raise
        return outfilename

    def get_data_for_dump(self, dump_id='latest', account_id='self',
                          data_directory='./data', download_directory='./downloads',
                          include_requests=False, force=False):
        """Decompress and concatenate the dump files for every table in a dump.

        The 'requests' table is skipped unless include_requests is true.
        force is passed through to get_data_for_table.

        Returns:
            list of output file paths, one per table.
        """
        dump = self.get_file_urls(dump_id=dump_id, account_id=account_id)
        outfiles = []
        for table_name in dump['artifactsByTable']:
            if table_name == 'requests' and not include_requests:
                continue
            outfiles.append(self.get_data_for_table(table_name=table_name,
                                                    account_id=account_id,
                                                    dump_id=dump_id,
                                                    data_directory=data_directory,
                                                    download_directory=download_directory,
                                                    force=force))
        return outfiles

    def get_latest_regular_dump(self, account_id='self'):
        """Find the latest dump_id that isn't a full requests dump.

        Raises:
            CanvasDataAPIError: if no dumps are available, or if the only
                available dump is a full requests dump.
        """
        last_two_dumps = self.get_dumps(account_id=account_id, limit=2)
        if not last_two_dumps:
            raise CanvasDataAPIError("No dumps are available")
        latest_dump_id = last_two_dumps[0]['dumpId']

        dump_files = self.get_file_urls(account_id=account_id, dump_id=latest_dump_id)
        requests_artifacts = dump_files['artifactsByTable'].get('requests')
        # No requests table, or only a partial one: a regular dump.
        if requests_artifacts is None or requests_artifacts['partial']:
            return latest_dump_id

        # This is a full requests dump, so use the second-most-recent dump.
        if len(last_two_dumps) < 2:
            raise CanvasDataAPIError("The only available dump is a full requests dump")
        return last_two_dumps[1]['dumpId']