oaebu_workflows.onix_workflow.onix_workflow
Module Contents
Classes
OnixWorkflowRelease – Release information for OnixWorkflow
OnixWorkflow – Onix Workflow Instance
Functions
dois_from_table – Queries a metadata table to retrieve the unique DOIs, provided the DOIs are not in a nested structure.
download_crossref_events – Spawns multiple threads to download event data (DOI and publisher only) for each DOI supplied.
download_crossref_event_url – Downloads all crossref events from a url, iterating through pages if there is more than one.
download_crossref_page_events – Download crossref events from a single page.
crossref_events_limiter – Task to throttle the calls to the crossref events API.
transform_crossref_events – Spawns workers to transform crossref events.
transform_event – Transform the dictionary with event data by replacing '-' with '_' in key names, converting all int values to string except for the 'total' field and parsing datetime columns for a valid datetime.
copy_latest_export_tables – Creates copies of all sharded tables from a dataset matching a date string.
get_onix_records – Fetch the latest onix snapshot from BigQuery.
get_isbn_utils_sql_string – Load the ISBN utils sql functions.
create_data_partner_env – Creates a jinja2 environment for any number of data partners.
insert_into_schema – Inserts a given field into a schema.
Attributes
- oaebu_workflows.onix_workflow.onix_workflow.CROSSREF_EVENT_URL_TEMPLATE = 'https://api.eventdata.crossref.org/v1/events?mailto={mailto}&from-collected-date={start_date}&unt...'[source]
- class oaebu_workflows.onix_workflow.onix_workflow.OnixWorkflowRelease(*, dag_id, run_id, snapshot_date, onix_snapshot_date, crossref_master_snapshot_date)[source]
Bases:
observatory.platform.workflows.workflow.SnapshotRelease
Release information for OnixWorkflow
Construct the OnixWorkflow Release.
- Parameters:
dag_id (str) – DAG ID.
run_id (str) –
snapshot_date (pendulum.DateTime) – The date of the partition/release (named release_date in the docstring).
onix_snapshot_date (pendulum.DateTime) – The ONIX snapshot/release date.
crossref_master_snapshot_date (pendulum.DateTime) – The release date/suffix of the crossref master table.
- class oaebu_workflows.onix_workflow.onix_workflow.OnixWorkflow(dag_id, cloud_workspace, metadata_partner, bq_master_crossref_project_id='academic-observatory', bq_master_crossref_dataset_id='crossref_metadata', bq_oaebu_crossref_dataset_id='crossref', bq_master_crossref_metadata_table_name='crossref_metadata', bq_oaebu_crossref_metadata_table_name='crossref_metadata', bq_crossref_events_table_name='crossref_events', bq_country_project_id='oaebu-public-data', bq_country_dataset_id='oaebu_reference', bq_subject_project_id='oaebu-public-data', bq_subject_dataset_id='oaebu_reference', bq_book_table_name='book', bq_book_product_table_name='book_product', bq_onix_workflow_dataset='onix_workflow', bq_oaebu_intermediate_dataset='oaebu_intermediate', bq_oaebu_dataset='oaebu', bq_oaebu_export_dataset='data_export', bq_oaebu_latest_export_dataset='data_export_latest', bq_worksid_table_name='onix_workid_isbn', bq_worksid_error_table_name='onix_workid_isbn_errors', bq_workfamilyid_table_name='onix_workfamilyid_isbn', bq_dataset_description='ONIX workflow tables', oaebu_intermediate_match_suffix='_matched', data_partners=None, ga3_views_field='page_views', schema_folder=default_schema_folder(workflow_module='onix_workflow'), mailto='agent@observatory.academy', crossref_start_date=pendulum.datetime(2018, 5, 14), api_dataset_id='onix_workflow', max_threads=2 * os.cpu_count() - 1, observatory_api_conn_id=AirflowConns.OBSERVATORY_API, sensor_dag_ids=None, catchup=False, start_date=pendulum.datetime(2022, 8, 1), schedule='@weekly')[source]
Bases:
observatory.platform.workflows.workflow.Workflow
Onix Workflow Instance
Initialises the workflow object.
- Parameters:
dag_id (str) – DAG ID.
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
bq_master_crossref_project_id (str) – GCP project ID of crossref master data
bq_master_crossref_dataset_id (str) – GCP dataset ID of crossref master data
bq_oaebu_crossref_dataset_id (str) – GCP dataset ID of crossref OAeBU data
bq_master_crossref_metadata_table_name (str) – The name of the master crossref metadata table
bq_oaebu_crossref_metadata_table_name (str) – The name of the OAeBU crossref metadata table
bq_crossref_events_table_name (str) – The name of the crossref events table
bq_country_project_id (str) – GCP project ID of the country table
bq_country_dataset_id (str) – GCP dataset containing the country table
bq_subject_project_id (str) – GCP project ID of the subject tables
bq_subject_dataset_id (str) – GCP dataset ID of the subject tables
bq_book_table_name (str) – The name of the book table
bq_book_product_table_name (str) – The name of the book product table
bq_onix_workflow_dataset (str) – Onix workflow dataset.
bq_oaebu_intermediate_dataset (str) – OAEBU intermediate dataset.
bq_oaebu_dataset (str) – OAEBU dataset.
bq_oaebu_export_dataset (str) – OAEBU data export dataset.
bq_oaebu_latest_export_dataset (str) – OAEBU data export dataset with the latest export tables
bq_worksid_table_name (str) – Table ID of the worksid table
bq_worksid_error_table_name (str) – Table ID of the worksid error table
bq_workfamilyid_table_name (str) – Table ID of the workfamilyid table
bq_dataset_description (str) – Description to give to the workflow tables
oaebu_intermediate_match_suffix (str) – Suffix to append to intermediate tables
data_partners (List[Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]]) – OAEBU data sources.
ga3_views_field – The name of the GA3 views field. Should be either ‘page_views’ or ‘unique_views’.
schema_folder (str) – The SQL schema path.
mailto (str) – email address used to identify the user when sending requests to an API.
crossref_start_date (pendulum.DateTime) – The starting date of crossref’s API calls
api_dataset_id (str) – The ID to store the dataset release in the API
max_threads (int) – The maximum number of threads to use for parallel tasks.
observatory_api_conn_id (str) – The connection ID for the observatory API
sensor_dag_ids (List[str]) – DAG IDs for dependent tasks
catchup (Optional[bool]) – Whether to catch up missed DAG runs.
start_date (Optional[pendulum.DateTime]) – Start date of the DAG.
schedule (Optional[str]) – Scheduled interval for running the DAG.
metadata_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) –
- create_tasks_export_tables()[source]
Create tasks for exporting final metrics from our OAEBU data. These are split into two categories: generic and custom. The custom exports change their schema depending on the data partners.
- make_release(**kwargs)[source]
Creates a release object.
- Parameters:
kwargs – From Airflow. Contains the execution_date.
- Returns:
an OnixWorkflowRelease object.
- Return type:
OnixWorkflowRelease
- aggregate_works(release, **kwargs)[source]
Fetches the ONIX product records from our ONIX database, aggregates them into works and work families, and writes the results to jsonl files.
- Parameters:
release (OnixWorkflowRelease) – The onix workflow release object
- Return type:
None
- create_crossref_metadata_table(release, **kwargs)[source]
Creates the crossref metadata table by querying the AO master table and matching on this publisher’s ISBNs
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- create_crossref_events_table(release, **kwargs)[source]
Download, transform, upload and create a table for crossref events
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- create_book_table(release, **kwargs)[source]
Create the oaebu book table using the crossref event and metadata tables
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- create_intermediate_table(release, *, orig_project_id, orig_dataset, orig_table, orig_isbn, sharded, **kwargs)[source]
Create an intermediate oaebu table. Intermediate tables are named in the form datasource_matched<date>.
- Parameters:
release (OnixWorkflowRelease) – Onix workflow release information.
orig_project_id (str) – Project ID for the partner data.
orig_dataset (str) – Dataset ID for the partner data.
orig_table (str) – Table ID for the partner data.
orig_isbn (str) – Name of the ISBN field in the partner data table.
sharded (bool) – Whether the data partner table is sharded
- Return type:
None
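The naming form datasource_matched<date> can be illustrated with a minimal sketch. The helper name intermediate_table_name and the YYYYMMDD date format are assumptions made for illustration (the format is chosen to agree with the sharded-table example 'this_table20220101' given elsewhere on this page); the suffix default mirrors oaebu_intermediate_match_suffix='_matched'.

```python
import datetime


def intermediate_table_name(
    orig_table: str,
    snapshot_date: datetime.date,
    suffix: str = "_matched",
) -> str:
    """Hypothetical helper: build a name of the form datasource_matched<date>.

    The YYYYMMDD shard format is an assumption, not taken from the workflow code.
    """
    return f"{orig_table}{suffix}{snapshot_date.strftime('%Y%m%d')}"


# Example with a made-up partner table name
name = intermediate_table_name("partner_country", datetime.date(2022, 1, 1))
```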
- create_book_product_table(release, **kwargs)[source]
Create the Book Product Table
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- export_oaebu_table(release, **kwargs)[source]
Create an export table.
Takes several kwargs:
output_table – The name of the table to create
query_template – The name of the template SQL file
schema_file_path – The path to the schema
- Parameters:
release (OnixWorkflowRelease) –
- Returns:
Whether the table creation was a success
- Return type:
bool
- export_book_metrics_country(release, **kwargs)[source]
Create table for country metrics
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- export_book_metrics_author(release, **kwargs)[source]
Create table for author metrics
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- export_book_metrics(release, **kwargs)[source]
Create table for book metrics
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- export_book_metrics_subjects(release, **kwargs)[source]
Create tables for subject metrics
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- update_latest_export_tables(release, **kwargs)[source]
Create copies of the latest data export tables in BigQuery
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- add_new_dataset_releases(release, **kwargs)[source]
Adds release information to API.
- Parameters:
release (OnixWorkflowRelease) –
- Return type:
None
- cleanup(release, **kwargs)[source]
Cleanup temporary files.
- Parameters:
release (OnixWorkflowRelease) –
- oaebu_workflows.onix_workflow.onix_workflow.dois_from_table(table_id, doi_column_name='DOI', distinct=True)[source]
Queries a metadata table to retrieve the unique DOIs, provided the DOIs are not in a nested structure.
- Parameters:
table_id (str) – The fully qualified ID of the metadata table on GCP
doi_column_name (str) – The name of the DOI column
distinct (bool) – Whether to retrieve only unique DOIs
- Returns:
All DOIs present in the metadata table
- Return type:
List[str]
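As a rough illustration of the kind of query this implies, the sketch below builds a SQL string for a flat (non-nested) DOI column. The helper name build_doi_query and the exact SQL text are assumptions; the real function may construct and run its query differently.

```python
def build_doi_query(table_id: str, doi_column_name: str = "DOI", distinct: bool = True) -> str:
    """Hypothetical helper: SQL selecting a top-level DOI column from a table.

    This only works when the DOI column is not nested inside a record.
    """
    select = "SELECT DISTINCT" if distinct else "SELECT"
    return f"{select} {doi_column_name} FROM `{table_id}`"


# Example with a made-up fully qualified table ID
query = build_doi_query("my-project.crossref.crossref_metadata20220101")
```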
- oaebu_workflows.onix_workflow.onix_workflow.download_crossref_events(dois, start_date, end_date, mailto, max_threads=1)[source]
Spawns multiple threads to download event data (DOI and publisher only) for each DOI supplied. The url template was made with reference to the crossref event API: https://www.eventdata.crossref.org/guide/service/query-api/
Note that max_threads is capped at 15, because the events API returns a 429 if more than 15 requests are made per second and each API request takes roughly 1 second. Having more threads than necessary slows down the download, as the retry script waits a minimum of two seconds between attempts.
- Parameters:
dois (List[str]) – The list of DOIs to download the events for
start_date (pendulum.DateTime) – The start date for events we’re interested in
end_date (pendulum.DateTime) – The end date for events we’re interested in
mailto (str) – The email to use as a reference for who is requesting the data
max_threads (int) – The maximum threads to spawn for the downloads.
- Returns:
All events for the input DOIs
- Return type:
List[dict]
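The thread cap described above can be sketched with the standard library. This is a minimal sketch, not the workflow's implementation: download_all_sketch, fake_fetch and the example.org URLs are hypothetical stand-ins, and a real fetch function would perform the HTTP request.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List

MAX_EVENT_THREADS = 15  # cap taken from the description above


def download_all_sketch(
    urls: List[str],
    fetch: Callable[[str], List[dict]],
    max_threads: int = 1,
) -> List[dict]:
    """Run fetch(url) for every URL with at most 15 worker threads."""
    n_workers = max(1, min(max_threads, MAX_EVENT_THREADS))
    events: List[dict] = []
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(fetch, url) for url in urls]
        # Results arrive in completion order, not submission order
        for future in as_completed(futures):
            events.extend(future.result())
    return events


def fake_fetch(url: str) -> List[dict]:
    """Stand-in for a real download: returns one fake event per URL."""
    return [{"url": url}]


urls = [f"https://example.org/events?obj-id=10.1234/{i}" for i in range(5)]
events = download_all_sketch(urls, fake_fetch, max_threads=50)
```

Requesting 50 threads here still results in a pool of 15 workers.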
- oaebu_workflows.onix_workflow.onix_workflow.download_crossref_event_url(url, i=0)[source]
Downloads all crossref events from a url, iterating through pages if there is more than one
- Parameters:
url (str) – The url to send the request to
i (int) – Worker number
- Returns:
The events from this URL
- Return type:
List[dict]
- oaebu_workflows.onix_workflow.onix_workflow.download_crossref_page_events(url, headers)[source]
Download crossref events from a single page
- Parameters:
url (str) – The url to send the request to
headers (dict) – Headers to send with the request
- Returns:
The cursor, event counter, total number of events and the events for the URL
- Return type:
Tuple[str, int, int, List[dict]]
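The relationship between the per-page download and the multi-page download can be sketched as a cursor loop. This is an illustration under assumptions: collect_all_pages and fake_fetch_page are hypothetical, the page tuple mirrors the documented return shape (cursor, event counter, total, events), and both the "cursor" query parameter and the use of an empty cursor to signal the last page are assumptions.

```python
from typing import Callable, Dict, List, Optional, Tuple

Page = Tuple[Optional[str], int, int, List[dict]]


def collect_all_pages(url: str, fetch_page: Callable[[str], Page]) -> List[dict]:
    """Follow cursors from page to page until no cursor is returned."""
    events: List[dict] = []
    cursor: Optional[str] = None
    while True:
        # Assumption: the next page is requested by appending a cursor parameter
        page_url = url if cursor is None else f"{url}&cursor={cursor}"
        cursor, _count, _total, page_events = fetch_page(page_url)
        events.extend(page_events)
        if not cursor:
            return events


BASE_URL = "https://example.org/events?rows=2"  # placeholder URL
FAKE_PAGES: Dict[str, Page] = {
    BASE_URL: ("c1", 2, 3, [{"id": 1}, {"id": 2}]),
    f"{BASE_URL}&cursor=c1": (None, 1, 3, [{"id": 3}]),
}


def fake_fetch_page(page_url: str) -> Page:
    """Stand-in for a single-page download."""
    return FAKE_PAGES[page_url]


all_events = collect_all_pages(BASE_URL, fake_fetch_page)
```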
- oaebu_workflows.onix_workflow.onix_workflow.crossref_events_limiter()[source]
Task to throttle the calls to the crossref events API
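One way such throttling can work is a sliding-window limiter that blocks once a call budget is used up. The sketch below is a standard-library illustration only; the CallLimiter class is hypothetical, the workflow may rely on a different mechanism, and the 15 calls per second figure comes from the download_crossref_events description.

```python
import time
from collections import deque
from typing import Callable, Deque


class CallLimiter:
    """Allow at most `calls` calls per `period` seconds (sliding window)."""

    def __init__(
        self,
        calls: int = 15,
        period: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self._calls = calls
        self._period = period
        self._clock = clock
        self._sleep = sleep
        self._stamps: Deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Forget calls that have left the window
        while self._stamps and now - self._stamps[0] >= self._period:
            self._stamps.popleft()

    def wait(self) -> None:
        """Block until another call is allowed, then record it."""
        now = self._clock()
        self._evict(now)
        while len(self._stamps) >= self._calls:
            # Sleep until the oldest recorded call leaves the window
            self._sleep(self._period - (now - self._stamps[0]))
            now = self._clock()
            self._evict(now)
        self._stamps.append(now)


limiter = CallLimiter(calls=15, period=1.0)
for _ in range(3):
    limiter.wait()  # no blocking: fewer than 15 calls in the window
```

Calling limiter.wait() immediately before each request keeps the request rate under the limit. The clock and sleep functions are injectable so the behaviour can be checked without real delays.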
- oaebu_workflows.onix_workflow.onix_workflow.transform_crossref_events(events, max_threads=1)[source]
Spawns workers to transform crossref events
- Parameters:
events (List[dict]) – A list of the events to transform
max_threads (int) – The maximum number of threads to utilise for the transforming process
- Returns:
The transformed events. The order of the events in the input list is not preserved.
- Return type:
List[dict]
- oaebu_workflows.onix_workflow.onix_workflow.transform_event(event)[source]
Transform the dictionary with event data by replacing ‘-’ with ‘_’ in key names, converting all int values to string except for the ‘total’ field and parsing datetime columns for a valid datetime.
- Parameters:
event (dict) – The event dictionary
- Returns:
The transformed event dictionary
- Return type:
dict
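The three steps named above (key renaming, int-to-string conversion except for 'total', and datetime parsing) can be sketched as follows. This is not the library's code: transform_event_sketch is a hypothetical name, and the set of datetime keys, the recursion into nested values, and the ISO output format are all assumptions.

```python
from datetime import datetime
from typing import Any

# Assumption: which keys hold datetimes is not stated in this page
DATETIME_KEYS = {"timestamp", "occurred_at"}


def _transform_value(key: str, value: Any) -> Any:
    if isinstance(value, dict):
        return transform_event_sketch(value)
    if isinstance(value, list):
        return [_transform_value(key, item) for item in value]
    if isinstance(value, bool):
        return value  # bool is a subclass of int; leave it untouched
    if isinstance(value, int) and key != "total":
        return str(value)
    if key in DATETIME_KEYS and isinstance(value, str):
        # fromisoformat() rejects a trailing "Z" before Python 3.11
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat()
    return value


def transform_event_sketch(event: dict) -> dict:
    """Rename keys ('-' to '_') and normalise values, recursively."""
    result = {}
    for key, value in event.items():
        new_key = key.replace("-", "_")
        result[new_key] = _transform_value(new_key, value)
    return result


# Made-up event for illustration
transformed = transform_event_sketch(
    {
        "source-id": "twitter",
        "total": 1,
        "obj": {"work-type-id": 7},
        "occurred-at": "2021-03-04T05:06:07Z",
    }
)
```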
- oaebu_workflows.onix_workflow.onix_workflow.copy_latest_export_tables(project_id, from_dataset, to_dataset, date_match, data_location, description=None)[source]
Creates copies of all sharded tables from a dataset matching a date string.
- Parameters:
project_id (str) – The project id
from_dataset (str) – The dataset containing the sharded tables
to_dataset (str) – The dataset to contain the copied tables - will create if does not exist
date_match (str) – The date string to match. e.g. for a table named ‘this_table20220101’, this would be ‘20220101’
data_location (str) – The regional location of the data in google cloud
description (str) – The description for dataset housing the copied tables
- Return type:
None
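The date matching can be illustrated without touching BigQuery. In this sketch, plan_latest_copies is a hypothetical helper that selects the table names ending in date_match; mapping each one to a destination name with the date suffix removed is an assumption about how the "latest" copies are named.

```python
from typing import Dict, Iterable


def plan_latest_copies(table_names: Iterable[str], date_match: str) -> Dict[str, str]:
    """Map each table whose name ends in date_match to a name without the date."""
    if not date_match:
        raise ValueError("date_match must be a non-empty string")
    plan = {}
    for name in table_names:
        if name.endswith(date_match):
            # Assumption: the copy drops the shard date from the table name
            plan[name] = name[: -len(date_match)]
    return plan


tables = ["this_table20220101", "this_table20211201", "other_table20220101"]
copies = plan_latest_copies(tables, "20220101")
```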
- oaebu_workflows.onix_workflow.onix_workflow.get_onix_records(table_id)[source]
Fetch the latest onix snapshot from BigQuery.
- Parameters:
table_id (str) – Fully qualified table ID.
- Returns:
List of onix product records.
- Return type:
List[dict]
- oaebu_workflows.onix_workflow.onix_workflow.get_isbn_utils_sql_string()[source]
Load the ISBN utils sql functions.
- Returns:
BQ SQL string.
- Return type:
str
- oaebu_workflows.onix_workflow.onix_workflow.create_data_partner_env(main_template, data_partners)[source]
Creates a jinja2 environment for any number of data partners
- Parameters:
main_template (str) – The name of the main jinja2 template
data_partners (Iterable[oaebu_workflows.oaebu_partners.DataPartner]) – The data partners
- Returns:
Jinja2 environment with data partners sql folders loaded
- Return type:
jinja2.Environment
- oaebu_workflows.onix_workflow.onix_workflow.insert_into_schema(schema_base, insert_field, schema_field_name=None)[source]
Inserts a given field into a schema.
- Parameters:
schema_base (List[dict]) – The base schema to insert the field into.
insert_field (dict) – The field to be inserted into the schema.
schema_field_name (Optional[str]) – The name of the field in the schema. If provided, the field will be inserted into the matching field. If not provided, the field will be appended to the end of the schema.
- Returns:
The updated schema with the field inserted.
- Raises:
ValueError – If the provided schema_field_name is not found in the schema.
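The behaviour described for this function can be sketched as below. It is an illustration under assumptions: insert_into_schema_sketch is a hypothetical name, "inserted into the matching field" is read as appending to that field's nested "fields" list (as in a BigQuery RECORD), and the sketch works on a copy rather than modifying its input, which the real function may not do.

```python
from copy import deepcopy
from typing import List, Optional


def insert_into_schema_sketch(
    schema_base: List[dict],
    insert_field: dict,
    schema_field_name: Optional[str] = None,
) -> List[dict]:
    """Return a copy of the schema with insert_field added."""
    schema = deepcopy(schema_base)
    if schema_field_name is None:
        # No target field given: append at the top level
        schema.append(insert_field)
        return schema
    for field in schema:
        if field.get("name") == schema_field_name:
            # Assumption: nested fields live under the "fields" key
            field.setdefault("fields", []).append(insert_field)
            return schema
    raise ValueError(f"Field '{schema_field_name}' not found in schema")


# Made-up schema for illustration
base = [
    {"name": "ISBN13", "type": "STRING"},
    {"name": "metrics", "type": "RECORD", "fields": []},
]
new_field = {"name": "downloads", "type": "INTEGER"}

appended = insert_into_schema_sketch(base, new_field)
nested = insert_into_schema_sketch(base, new_field, schema_field_name="metrics")
```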