oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope
Module Contents
Classes
- UclDiscoveryRelease – Construct a UclDiscoveryRelease instance.
- UclDiscoveryTelescope – The UCL Discovery telescope.
Functions
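- get_isbn_eprint_mappings(sheet_id, service_account_conn_id, cutoff_date) – Get the eprint ID to ISBN mapping from the Google Sheet.
- download_discovery_stats(eprint_id, start_date, end_date) – Downloads the discovery stats for a given eprint ID within a specified date range.
- transform_discovery_stats(country_record, totals_record, isbn, title) – Transforms the discovery stats for a single set of records.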
- class oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.UclDiscoveryRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date)
Bases: observatory.platform.workflows.workflow.PartitionRelease
Construct a UclDiscoveryRelease instance.
- Parameters:
dag_id (str) – The ID of the DAG.
run_id (str) – The Airflow run ID.
data_interval_start (pendulum.DateTime) – The start of the data interval.
data_interval_end (pendulum.DateTime) – The end of the data interval.
partition_date (pendulum.DateTime) – The partition date for this release.
- class oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.UclDiscoveryTelescope(dag_id, cloud_workspace, sheet_id, data_partner='ucl_discovery', bq_dataset_description='UCL Discovery dataset', bq_table_description='UCL Discovery table', api_dataset_id='ucl', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, oaebu_service_account_conn_id='oaebu_service_account', max_threads=os.cpu_count() * 2, schedule='0 0 4 * *', start_date=pendulum.datetime(2015, 6, 1), catchup=True, max_active_runs=10)
Bases: observatory.platform.workflows.workflow.Workflow
The UCL Discovery telescope.
Construct a UclDiscoveryTelescope instance.
- Parameters:
dag_id (str) – The ID of the DAG
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
sheet_id (str) – The ID of the Google Sheet that maps eprint IDs to ISBN13s
data_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the data partner
bq_dataset_description (str) – Description for the BigQuery dataset
bq_table_description (str) – Description for the BigQuery table
api_dataset_id (str) – The ID to store the dataset release in the API
observatory_api_conn_id (str) – Airflow connection ID for the Observatory API
oaebu_service_account_conn_id (str) – Airflow connection ID for the oaebu service account
max_threads (int) – The maximum number of threads to utilise for parallel processes
schedule (str) – The schedule interval of the DAG
start_date (pendulum.DateTime) – The start date of the DAG
catchup (bool) – Whether to enable catchup for the DAG
max_active_runs (int) – The maximum number of concurrent DAG runs
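With the defaults above (schedule='0 0 4 * *', start_date=pendulum.datetime(2015, 6, 1), catchup=True), Airflow's cron timetable should backfill one run per month, each covering the interval between consecutive midnight ticks on the 4th of the month, with up to max_active_runs of them running at once. The sketch below enumerates those intervals using only the standard library. monthly_intervals is a hypothetical helper, it uses naive datetime objects rather than pendulum, and it assumes the telescope relies on Airflow's default alignment, where the first interval starts at the first tick on or after start_date.

```python
from datetime import datetime
from typing import List, Tuple


def _add_month(dt: datetime) -> datetime:
    """Return the same day and time one calendar month later (day must be <= 28)."""
    if dt.month == 12:
        return dt.replace(year=dt.year + 1, month=1)
    return dt.replace(month=dt.month + 1)


def monthly_intervals(start: datetime, end: datetime, day: int = 4) -> List[Tuple[datetime, datetime]]:
    """List (interval_start, interval_end) pairs between consecutive midnight ticks
    on `day` of each month, as a '0 0 <day> * *' cron schedule would produce.
    Only intervals that have fully elapsed by `end` are returned."""
    if not 1 <= day <= 28:
        raise ValueError("day must be between 1 and 28 so it exists in every month")
    # Align to the first tick on or after the start date.
    tick = datetime(start.year, start.month, day)
    if tick < start:
        tick = _add_month(tick)
    intervals = []
    nxt = _add_month(tick)
    while nxt <= end:
        intervals.append((tick, nxt))
        tick, nxt = nxt, _add_month(nxt)
    return intervals
```

For example, monthly_intervals(datetime(2015, 6, 1), datetime(2015, 9, 10)) yields three intervals, the first running from 4 June 2015 to 4 July 2015. Under this assumption Airflow would only start each run once its interval has ended.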
- make_release(**kwargs)
Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’. There will only be one release, but it is returned as a list so that the SnapshotTelescope template methods can be used.
- Parameters:
kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.
- Returns:
A list with one UclDiscoveryRelease instance.
- Return type:
List[UclDiscoveryRelease]
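A minimal sketch of the single-release pattern, using a hypothetical stand-in dataclass (ReleaseStandIn) with the same constructor arguments as UclDiscoveryRelease. It reads dag, run_id, data_interval_start and data_interval_end from the Airflow task context (available in Airflow 2.2 and later). Setting partition_date to the end of the data interval is an assumption made purely for illustration; it is not taken from the telescope.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, List


@dataclass
class ReleaseStandIn:
    """Hypothetical stand-in with the same fields as UclDiscoveryRelease."""

    dag_id: str
    run_id: str
    data_interval_start: datetime
    data_interval_end: datetime
    partition_date: datetime


def make_release_sketch(**kwargs: Any) -> List[ReleaseStandIn]:
    """Build the single release for this DAG run from the Airflow task context."""
    release = ReleaseStandIn(
        dag_id=kwargs["dag"].dag_id,
        run_id=kwargs["run_id"],
        data_interval_start=kwargs["data_interval_start"],
        data_interval_end=kwargs["data_interval_end"],
        # ASSUMPTION for illustration: partition by the end of the data interval.
        partition_date=kwargs["data_interval_end"],
    )
    # Exactly one release per run, returned inside a list.
    return [release]
```

Because the function accepts **kwargs, the full Airflow context can be passed straight through; keys it does not use are simply ignored.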
- download(release, **kwargs)
Download the UCL Discovery data for a given release.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
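The max_threads parameter indicates that per-eprint downloads can run in parallel. Below is a sketch of that pattern with concurrent.futures.ThreadPoolExecutor. download_all is a hypothetical helper, and fetch stands in for a call such as download_discovery_stats with the date range already bound (for example via functools.partial); the telescope's actual download loop may be organised differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, Tuple

Stats = Tuple[dict, dict]  # (country statistics, total download statistics)


def download_all(
    eprint_ids: Iterable[str],
    fetch: Callable[[str], Stats],
    max_threads: int,
) -> Dict[str, Stats]:
    """Call `fetch` for every eprint ID using at most `max_threads` worker threads."""
    ids = list(eprint_ids)
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # pool.map returns results in input order, so they line up with `ids`.
        results = list(pool.map(fetch, ids))
    return dict(zip(ids, results))
```

Threads suit this workload because each call spends most of its time waiting on network I/O rather than using the CPU.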
- upload_downloaded(release, **kwargs)
Uploads the downloaded files to GCS.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
- transform(release, **kwargs)
Transform the UCL Discovery data for a given release.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
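BigQuery load jobs accept newline-delimited JSON, so a transform step that feeds bq_load often writes one JSON object per line. Assuming the transformed file uses that format (this page does not say), a hypothetical to_jsonl helper could serialise the transformed rows as follows.

```python
import json
from typing import Iterable


def to_jsonl(rows: Iterable[dict]) -> str:
    """Serialise rows as newline-delimited JSON: one compact object per line."""
    return "".join(json.dumps(row, separators=(",", ":")) + "\n" for row in rows)
```

Values such as dates would need converting to strings (for example ISO 8601) first, because json.dumps cannot serialise datetime objects.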
- upload_transformed(release, **kwargs)
Uploads the transformed file to GCS.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
- bq_load(release, **kwargs)
Loads the transformed data into BigQuery.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
- Return type:
None
- add_new_dataset_releases(release, **kwargs)
Adds release information to the API.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
- Return type:
None
- cleanup(release, **kwargs)
Delete all files, folders and XComs associated with this release.
- Parameters:
release (UclDiscoveryRelease) – The UCL Discovery release.
- Return type:
None
- oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.get_isbn_eprint_mappings(sheet_id, service_account_conn_id, cutoff_date)
Get the eprint ID to ISBN mapping from the Google Sheet.
- Parameters:
sheet_id (str) – The ID of the Google Sheet.
service_account_conn_id (str) – The Airflow connection ID of the service account used to authenticate with the Google Sheet.
cutoff_date (pendulum.DateTime) – The cutoff date. If an item is published after this date, it will be skipped.
- Return type:
dict
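The cutoff rule can be sketched as a simple filter over the sheet's rows. The column names (eprint_id, isbn13, title, publication_date), the ISO date format and the shape of the returned dictionary are all hypothetical, and the sketch uses datetime.date in place of pendulum.DateTime. Items published on the cutoff date itself are kept, since only items published after it are skipped.

```python
from datetime import date
from typing import Dict, Iterable, Mapping


def build_eprint_mapping(rows: Iterable[Mapping[str, str]], cutoff_date: date) -> Dict[str, dict]:
    """Map eprint ID -> {"isbn13": ..., "title": ...} for items published on or
    before cutoff_date. All column names here are hypothetical."""
    mapping: Dict[str, dict] = {}
    for row in rows:
        published = date.fromisoformat(row["publication_date"])  # expects YYYY-MM-DD
        if published > cutoff_date:
            continue  # published after the cutoff, so skip it
        mapping[row["eprint_id"]] = {"isbn13": row["isbn13"], "title": row["title"]}
    return mapping
```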
- oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.download_discovery_stats(eprint_id, start_date, end_date)
Downloads the discovery stats for a given eprint ID within a specified date range.
- Parameters:
eprint_id (str) – The eprint ID of the item to get the stats for.
start_date (pendulum.DateTime) – The start date of the date range.
end_date (pendulum.DateTime) – The end date of the date range.
- Returns:
A tuple containing the country statistics and the total download statistics.
- oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.transform_discovery_stats(country_record, totals_record, isbn, title)
Transforms the discovery stats for a single set of records.
- Parameters:
country_record (dict) – The country record
totals_record (dict) – The totals record
isbn (str) – The ISBN that matches the eprint ID
title (str) – The title of the item
- Returns:
The transformed stats
- Return type:
dict
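One plausible shape for this transformation is to merge the per-country counts and the totals for an eprint into a single row keyed by ISBN. In the sketch below, the input record shapes (a country code mapped to a download count, and a totals dict with a downloads key) and every output field name are hypothetical; the records actually returned by download_discovery_stats may be structured differently.

```python
from typing import Dict


def transform_stats_sketch(
    country_record: Dict[str, int],
    totals_record: Dict[str, int],
    isbn: str,
    title: str,
) -> dict:
    """Combine per-country counts and totals for one eprint into one nested row.

    Input shapes and output field names are hypothetical."""
    countries = [
        {"country_code": code, "download_count": count}
        # Sort by descending count, then by code, so the output is deterministic.
        for code, count in sorted(country_record.items(), key=lambda kv: (-kv[1], kv[0]))
    ]
    return {
        "ISBN13": isbn,
        "title": title,
        "total_downloads": totals_record.get("downloads", 0),
        "downloads_per_country": countries,
    }
```

Nesting the country breakdown inside one row per ISBN maps naturally onto a BigQuery REPEATED RECORD column, though whether the real table uses that layout is not stated here.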