oaebu_workflows.jstor_telescope.jstor_telescope

Module Contents

Classes

JstorRelease

Construct a JstorRelease.

JstorTelescope

The JSTOR telescope.

JstorAPI

Helper class that provides a standard way to create an ABC using inheritance.

JstorPublishersAPI

Helper class that provides a standard way to create an ABC using inheritance.

JstorCollectionsAPI

Helper class that provides a standard way to create an ABC using inheritance.

Functions

create_gmail_service()

Build the Gmail service.

make_jstor_api(entity_type, entity_id)

Create the Jstor API Instance.

class oaebu_workflows.jstor_telescope.jstor_telescope.JstorRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date, reports)

Bases: observatory.platform.workflows.workflow.PartitionRelease

Construct a JstorRelease.

Parameters:
  • dag_id (str) – The ID of the DAG

  • run_id (str) – The Airflow run ID

  • data_interval_start (pendulum.DateTime) – The beginning of the data interval

  • data_interval_end (pendulum.DateTime) – The end of the data interval

  • partition_date (pendulum.DateTime) – The partition date, which corresponds to the last day of the month being processed

  • reports (List[dict]) – List of reports, each with a report_type (country or institution) and a url
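The partition date convention described above (last day of the month being processed) can be sketched with the standard library. This is only an illustration: the telescope itself works with pendulum.DateTime, and the helper name last_day_of_month is hypothetical.

```python
import calendar
from datetime import date


def last_day_of_month(year: int, month: int) -> date:
    """Return the final calendar day of the given month (hypothetical helper)."""
    # monthrange returns (weekday of the first day, number of days in the month).
    _, days_in_month = calendar.monthrange(year, month)
    return date(year, month, days_in_month)


# A report covering June 2022 would be partitioned on 2022-06-30.
partition_date = last_day_of_month(2022, 6)
```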

class oaebu_workflows.jstor_telescope.jstor_telescope.JstorTelescope(dag_id, cloud_workspace, entity_id, entity_type='publisher', country_partner='jstor_country', institution_partner='jstor_institution', bq_dataset_description='Data from JSTOR sources', bq_country_table_description=None, bq_institution_table_description=None, api_dataset_id='jstor', gmail_api_conn_id='gmail_api', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, catchup=False, max_active_runs=1, schedule='0 0 4 * *', start_date=pendulum.datetime(2016, 10, 1))

Bases: observatory.platform.workflows.workflow.Workflow

The JSTOR telescope.

Construct a JstorTelescope instance.

Parameters:
  • dag_id (str) – The ID of the DAG

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG

  • entity_id (str) – The ID of the publisher for this DAG

  • entity_type (Literal[publisher, collection]) – Whether this entity should be treated as a publisher or a collection

  • country_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the country partner

  • institution_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the institution partner

  • bq_dataset_description (str) – Description for the BigQuery dataset

  • bq_country_table_description (Optional[str]) – Description for the BigQuery JSTOR country table

  • bq_institution_table_description (Optional[str]) – Description for the BigQuery JSTOR institution table

  • api_dataset_id (str) – The ID to store the dataset release in the API

  • gmail_api_conn_id (str) – Airflow connection ID for the Gmail API

  • observatory_api_conn_id (str) – Airflow connection ID for the Observatory API

  • catchup (bool) – Whether to catch up the DAG or not

  • max_active_runs (int) – The maximum number of DAG runs that can be run concurrently

  • schedule (str) – The schedule interval of the DAG

  • start_date (pendulum.DateTime) – The start date of the DAG

REPORTS_INFO = 'reports'
PROCESSED_LABEL_NAME = 'processed_report'
MAX_ATTEMPTS = 3
FIXED_WAIT = 20
MAX_WAIT_TIME
EXP_BASE = 3
MULTIPLIER = 10
WAIT_FN
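
The retry attributes above suggest an exponential backoff, but this page does not show the value of MAX_WAIT_TIME or how WAIT_FN combines the constants. The following is therefore only a sketch of one plausible schedule: the cap ASSUMED_MAX_WAIT, the function exponential_wait, and the interpretation of the values as seconds are all assumptions.

```python
MAX_ATTEMPTS = 3
EXP_BASE = 3
MULTIPLIER = 10
ASSUMED_MAX_WAIT = 600  # hypothetical cap; the real MAX_WAIT_TIME is not listed here


def exponential_wait(attempt: int) -> int:
    """Wait (assumed seconds) after the given 1-based failed attempt."""
    # Grows as MULTIPLIER * EXP_BASE ** (attempt - 1), clipped at the assumed cap.
    return min(MULTIPLIER * EXP_BASE ** (attempt - 1), ASSUMED_MAX_WAIT)


# Waits for attempts 1..MAX_ATTEMPTS under these assumptions.
schedule = [exponential_wait(attempt) for attempt in range(1, MAX_ATTEMPTS + 1)]
```

Under these assumptions the waits grow by a factor of EXP_BASE per attempt until the cap is reached.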
make_release(**kwargs)

Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.

Parameters:

kwargs – The context passed from the PythonOperator.

Returns:

A list of JstorRelease instances

Return type:

List[JstorRelease]

list_reports(**kwargs)

Lists all JSTOR releases for a given month and publishes their report types, download URLs and release dates as an XCom.

Returns:

Whether to continue the DAG

Return type:

bool

download_reports(**kwargs)

Download the JSTOR reports based on the list of available reports. The release date for each report is only known after the report has been downloaded. Each report is therefore first downloaded to a temporary location; once the release info has been pushed as an XCom, the report is moved to the correct location.

Returns:

Whether to continue the DAG (always True)

Return type:

bool
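
The download-then-move pattern described above can be sketched with the standard library. This is an illustration only: the function store_report, the file naming scheme, and the assumption that the first line of the file holds the reporting month are all hypothetical and are not taken from the telescope.

```python
import shutil
import tempfile
from pathlib import Path


def store_report(content: str, target_dir: Path) -> Path:
    """Write a report to a temporary file, read its month, then move it."""
    # Step 1: the final name is unknown, so write to a temporary location.
    with tempfile.NamedTemporaryFile("w", suffix=".tsv", delete=False) as tmp:
        tmp.write(content)
        tmp_path = Path(tmp.name)

    # Step 2 (hypothetical layout): the first line holds the month as YYYY-MM.
    month = tmp_path.read_text().splitlines()[0].strip()

    # Step 3: move the file to a location derived from the release month.
    target_dir.mkdir(parents=True, exist_ok=True)
    final_path = target_dir / f"report_{month}.tsv"
    shutil.move(str(tmp_path), str(final_path))
    return final_path


stored = store_report("2022-06\nexample row\n", Path(tempfile.mkdtemp()))
```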

upload_downloaded(releases, **kwargs)

Uploads the downloaded files to GCS for each release.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

Return type:

None

transform(releases, **kwargs)

Task to transform the JSTOR releases for a given month.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

upload_transformed(releases, **kwargs)

Uploads the transformed files to GCS for each release.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

Return type:

None

bq_load(releases, **kwargs)

Loads the country and institution data into BigQuery.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

Return type:

None

add_new_dataset_releases(releases, **kwargs)

Adds release information to the API.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

Return type:

None

cleanup(releases, **kwargs)

Delete all files, folders and XComs associated with this release. Assign a label to the Gmail messages that have been processed.

Parameters:

releases (List[JstorRelease]) – List of JstorRelease instances

Return type:

None

oaebu_workflows.jstor_telescope.jstor_telescope.create_gmail_service()

Build the Gmail service.

Returns:

Gmail service instance

Return type:

googleapiclient.discovery.Resource

oaebu_workflows.jstor_telescope.jstor_telescope.make_jstor_api(entity_type, entity_id)

Create the Jstor API Instance.

Parameters:
  • entity_type (Literal[publisher, collection]) – The entity type. Should be either ‘publisher’ or ‘collection’.

  • entity_id (str) – The entity id.

Returns:

The Jstor API instance
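
A factory of this kind typically dispatches on the entity type and returns the matching subclass of a shared abstract base. The sketch below shows that pattern with stand-in classes. The names ReportAPI, PublisherReportAPI, CollectionReportAPI and make_report_api are hypothetical, the service is passed in rather than built, and raising ValueError for an unknown type is an assumption rather than documented behaviour.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class ReportAPI(ABC):
    """Stand-in for an abstract base shared by both entity types."""

    def __init__(self, service: Any, entity_id: str):
        self.service = service
        self.entity_id = entity_id

    @abstractmethod
    def list_reports(self) -> List[dict]:
        """Return the reports available for this entity."""


class PublisherReportAPI(ReportAPI):
    def list_reports(self) -> List[dict]:
        return []  # placeholder body for the sketch


class CollectionReportAPI(ReportAPI):
    def list_reports(self) -> List[dict]:
        return []  # placeholder body for the sketch


def make_report_api(entity_type: str, entity_id: str, service: Any = None) -> ReportAPI:
    """Return the API class matching entity_type (hypothetical factory)."""
    if entity_type == "publisher":
        return PublisherReportAPI(service, entity_id)
    if entity_type == "collection":
        return CollectionReportAPI(service, entity_id)
    # Assumption: unknown entity types are rejected with ValueError.
    raise ValueError(f"Unknown entity type: {entity_type}")


api = make_report_api("publisher", "example_publisher")
```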

class oaebu_workflows.jstor_telescope.jstor_telescope.JstorAPI(service, entity_id)

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

Parameters:
  • service (Any) –

  • entity_id (str) –

get_messages(list_params)

Get messages from the Gmail API.

Parameters:

list_params (dict) – The parameters that will be passed to the Gmail API.

Return type:

List[dict]
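
The Gmail API returns message listings in pages, with a nextPageToken in the response when more results are available. How get_messages handles this is not shown on this page, so the following is a sketch of the general pagination loop. The helper collect_messages and the fake_fetch stand-in for the real API call are hypothetical.

```python
from typing import Callable, List


def collect_messages(fetch_page: Callable[[dict], dict], list_params: dict) -> List[dict]:
    """Follow nextPageToken until every page of messages has been read."""
    messages: List[dict] = []
    params = dict(list_params)  # copy so the caller's dict is not modified
    while True:
        page = fetch_page(params)
        messages.extend(page.get("messages", []))
        token = page.get("nextPageToken")
        if not token:
            return messages
        params["pageToken"] = token


# Fake two-page response standing in for the real Gmail call.
PAGES = {
    None: {"messages": [{"id": "a"}], "nextPageToken": "t1"},
    "t1": {"messages": [{"id": "b"}]},
}


def fake_fetch(params: dict) -> dict:
    return PAGES[params.get("pageToken")]


result = collect_messages(fake_fetch, {"q": "subject:report"})
```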

get_label_id(label_name)

Get the id of a label based on the label name.

Parameters:

label_name (str) – The name of the label

Returns:

The label id

Return type:

str
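
Gmail label resources carry both an "id" and a "name" field, so a name-to-id lookup reduces to a search over the label list. The sketch below assumes the labels have already been fetched. The helper find_label_id is hypothetical, and what the real method does when the label is missing is not documented on this page.

```python
from typing import List, Optional


def find_label_id(labels: List[dict], label_name: str) -> Optional[str]:
    """Return the id of the first label whose name matches, else None."""
    for label in labels:
        if label.get("name") == label_name:
            return label.get("id")
    return None


# Example label list in the shape returned by the Gmail labels listing.
labels = [
    {"id": "INBOX", "name": "INBOX"},
    {"id": "Label_1", "name": "processed_report"},
]
label_id = find_label_id(labels, "processed_report")
```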

abstract list_reports()

Return type:

List[dict]

abstract get_release_date(report_path)

Parameters:

report_path (str) –

Return type:

tuple[pendulum.DateTime, pendulum.DateTime]

abstract download_report(report, download_path)

Parameters:
  • report (dict) –

  • download_path (str) –

Return type:

None

abstract transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)

Parameters:
  • download_country (str) –

  • download_institution (str) –

  • transform_country (str) –

  • transform_institution (str) –

  • partition_date (pendulum.DateTime) –

Return type:

None

abstract add_labels(reports)

Parameters:

reports (List[dict]) –

Return type:

bool

class oaebu_workflows.jstor_telescope.jstor_telescope.JstorPublishersAPI(service, entity_id)

Bases: JstorAPI

Helper class that provides a standard way to create an ABC using inheritance.

Parameters:
  • service (Any) –

  • entity_id (str) –

list_reports()

List the available releases by going through the messages of a Gmail account and looking for a specific pattern.

A message that has been processed previously carries a specific label; messages with this label are skipped. Each message should include a download URL. The header of this download URL contains the filename, from which the release date and publisher can be derived.

Returns:

A list of dictionaries representing the messages with reports. Each has keys “type”, “url” and “id”.

Return type:

List[dict]

download_report(report, download_path)

Download a report from its URL to a file.

Parameters:
  • report (dict) – The report info. Should contain the “url” key

  • download_path (str) – Path to download data to

Return type:

None

get_header_info(url)

Get the header info from the URL and parse it for the filename and file extension.

Parameters:

url (str) – Download url

Returns:

Filename and file extension

Return type:

Tuple[str, str]
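
Filenames served with a download are commonly carried in the Content-Disposition response header. Whether get_header_info reads that particular header is not stated here, so the following is a sketch under that assumption. The helper parse_content_disposition, the sample filename, and the choice to return the extension without its leading dot are all hypothetical.

```python
import os
from email.message import Message
from typing import Tuple


def parse_content_disposition(header_value: str) -> Tuple[str, str]:
    """Split a Content-Disposition filename into (name, extension)."""
    # Let the standard library handle quoting and parameter parsing.
    message = Message()
    message["Content-Disposition"] = header_value
    filename = message.get_filename()
    if filename is None:
        raise ValueError("No filename found in header")
    name, extension = os.path.splitext(filename)
    return name, extension.lstrip(".")


header_info = parse_content_disposition('attachment; filename="PUB_example_2022-06.tsv"')
```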

get_release_date(report_path)

Get the release date from the “Reporting_Period” part of the header. Also checks that the report contains data from exactly one month.

Parameters:

report_path (str) – The path to the JSTOR report

Returns:

The start and end of the release month

Return type:

Tuple[pendulum.DateTime, pendulum.DateTime]
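
The one-month check described above can be sketched as follows. The exact layout of the “Reporting_Period” header is not given on this page, so the tab-separated "start to end" format used here is an assumption, as are the helper name parse_reporting_period and the use of ValueError. The sketch also uses datetime.date rather than pendulum.DateTime.

```python
import calendar
from datetime import date
from typing import Tuple


def parse_reporting_period(line: str) -> Tuple[date, date]:
    """Parse an assumed 'Reporting_Period<TAB>START to END' header line."""
    _, value = line.strip().split("\t", 1)
    start_text, end_text = value.split(" to ")
    start = date.fromisoformat(start_text.strip())
    end = date.fromisoformat(end_text.strip())

    # Exactly one month: first day to last day of the same calendar month.
    last_day = calendar.monthrange(start.year, start.month)[1]
    same_month = (start.year, start.month) == (end.year, end.month)
    if not (start.day == 1 and same_month and end.day == last_day):
        raise ValueError(f"Report does not cover exactly one month: {value}")
    return start, end


period = parse_reporting_period("Reporting_Period\t2022-06-01 to 2022-06-30")
```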

get_release_date_deprecated(report_path)

This function is deprecated because the headers for the reports have changed since 2021-10-01. It might still be used for reports that were created before this date and have not been processed yet. Gets the release date from the “Usage Month” column in the first row of the report. Also checks that the report contains data from a single month only.

Parameters:

report_path (str) – The path to the JSTOR report

Returns:

The start and end dates

Return type:

Tuple[pendulum.DateTime, pendulum.DateTime]

transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)

Transform a JSTOR release into JSON Lines format and gzip the result.

Parameters:
  • download_country (str) – The path to the country download report

  • download_institution (str) – The path to the institution download report

  • transform_country (str) – The path to write the transformed country file to

  • transform_institution (str) – The path to write the transformed institution file to

  • partition_date (pendulum.DateTime) – The partition/release date of this report

Return type:

None
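
Writing gzipped JSON Lines needs only the standard library. The sketch below converts a small tab-separated sample. The column names, the added release_date field, and the helper tsv_to_jsonl_gz are hypothetical and do not reflect the actual report schema or the transformations the telescope applies.

```python
import csv
import gzip
import io
import json
import os
import tempfile


def tsv_to_jsonl_gz(tsv_text: str, out_path: str, release_date: str) -> None:
    """Write each TSV row as one JSON object per line, gzip-compressed."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    with gzip.open(out_path, "wt", encoding="utf-8") as out_file:
        for row in reader:
            row["release_date"] = release_date  # hypothetical extra field
            out_file.write(json.dumps(row) + "\n")


# Hypothetical two-row country report.
sample = "Country\tTotal\nAustralia\t12\nKenya\t7\n"
out_path = os.path.join(tempfile.mkdtemp(), "country.jsonl.gz")
tsv_to_jsonl_gz(sample, out_path, "2022-06-30")

# Read the file back to confirm it holds one JSON record per line.
with gzip.open(out_path, "rt", encoding="utf-8") as in_file:
    records = [json.loads(line) for line in in_file]
```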

add_labels(reports)

Adds the label name to all messages in the reports.

Parameters:

reports (List[dict]) – List of report info

Returns:

True if successful, False otherwise

Return type:

bool

class oaebu_workflows.jstor_telescope.jstor_telescope.JstorCollectionsAPI(service, entity_id)

Bases: JstorAPI

Helper class that provides a standard way to create an ABC using inheritance.

Parameters:
  • service (Any) –

  • entity_id (str) –

list_reports()

List the available reports by going through the Gmail messages from a specific sender.

Returns:

A list of dictionaries representing the messages with reports as attachments. Each has keys “type”, “attachment_id” and “id”.

Return type:

List[dict]

download_report(report, download_path)

Download report from url to a file.

Parameters:
  • report (dict) – The report info. Should contain the “id” and “attachment_id” keys

  • download_path (str) – Path to download data to

Return type:

None

get_release_date(report_path)

Get the release date from the report. This should be under the “Month, Year of monthdt” column. Also checks that the report contains data from exactly one month.

Parameters:

report_path (str) – The path to the JSTOR report

Returns:

The start and end of the release month

Return type:

Tuple[pendulum.DateTime, pendulum.DateTime]

transform_reports(download_country, download_institution, transform_country, transform_institution, partition_date)

Transform a JSTOR release into JSON Lines format and gzip the result.

Parameters:
  • download_country (str) – The path to the country download report

  • download_institution (str) – The path to the institution download report

  • transform_country (str) – The path to write the transformed country file to

  • transform_institution (str) – The path to write the transformed institution file to

  • partition_date (pendulum.DateTime) – The partition/release date of this report

Return type:

None

add_labels(reports)

Adds the label name to all messages in the reports.

Parameters:

reports (List[dict]) – List of report info

Returns:

True if successful, False otherwise

Return type:

bool