oaebu_workflows.onix_workflow.onix_workflow

Module Contents

Classes

OnixWorkflowRelease

Release information for OnixWorkflow

OnixWorkflow

Onix Workflow Instance

Functions

dois_from_table(table_id[, doi_column_name, distinct])

Queries a metadata table to retrieve the unique DOIs, provided the DOIs are not stored in a nested structure.

download_crossref_events(dois, start_date, end_date, ...)

Spawns multiple threads to download event data (DOI and publisher only) for each DOI supplied.

download_crossref_event_url(url[, i])

Downloads all crossref events from a url, iterating through pages if there is more than one

download_crossref_page_events(url, headers)

Download crossref events from a single page

crossref_events_limiter()

Task to throttle the calls to the crossref events API

transform_crossref_events(events[, max_threads])

Spawns workers to transform crossref events

transform_event(event)

Transform the dictionary with event data by replacing '-' with '_' in key names, converting all int values to string (except the 'total' field) and parsing datetime columns.

copy_latest_export_tables(project_id, from_dataset, ...)

Creates copies of all sharded tables from a dataset that match a date string.

get_onix_records(table_id)

Fetch the latest onix snapshot from BigQuery.

get_isbn_utils_sql_string()

Load the ISBN utils sql functions.

create_data_partner_env(main_template, data_partners)

Creates a jinja2 environment for any number of data partners

insert_into_schema(schema_base, insert_field[, ...])

Inserts a given field into a schema.

Attributes

CROSSREF_EVENT_URL_TEMPLATE

oaebu_workflows.onix_workflow.onix_workflow.CROSSREF_EVENT_URL_TEMPLATE = 'https://api.eventdata.crossref.org/v1/events?mailto={mailto}&from-collected-date={start_date}&unt...'[source]
class oaebu_workflows.onix_workflow.onix_workflow.OnixWorkflowRelease(*, dag_id, run_id, snapshot_date, onix_snapshot_date, crossref_master_snapshot_date)[source]

Bases: observatory.platform.workflows.workflow.SnapshotRelease

Release information for OnixWorkflow

Construct the OnixWorkflowRelease.

Parameters:
  • dag_id (str) – DAG ID.

  • run_id (str) – The Airflow DAG run ID.

  • snapshot_date (pendulum.DateTime) – The date of the partition/release.

  • onix_snapshot_date (pendulum.DateTime) – The ONIX snapshot/release date.

  • crossref_master_snapshot_date (pendulum.DateTime) – The release date/suffix of the crossref master table.

class oaebu_workflows.onix_workflow.onix_workflow.OnixWorkflow(dag_id, cloud_workspace, metadata_partner, bq_master_crossref_project_id='academic-observatory', bq_master_crossref_dataset_id='crossref_metadata', bq_oaebu_crossref_dataset_id='crossref', bq_master_crossref_metadata_table_name='crossref_metadata', bq_oaebu_crossref_metadata_table_name='crossref_metadata', bq_crossref_events_table_name='crossref_events', bq_country_project_id='oaebu-public-data', bq_country_dataset_id='oaebu_reference', bq_subject_project_id='oaebu-public-data', bq_subject_dataset_id='oaebu_reference', bq_book_table_name='book', bq_book_product_table_name='book_product', bq_onix_workflow_dataset='onix_workflow', bq_oaebu_intermediate_dataset='oaebu_intermediate', bq_oaebu_dataset='oaebu', bq_oaebu_export_dataset='data_export', bq_oaebu_latest_export_dataset='data_export_latest', bq_worksid_table_name='onix_workid_isbn', bq_worksid_error_table_name='onix_workid_isbn_errors', bq_workfamilyid_table_name='onix_workfamilyid_isbn', bq_dataset_description='ONIX workflow tables', oaebu_intermediate_match_suffix='_matched', data_partners=None, ga3_views_field='page_views', schema_folder=default_schema_folder(workflow_module='onix_workflow'), mailto='agent@observatory.academy', crossref_start_date=pendulum.datetime(2018, 5, 14), api_dataset_id='onix_workflow', max_threads=2 * os.cpu_count() - 1, observatory_api_conn_id=AirflowConns.OBSERVATORY_API, sensor_dag_ids=None, catchup=False, start_date=pendulum.datetime(2022, 8, 1), schedule='@weekly')[source]

Bases: observatory.platform.workflows.workflow.Workflow

Onix Workflow Instance

Initialises the workflow object.

Parameters:
  • dag_id (str) – DAG ID.

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG

  • bq_master_crossref_project_id (str) – GCP project ID of crossref master data

  • bq_master_crossref_dataset_id (str) – GCP dataset ID of crossref master data

  • bq_oaebu_crossref_dataset_id (str) – GCP dataset ID of crossref OAeBU data

  • bq_master_crossref_metadata_table_name (str) – The name of the master crossref metadata table

  • bq_oaebu_crossref_metadata_table_name (str) – The name of the OAeBU crossref metadata table

  • bq_crossref_events_table_name (str) – The name of the crossref events table

  • bq_country_project_id (str) – GCP project ID of the country table

  • bq_country_dataset_id (str) – GCP dataset containing the country table

  • bq_subject_project_id (str) – GCP project ID of the subject tables

  • bq_subject_dataset_id (str) – GCP dataset ID of the subject tables

  • bq_book_table_name (str) – The name of the book table

  • bq_book_product_table_name (str) – The name of the book product table

  • bq_onix_workflow_dataset (str) – Onix workflow dataset.

  • bq_oaebu_intermediate_dataset (str) – OAEBU intermediate dataset.

  • bq_oaebu_dataset (str) – OAEBU dataset.

  • bq_oaebu_export_dataset (str) – OAEBU data export dataset.

  • bq_oaebu_latest_export_dataset (str) – OAEBU data export dataset with the latest export tables

  • bq_worksid_table_name (str) – table ID of the worksid table

  • bq_worksid_error_table_name (str) – table ID of the worksid error table

  • bq_workfamilyid_table_name (str) – table ID of the workfamilyid table

  • bq_dataset_description (str) – Description to give to the workflow tables

  • oaebu_intermediate_match_suffix (str) – Suffix to append to intermediate tables

  • data_partners (List[Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]]) – OAEBU data sources.

  • ga3_views_field (str) – The name of the GA3 views field; should be either ‘page_views’ or ‘unique_views’

  • schema_folder (str) – the SQL schema path.

  • mailto (str) – email address used to identify the user when sending requests to an API.

  • crossref_start_date (pendulum.DateTime) – The starting date of crossref’s API calls

  • api_dataset_id (str) – The ID to store the dataset release in the API

  • max_threads (int) – The maximum number of threads to use for parallel tasks.

  • observatory_api_conn_id (str) – The connection ID for the observatory API

  • sensor_dag_ids (List[str]) – Dag IDs for dependent tasks

  • catchup (Optional[bool]) – Whether to catch up missed DAG runs.

  • start_date (Optional[pendulum.DateTime]) – Start date of the DAG.

  • schedule (Optional[str]) – Scheduled interval for running the DAG.

  • metadata_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) –

make_dag()[source]

Construct the DAG

Return type:

airflow.DAG

create_tasks_export_tables()[source]

Create tasks for exporting final metrics from our OAEBU data. These are split into two categories: generic and custom. The custom exports change their schema depending on the data partners.

make_release(**kwargs)[source]

Creates a release object.

Parameters:

kwargs – From Airflow. Contains the execution_date.

Returns:

an OnixWorkflowRelease object.

Return type:

OnixWorkflowRelease

aggregate_works(release, **kwargs)[source]

Fetches the ONIX product records from our ONIX database, aggregates them into works and work families, and outputs them to jsonl files.

Parameters:

release (OnixWorkflowRelease) – The onix workflow release object

Return type:

None

create_crossref_metadata_table(release, **kwargs)[source]

Creates the crossref metadata table by querying the AO master table and matching on this publisher’s ISBNs

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

create_crossref_events_table(release, **kwargs)[source]

Download, transform, upload and create a table for crossref events

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

create_book_table(release, **kwargs)[source]

Create the oaebu book table using the crossref event and metadata tables

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

create_intermediate_table(release, *, orig_project_id, orig_dataset, orig_table, orig_isbn, sharded, **kwargs)[source]

Create an intermediate oaebu table. They are of the form datasource_matched<date>

Parameters:
  • release (OnixWorkflowRelease) – Onix workflow release information.

  • orig_project_id (str) – Project ID for the partner data.

  • orig_dataset (str) – Dataset ID for the partner data.

  • orig_table (str) – Table ID for the partner data.

  • orig_isbn (str) – Name of the ISBN field in the partner data table.

  • sharded (bool) – Whether the data partner table is sharded

Return type:

None

create_book_product_table(release, **kwargs)[source]

Create the Book Product Table

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

export_oaebu_table(release, **kwargs)[source]

Create an export table.

Takes several kwargs:

Parameters:
  • release (OnixWorkflowRelease) –

  • output_table – The name of the table to create

  • query_template – The name of the template SQL file

  • schema_file_path – The path to the schema

Returns:

Whether the table creation was a success

Return type:

bool

export_book_metrics_country(release, **kwargs)[source]

Create table for country metrics

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

export_book_metrics_author(release, **kwargs)[source]

Create table for author metrics

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

export_book_metrics(release, **kwargs)[source]

Create table for book metrics

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

export_book_metrics_subjects(release, **kwargs)[source]

Create tables for subject metrics

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

update_latest_export_tables(release, **kwargs)[source]

Create copies of the latest data export tables in bigquery

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

add_new_dataset_releases(release, **kwargs)[source]

Adds release information to API.

Parameters:

release (OnixWorkflowRelease) –

Return type:

None

cleanup(release, **kwargs)[source]

Cleanup temporary files.

Parameters:

release (OnixWorkflowRelease) –

oaebu_workflows.onix_workflow.onix_workflow.dois_from_table(table_id, doi_column_name='DOI', distinct=True)[source]

Queries a metadata table to retrieve the unique DOIs, provided the DOIs are not stored in a nested structure.

Parameters:
  • table_id (str) – The fully qualified ID of the metadata table on GCP

  • doi_column_name (str) – The name of the DOI column

  • distinct (bool) – Whether to retrieve only unique DOIs

Returns:

All DOIs present in the metadata table

Return type:

List[str]
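Since the DOIs are stored in a flat (non-nested) column, the underlying query is presumably a plain SELECT with an optional DISTINCT. A minimal sketch of building such a query; `build_dois_query` is a hypothetical helper, not part of the module:

```python
def build_dois_query(table_id: str, doi_column_name: str = "DOI", distinct: bool = True) -> str:
    # Hypothetical helper showing the kind of SQL dois_from_table likely runs;
    # the module's actual query may differ.
    select = "SELECT DISTINCT" if distinct else "SELECT"
    return f"{select} {doi_column_name} FROM `{table_id}`"

print(build_dois_query("my-project.my_dataset.crossref_metadata"))
# SELECT DISTINCT DOI FROM `my-project.my_dataset.crossref_metadata`
```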

oaebu_workflows.onix_workflow.onix_workflow.download_crossref_events(dois, start_date, end_date, mailto, max_threads=1)[source]

Spawns multiple threads to download event data (DOI and publisher only) for each DOI supplied. The URL template was made with reference to the crossref event API: https://www.eventdata.crossref.org/guide/service/query-api/ Note that max_threads is capped at 15 because the events API will return a 429 if more than 15 requests are made per second. Each API request happens to take roughly 1 second, so having more threads than necessary slows down the download process because the retry script waits a minimum of two seconds between attempts.

Parameters:
  • dois (List[str]) – The list of DOIs to download the events for

  • start_date (pendulum.DateTime) – The start date for events we’re interested in

  • end_date (pendulum.DateTime) – The end date for events we’re interested in

  • mailto (str) – The email to use as a reference for who is requesting the data

  • max_threads (int) – The maximum threads to spawn for the downloads.

Returns:

All events for the input DOIs

Return type:

List[dict]
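The 15-requests-per-second limit described above can be respected by simply capping the effective thread count; a sketch, with `MAX_EVENT_THREADS` and `effective_threads` as hypothetical names:

```python
# The events API returns HTTP 429 above 15 requests per second, so more
# threads than that are never useful (hypothetical names, not module API).
MAX_EVENT_THREADS = 15

def effective_threads(max_threads: int) -> int:
    # Cap the requested thread count at the API's practical limit.
    return min(max_threads, MAX_EVENT_THREADS)
```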

oaebu_workflows.onix_workflow.onix_workflow.download_crossref_event_url(url, i=0)[source]

Downloads all crossref events from a url, iterating through pages if there is more than one

Parameters:
  • url (str) – The URL to send the request to

  • i (int) – Worker number

Returns:

The events from this URL

Return type:

List[dict]

oaebu_workflows.onix_workflow.onix_workflow.download_crossref_page_events(url, headers)[source]

Download crossref events from a single page

Parameters:
  • url (str) – The url to send the request to

  • headers (dict) – Headers to send with the request

Returns:

The cursor, event counter, total number of events and the events for the URL

Return type:

Tuple[str, int, int, List[dict]]
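The returned tuple (cursor, event counter, total, events) suggests the standard cursor-pagination loop that download_crossref_event_url performs. A sketch with a stubbed page fetcher standing in for download_crossref_page_events:

```python
def collect_all_events(fetch_page):
    # fetch_page(cursor) -> (next_cursor, n_events, total, events); a stub
    # standing in for download_crossref_page_events (a sketch, not the
    # module's actual loop).
    events, cursor = [], None
    while True:
        cursor, n_events, total, page = fetch_page(cursor)
        events.extend(page)
        # Stop when the API stops returning a cursor or all events are in.
        if cursor is None or len(events) >= total:
            return events
```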

oaebu_workflows.onix_workflow.onix_workflow.crossref_events_limiter()[source]

Task to throttle the calls to the crossref events API
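One common way to implement such throttling is a thread-safe limiter that spaces requests at a fixed minimum interval. A sketch under that assumption; this is not the workflow's actual implementation:

```python
import threading
import time

class RateLimiter:
    """Thread-safe limiter that spaces calls at a fixed interval.

    A sketch of the throttling crossref_events_limiter implies; hypothetical,
    not the module's actual code.
    """

    def __init__(self, max_per_second: int = 15):
        self.interval = 1.0 / max_per_second
        self._lock = threading.Lock()
        self._next = 0.0

    def wait(self) -> None:
        # Reserve the next available time slot, then sleep until it arrives.
        with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next - now)
            self._next = max(now, self._next) + self.interval
        if delay:
            time.sleep(delay)
```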

oaebu_workflows.onix_workflow.onix_workflow.transform_crossref_events(events, max_threads=1)[source]

Spawns workers to transform crossref events

Parameters:
  • events (List[dict]) – A list of the events to transform

  • max_threads (int) – The maximum number of threads to utilise for the transforming process

Returns:

The transformed events; the order of the events in the input list is not preserved

Return type:

List[dict]

oaebu_workflows.onix_workflow.onix_workflow.transform_event(event)[source]

Transform the dictionary with event data by replacing ‘-’ with ‘_’ in key names, converting all int values to string except for the ‘total’ field and parsing datetime columns for a valid datetime.

Parameters:

event (dict) – The event dictionary

Returns:

The transformed event dictionary

Return type:

dict
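The documented key and value transformation can be sketched as follows (datetime parsing is omitted, and the handling of nested values is an assumption; the real transform_event may differ in detail):

```python
def transform_event_sketch(event: dict) -> dict:
    # Sketch of the documented transformation: '-' -> '_' in key names,
    # int values to string except 'total'. Datetime parsing omitted.
    out = {}
    for key, value in event.items():
        new_key = key.replace("-", "_")
        if isinstance(value, dict):
            value = transform_event_sketch(value)  # assume nesting is handled too
        elif isinstance(value, int) and new_key != "total":
            value = str(value)
        out[new_key] = value
    return out
```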

oaebu_workflows.onix_workflow.onix_workflow.copy_latest_export_tables(project_id, from_dataset, to_dataset, date_match, data_location, description=None)[source]

Creates copies of all sharded tables from a dataset that match a date string.

Parameters:
  • project_id (str) – The project id

  • from_dataset (str) – The dataset containing the sharded tables

  • to_dataset (str) – The dataset to contain the copied tables - will create if does not exist

  • date_match (str) – The date string to match. e.g. for a table named ‘this_table20220101’, this would be ‘20220101’

  • data_location (str) – The regional location of the data in google cloud

  • description (str) – The description for dataset housing the copied tables

Return type:

None
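Matching sharded tables against the date suffix reduces to a filter over table names; `matching_shards` below is a hypothetical helper illustrating the selection, not the module's implementation:

```python
def matching_shards(table_names, date_match):
    # Hypothetical helper: keep sharded tables whose name ends with the
    # date suffix, e.g. 'this_table20220101' matches date_match='20220101'.
    return [name for name in table_names if name.endswith(date_match)]
```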

oaebu_workflows.onix_workflow.onix_workflow.get_onix_records(table_id)[source]

Fetch the latest onix snapshot from BigQuery.

Parameters:

table_id (str) – Fully qualified table ID.

Returns:

List of onix product records.

Return type:

List[dict]

oaebu_workflows.onix_workflow.onix_workflow.get_isbn_utils_sql_string()[source]

Load the ISBN utils sql functions.

Returns:

BQ SQL string.

Return type:

str

oaebu_workflows.onix_workflow.onix_workflow.create_data_partner_env(main_template, data_partners)[source]

Creates a jinja2 environment for any number of data partners

Parameters:
  • main_template – The main SQL template

  • data_partners – The data partners whose SQL folders are loaded

Returns:

Jinja2 environment with data partners sql folders loaded

Return type:

jinja2.Environment

oaebu_workflows.onix_workflow.onix_workflow.insert_into_schema(schema_base, insert_field, schema_field_name=None)[source]

Inserts a given field into a schema.

Parameters:
  • schema_base (List[dict]) – The base schema to insert the field into.

  • insert_field (dict) – The field to be inserted into the schema.

  • schema_field_name (Optional[str]) – The name of the field in the schema. If provided, the field will be inserted into the matching field. If not provided, the field will be appended to the end of the schema.

Returns:

The updated schema with the field inserted.

Raises:

ValueError – If the provided schema_field_name is not found in the schema.
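The documented behaviour can be sketched as below; `insert_into_schema_sketch` is illustrative only, assuming BigQuery-style JSON schemas where a record field nests its children under a "fields" key:

```python
def insert_into_schema_sketch(schema_base, insert_field, schema_field_name=None):
    # Illustrative sketch of the documented behaviour; the real
    # insert_into_schema may differ.
    if schema_field_name is None:
        schema_base.append(insert_field)  # append to the end of the schema
        return schema_base
    for field in schema_base:
        if field.get("name") == schema_field_name:
            # Insert into the matching field's subfields.
            field.setdefault("fields", []).append(insert_field)
            return schema_base
    raise ValueError(f"Field '{schema_field_name}' not found in schema")
```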