oaebu_workflows.jstor_telescope.jstor_telescope
Module Contents
Classes
JstorRelease | Construct a JstorRelease.
JstorTelescope | The JSTOR telescope.
JstorAPI | Helper class that provides a standard way to create an ABC using inheritance.
JstorPublishersAPI | Helper class that provides a standard way to create an ABC using inheritance.
JstorCollectionsAPI | Helper class that provides a standard way to create an ABC using inheritance.
Functions
create_gmail_service() | Build the gmail service.
make_jstor_api(entity_type, entity_id) | Create the Jstor API Instance.
- class oaebu_workflows.jstor_telescope.jstor_telescope.JstorRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date, reports)[source]
Bases:
observatory.platform.workflows.workflow.PartitionRelease
Construct a JstorRelease.
- Parameters:
dag_id (str) – The ID of the DAG
run_id (str) – The Airflow run ID
data_interval_start (pendulum.DateTime) – The beginning of the data interval
data_interval_end (pendulum.DateTime) – The end of the data interval
partition_date (pendulum.DateTime) – the partition date, corresponds to the last day of the month being processed.
reports (List[dict]) – list with report_type (country or institution) and url of reports
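The partition date convention (the last day of the month being processed) can be illustrated with the standard library alone. This is a minimal sketch: `last_day_of_month` is a hypothetical helper that is not part of this module, and it uses `datetime.date` instead of the `pendulum.DateTime` values the release actually takes.

```python
import calendar
from datetime import date


def last_day_of_month(year: int, month: int) -> date:
    """Return the final calendar day of the given month."""
    # monthrange returns (weekday of the first day, number of days in the month)
    _, days_in_month = calendar.monthrange(year, month)
    return date(year, month, days_in_month)


# A report covering February 2022 would be partitioned on 2022-02-28.
partition_date = last_day_of_month(2022, 2)
```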
- class oaebu_workflows.jstor_telescope.jstor_telescope.JstorTelescope(dag_id, cloud_workspace, entity_id, entity_type='publisher', country_partner='jstor_country', institution_partner='jstor_institution', bq_dataset_description='Data from JSTOR sources', bq_country_table_description=None, bq_institution_table_description=None, api_dataset_id='jstor', gmail_api_conn_id='gmail_api', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, catchup=False, max_active_runs=1, schedule='0 0 4 * *', start_date=pendulum.datetime(2016, 10, 1))[source]
Bases:
observatory.platform.workflows.workflow.Workflow
The JSTOR telescope.
Construct a JstorTelescope instance.
- Parameters:
dag_id (str) – The ID of the DAG
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
entity_id (str) – The ID of the publisher for this DAG
entity_type (Literal[publisher, collection]) – Whether this entity should be treated as a publisher or a collection
country_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the country partner
institution_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the institution partner
bq_dataset_description (str) – Description for the BigQuery dataset
bq_country_table_description (Optional[str]) – Description for the BigQuery JSTOR country table
bq_institution_table_description (Optional[str]) – Description for the BigQuery JSTOR institution table
api_dataset_id (str) – The ID to store the dataset release in the API
gmail_api_conn_id (str) – Airflow connection ID for the Gmail API
observatory_api_conn_id (str) – Airflow connection ID for the Observatory API
catchup (bool) – Whether to catch up the DAG or not
max_active_runs (int) – The maximum number of DAG runs that can be run concurrently
schedule (str) – The schedule interval of the DAG
start_date (pendulum.DateTime) – The start date of the DAG
- make_release(**kwargs)[source]
Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.
- Parameters:
kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.
- Returns:
A list of JstorRelease instances
- Return type:
List[JstorRelease]
- list_reports(**kwargs)[source]
Lists all Jstor releases for a given month and publishes their report_type, download_url and release_date as an XCom.
- Returns:
Whether to continue the DAG
- Return type:
bool
- download_reports(**kwargs)[source]
Download the JSTOR reports based on the list of available reports. The release date for each report is only known after downloading the report. The reports are therefore first downloaded to a temporary location; afterwards the release info is pushed as an XCom and each report is moved to the correct location.
- Returns:
Whether to continue the DAG (always True)
- Return type:
bool
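The download-then-move pattern described above can be sketched as follows. This is only an illustration under stated assumptions: `download_then_move`, `download` and `final_path_for` are hypothetical names, and the real task also pushes the release info as an XCom, which is left out here.

```python
import os
import shutil
import tempfile
from typing import Callable


def download_then_move(
    download: Callable[[str], None],
    final_path_for: Callable[[str], str],
) -> str:
    """Download to a temporary file, inspect it, then move it into place."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = os.path.join(tmp_dir, "report.tmp")
        download(tmp_path)  # writes the report to the temporary path
        # The destination depends on the file contents (e.g. the release date).
        final_path = final_path_for(tmp_path)
        parent = os.path.dirname(final_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        shutil.move(tmp_path, final_path)
    return final_path
```

Keeping the temporary file inside a `TemporaryDirectory` means a failed download leaves nothing behind in the final location.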
- upload_downloaded(releases, **kwargs)[source]
Uploads the downloaded files to GCS for each release
- Parameters:
releases (List[JstorRelease]) – List of JstorRelease instances
- Return type:
None
- transform(releases, **kwargs)[source]
Task to transform the Jstor releases for a given month.
- Parameters:
releases (List[JstorRelease]) –
- upload_transformed(releases, **kwargs)[source]
Uploads the transformed files to GCS for each release
- Parameters:
releases (List[JstorRelease]) –
- Return type:
None
- bq_load(releases, **kwargs)[source]
Loads the transformed country and institution data into BigQuery
- Parameters:
releases (List[JstorRelease]) –
- Return type:
None
- add_new_dataset_releases(releases, **kwargs)[source]
Adds release information to API.
- Parameters:
releases (List[JstorRelease]) –
- Return type:
None
- cleanup(releases, **kwargs)[source]
Delete all files, folders and XComs associated with this release. Assign a label to the gmail messages that have been processed.
- Parameters:
releases (List[JstorRelease]) –
- Return type:
None
- oaebu_workflows.jstor_telescope.jstor_telescope.create_gmail_service()[source]
Build the gmail service.
- Returns:
Gmail service instance
- Return type:
googleapiclient.discovery.Resource
- oaebu_workflows.jstor_telescope.jstor_telescope.make_jstor_api(entity_type, entity_id)[source]
Create the Jstor API Instance.
- Parameters:
entity_type (Literal[publisher, collection]) – The entity type. Should be either ‘publisher’ or ‘collection’.
entity_id (str) – The entity id.
- Returns:
The Jstor API instance
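A factory that selects a class by entity type might look like the sketch below. Every name in it (`BaseApi`, `PublisherApi`, `CollectionApi`, `make_api`) is a hypothetical stand-in, not the module's own code, and the optional `service` argument is an assumption added so the example runs without Gmail credentials.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class BaseApi(ABC):
    """Hypothetical stand-in for the abstract JstorAPI base class."""

    def __init__(self, service: Any, entity_id: str):
        self.service = service
        self.entity_id = entity_id

    @abstractmethod
    def list_reports(self) -> List[dict]:
        ...


class PublisherApi(BaseApi):
    def list_reports(self) -> List[dict]:
        return []  # placeholder body


class CollectionApi(BaseApi):
    def list_reports(self) -> List[dict]:
        return []  # placeholder body


def make_api(entity_type: str, entity_id: str, service: Any = None) -> BaseApi:
    """Return the subclass matching entity_type; reject anything else."""
    api_classes = {"publisher": PublisherApi, "collection": CollectionApi}
    try:
        api_class = api_classes[entity_type]
    except KeyError:
        raise ValueError(f"Unknown entity type: {entity_type!r}") from None
    return api_class(service, entity_id)
```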
- class oaebu_workflows.jstor_telescope.jstor_telescope.JstorAPI(service, entity_id)[source]
Bases:
abc.ABC
Helper class that provides a standard way to create an ABC using inheritance.
- Parameters:
service (Any) –
entity_id (str) –
- get_messages(list_params)[source]
Get messages from the Gmail API.
- Parameters:
list_params (dict) – The parameters that will be passed to the Gmail API.
- Return type:
List[dict]
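Listing Gmail messages usually means following `nextPageToken` until no further page is returned. The sketch below shows that loop against a `service` object built with `googleapiclient`; `get_all_messages` is a hypothetical name, and whether the module's own method paginates in exactly this way is an assumption.

```python
from typing import Any, Dict, List


def get_all_messages(service: Any, list_params: Dict[str, Any]) -> List[dict]:
    """Collect the message stubs from every result page."""
    messages: List[dict] = []
    params = dict(list_params)  # copy so the caller's dict is not mutated
    while True:
        response = service.users().messages().list(**params).execute()
        # "messages" is absent from the response when a page has no results.
        messages.extend(response.get("messages", []))
        next_token = response.get("nextPageToken")
        if not next_token:
            return messages
        params["pageToken"] = next_token
```

A call might pass parameters such as `{"userId": "me", "q": "subject:report"}`; the query string here is purely illustrative.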
- get_label_id(label_name)[source]
Get the id of a label based on the label name.
- Parameters:
label_name (str) – The name of the label
- Returns:
The label id
- Return type:
str
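Looking up a label id by name can be done by listing the account's labels and matching on the `name` field. This is a sketch under assumptions: `find_label_id` is a hypothetical name, and it returns `None` when nothing matches, which may differ from what the documented method does in that case.

```python
from typing import Any, Optional


def find_label_id(service: Any, label_name: str) -> Optional[str]:
    """Return the id of the label called label_name, or None if absent."""
    response = service.users().labels().list(userId="me").execute()
    for label in response.get("labels", []):
        if label["name"] == label_name:
            return label["id"]
    return None
```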
- abstract get_release_date(report_path)[source]
- Parameters:
report_path (str) –
- Return type:
tuple[pendulum.DateTime, pendulum.DateTime]
- abstract download_report(report, download_path)[source]
- Parameters:
report (dict) –
download_path (str) –
- Return type:
None
- abstract transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)[source]
- Parameters:
download_country (str) –
download_institution (str) –
transform_country (str) –
transform_institution (str) –
partition_date (pendulum.DateTime) –
- Return type:
None
- class oaebu_workflows.jstor_telescope.jstor_telescope.JstorPublishersAPI(service, entity_id)[source]
Bases:
JstorAPI
Helper class that provides a standard way to create an ABC using inheritance.
- Parameters:
service (Any) –
entity_id (str) –
- list_reports()[source]
List the available releases by going through the messages of a gmail account and looking for a specific pattern.
A message that has been processed previously carries a specific label; messages with this label are skipped. The message should include a download URL. The head of this download URL contains the filename, from which the release date and publisher can be derived.
- Returns:
A list of dictionaries representing the messages with reports. Each has keys “type”, “url” and “id”.
- Return type:
List[dict]
- download_report(report, download_path)[source]
Download report from url to a file.
- Parameters:
report (dict) – The report info. Should contain the “url” key
download_path (str) – Path to download data to
- Return type:
None
- get_header_info(url)[source]
Get header info from url and parse for filename and extension of file.
- Parameters:
url (str) – Download url
- Returns:
Filename and file extension
- Return type:
Tuple[str, str]
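Extracting a filename and extension from response headers typically means parsing a `Content-Disposition` value. The sketch below does that with the standard library; `parse_content_disposition` and the example filename are hypothetical, and it is an assumption that the documented method reads this particular header or returns the extension without its leading dot.

```python
import os
from email.message import Message
from typing import Tuple


def parse_content_disposition(header_value: str) -> Tuple[str, str]:
    """Split the filename in a Content-Disposition value into (stem, extension)."""
    message = Message()
    message["Content-Disposition"] = header_value
    filename = message.get_filename()  # handles quoted and unquoted values
    if not filename:
        raise ValueError("No filename found in header")
    stem, extension = os.path.splitext(filename)
    return stem, extension.lstrip(".")


stem, extension = parse_content_disposition('attachment; filename="report_2022_06.tsv"')
```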
- get_release_date(report_path)[source]
Get the release date from the “Reporting_Period” part of the header. Also checks that the report contains data from exactly one month.
- Parameters:
report_path (str) – The path to the JSTOR report
- Returns:
The start and end of the release month
- Return type:
Tuple[pendulum.DateTime, pendulum.DateTime]
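The one-month check can be sketched as follows. The format of the reporting period string is an assumption made for illustration (two ISO dates separated by " to "); the real report header may be laid out differently. `parse_reporting_period` is a hypothetical name, and the sketch reads "exactly one month" as running from the first to the last day of a single calendar month.

```python
import calendar
from datetime import date
from typing import Tuple


def parse_reporting_period(value: str) -> Tuple[date, date]:
    """Parse 'YYYY-MM-DD to YYYY-MM-DD' and require one full calendar month."""
    parts = [part.strip() for part in value.split(" to ")]
    if len(parts) != 2:
        raise ValueError(f"Unexpected reporting period: {value!r}")
    start = date.fromisoformat(parts[0])
    end = date.fromisoformat(parts[1])
    last_day = calendar.monthrange(start.year, start.month)[1]
    covers_one_month = (
        (start.year, start.month) == (end.year, end.month)
        and start.day == 1
        and end.day == last_day
    )
    if not covers_one_month:
        raise ValueError(f"Report does not cover exactly one month: {value!r}")
    return start, end
```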
- get_release_date_deprecated(report_path)[source]
This function is deprecated because the headers for the reports have changed since 2021-10-01. It might still be used for reports that were created before this date and have not been processed yet. Gets the release date from the “Usage Month” column in the first row of the report. Also checks that the report contains data from the same month only.
- Parameters:
report_path (str) – The path to the JSTOR report
- Returns:
The start and end dates
- Return type:
Tuple[pendulum.DateTime, pendulum.DateTime]
- transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)[source]
Transform a Jstor release into json lines format and gzip the result.
- Parameters:
download_country (str) – The path to the country download report
download_institution (str) – The path to the institution download report
transform_country (str) – The path to write the transformed country file to
transform_institution (str) – The path to write the transformed institution file to
partition_date (pendulum.DateTime) – The partition/release date of this report
- Return type:
None
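Writing a report as gzipped JSON lines can be sketched with the standard library. The assumptions here are labeled: the input is taken to be tab-separated, the `release_date` field name is invented for illustration, and `tsv_to_jsonl_gz` is a hypothetical helper, not the method's actual implementation.

```python
import csv
import gzip
import json


def tsv_to_jsonl_gz(source_path: str, target_path: str, partition_date: str) -> int:
    """Convert a TSV report to gzipped JSON lines; return the row count."""
    with open(source_path, newline="", encoding="utf-8") as source:
        rows = list(csv.DictReader(source, delimiter="\t"))
    # "wt" opens the gzip stream in text mode so JSON strings can be written.
    with gzip.open(target_path, "wt", encoding="utf-8") as target:
        for row in rows:
            row["release_date"] = partition_date  # assumed field name
            target.write(json.dumps(row) + "\n")
    return len(rows)
```

One JSON object per line keeps the output loadable by tools that expect newline-delimited JSON.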
- class oaebu_workflows.jstor_telescope.jstor_telescope.JstorCollectionsAPI(service, entity_id)[source]
Bases:
JstorAPI
Helper class that provides a standard way to create an ABC using inheritance.
- Parameters:
service (Any) –
entity_id (str) –
- list_reports()[source]
List the available reports by going through the gmail messages from a specific sender.
- Returns:
A list of dictionaries representing the messages with reports as attachments. Each has keys “type”, “attachment_id” and “id”.
- Return type:
List[dict]
- download_report(report, download_path)[source]
Download report from url to a file.
- Parameters:
report (dict) – The report info. Should contain the “id” and “attachment_id” keys
download_path (str) – Path to download data to
- Return type:
None
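Gmail returns attachment bodies as base64url text in the `data` field, so saving one involves a decode step. The sketch below shows this using the “id” and “attachment_id” keys described above; `save_attachment` is a hypothetical name, and the padding fix is a defensive assumption in case the encoded string arrives without trailing `=` characters.

```python
import base64
from typing import Any


def save_attachment(service: Any, report: dict, download_path: str) -> None:
    """Fetch a Gmail attachment and write its decoded bytes to download_path."""
    attachment = (
        service.users()
        .messages()
        .attachments()
        .get(userId="me", messageId=report["id"], id=report["attachment_id"])
        .execute()
    )
    encoded = attachment["data"]
    # Restore any stripped padding so the decoder accepts the string.
    encoded += "=" * (-len(encoded) % 4)
    with open(download_path, "wb") as file:
        file.write(base64.urlsafe_b64decode(encoded))
```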
- get_release_date(report_path)[source]
Get the release date from the report. This should be under the “Month, Year of monthdt” column. Also checks that the report contains data from exactly one month.
- Parameters:
report_path (str) – The path to the JSTOR report
- Returns:
The start and end of the release month
- Return type:
Tuple[pendulum.DateTime, pendulum.DateTime]
- transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)[source]
Transform a Jstor release into json lines format and gzip the result.
- Parameters:
download_country (str) – The path to the country download report
download_institution (str) – The path to the institution download report
transform_country (str) – The path to write the transformed country file to
transform_institution (str) – The path to write the transformed institution file to
partition_date (pendulum.DateTime) – The partition/release date of this report
- Return type:
None