oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope

Module Contents

Classes

UclDiscoveryRelease

Construct a UclDiscoveryRelease instance.

UclDiscoveryTelescope

The UCL Discovery telescope.

Functions

get_isbn_eprint_mappings(sheet_id, ...)

Get the eprint ID to ISBN mapping from the Google Sheet.

download_discovery_stats(eprint_id, start_date, end_date)

Downloads the discovery stats for a given eprint ID within a specified date range.

transform_discovery_stats(country_record, ...)

Transforms the discovery stats for a single set of records

class oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.UclDiscoveryRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date)[source]

Bases: observatory.platform.workflows.workflow.PartitionRelease

Construct a UclDiscoveryRelease instance.

Parameters:
  • dag_id (str) – The ID of the DAG

  • run_id (str) – The Airflow run ID.

  • data_interval_start (pendulum.DateTime) – The start of the data interval.

  • data_interval_end (pendulum.DateTime) – The end of the data interval.

  • partition_date (pendulum.DateTime) – The partition date for this release.
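To illustrate how data_interval_start and data_interval_end relate to the telescope's default schedule ('0 0 4 * *', starting 2015-06-01), the following is a minimal sketch using only the standard library. It assumes Airflow's usual cron behaviour, where each run covers the span between two consecutive schedule ticks, and it ignores time zones. The helper monthly_ticks is hypothetical and not part of oaebu_workflows.

```python
from datetime import datetime
from typing import List


def monthly_ticks(start: datetime, day: int, count: int) -> List[datetime]:
    """Return the first `count` midnights on `day` of a month, on or after `start`."""
    year, month = start.year, start.month
    ticks = []
    while len(ticks) < count:
        tick = datetime(year, month, day)
        if tick >= start:
            ticks.append(tick)
        # Advance one calendar month.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return ticks


# Defaults from the UclDiscoveryTelescope signature: start 2015-06-01, day-of-month 4.
ticks = monthly_ticks(datetime(2015, 6, 1), day=4, count=4)

# Pair consecutive ticks into (data_interval_start, data_interval_end).
intervals = list(zip(ticks, ticks[1:]))
for interval_start, interval_end in intervals:
    print(interval_start.date(), "->", interval_end.date())
```

Under these assumptions the first interval runs from 2015-06-04 to 2015-07-04. How partition_date is derived from the interval is not stated here, so the sketch leaves it out.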

class oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.UclDiscoveryTelescope(dag_id, cloud_workspace, sheet_id, data_partner='ucl_discovery', bq_dataset_description='UCL Discovery dataset', bq_table_description='UCL Discovery table', api_dataset_id='ucl', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, oaebu_service_account_conn_id='oaebu_service_account', max_threads=os.cpu_count() * 2, schedule='0 0 4 * *', start_date=pendulum.datetime(2015, 6, 1), catchup=True, max_active_runs=10)[source]

Bases: observatory.platform.workflows.workflow.Workflow

The UCL Discovery telescope.

Construct a UclDiscoveryTelescope instance.

Parameters:
  • dag_id (str) – The ID of the DAG

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG

  • sheet_id (str) – The ID of the Google Sheet that maps eprint IDs to ISBN13s

  • data_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the data partner

  • bq_dataset_description (str) – Description for the BigQuery dataset

  • bq_table_description (str) – Description for the BigQuery table

  • api_dataset_id (str) – The ID to store the dataset release in the API

  • observatory_api_conn_id (str) – Airflow connection ID for the Observatory API

  • oaebu_service_account_conn_id (str) – Airflow connection ID for the oaebu service account

  • max_threads (int) – The maximum number of threads to utilise for parallel processes

  • schedule (str) – The schedule interval of the DAG

  • start_date (pendulum.DateTime) – The start date of the DAG

  • catchup (bool) – Whether to catchup the DAG or not

  • max_active_runs (int) – The maximum number of concurrent DAG runs
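As a rough illustration of what a max_threads cap can look like in practice, the sketch below fans work out over a thread pool. It is an assumption about usage, not the telescope's actual implementation: fetch_stats and the eprint IDs are hypothetical placeholders. Note that os.cpu_count() can return None on some platforms, so the sketch guards against that before doubling.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_stats(eprint_id: str) -> dict:
    """Hypothetical stand-in for a per-item download; not part of oaebu_workflows."""
    return {"eprint_id": eprint_id, "downloads": 0}


# os.cpu_count() may return None, so fall back to 1 before doubling.
max_threads = (os.cpu_count() or 1) * 2

eprint_ids = ["1001", "1002", "1003"]  # Illustrative IDs only.
results = {}
with ThreadPoolExecutor(max_workers=max_threads) as executor:
    # Map each future back to the ID it was submitted for.
    futures = {executor.submit(fetch_stats, eid): eid for eid in eprint_ids}
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(sorted(results))
```

Keying the futures by eprint ID keeps the results addressable even though as_completed yields them in completion order rather than submission order.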

make_release(**kwargs)[source]

Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’. There will only be 1 release, but it is passed on as a list so the SnapshotTelescope template methods can be used.

Parameters:

kwargs – The context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.

Returns:

A list with one UclDiscoveryRelease instance.

Return type:

List[UclDiscoveryRelease]

download(release, **kwargs)[source]

Download the UCL Discovery data for a given release.

Parameters:

release (UclDiscoveryRelease) – The UCL Discovery release.

upload_downloaded(release, **kwargs)[source]

Uploads the downloaded files to GCS

Parameters:

release (UclDiscoveryRelease) –

transform(release, **kwargs)[source]

Transform the UCL Discovery data for a given release.

Parameters:

release (UclDiscoveryRelease) –

upload_transformed(release, **kwargs)[source]

Uploads the transformed file to GCS

Parameters:

release (UclDiscoveryRelease) –

bq_load(release, **kwargs)[source]

Loads the transformed data into BigQuery

Parameters:

release (UclDiscoveryRelease) –

Return type:

None

add_new_dataset_releases(release, **kwargs)[source]

Adds release information to API.

Parameters:

release (UclDiscoveryRelease) –

Return type:

None

cleanup(release, **kwargs)[source]

Delete all files, folders and XComs associated with this release.

Parameters:

release (UclDiscoveryRelease) –

Return type:

None

oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.get_isbn_eprint_mappings(sheet_id, service_account_conn_id, cutoff_date)[source]

Get the eprint ID to ISBN mapping from the Google Sheet.

Parameters:
  • sheet_id (str) – The ID of the Google Sheet.

  • service_account_conn_id (str) – The Airflow connection ID of the service account used to authenticate.

  • cutoff_date (pendulum.DateTime) – The cutoff date. If an item is published after this date, it will be skipped.

Return type:

dict
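The cutoff rule described above can be sketched as follows. This is a minimal illustration, assuming each sheet row has already been read into a dict; the row keys (eprint_id, isbn13, date), the shape of the returned dict, and the sample values are all hypothetical, and the standard library date type stands in for pendulum.DateTime to keep the example self-contained.

```python
from datetime import date


def filter_by_cutoff(rows, cutoff_date):
    """Map eprint ID -> ISBN13, skipping rows published after cutoff_date."""
    mappings = {}
    for row in rows:
        # Items published strictly after the cutoff are skipped.
        if row["date"] > cutoff_date:
            continue
        mappings[row["eprint_id"]] = row["isbn13"]
    return mappings


# Illustrative rows only; these are not real eprint IDs or ISBNs.
rows = [
    {"eprint_id": "1001", "isbn13": "9780000000002", "date": date(2015, 5, 20)},
    {"eprint_id": "1002", "isbn13": "9780000000019", "date": date(2015, 7, 1)},
    {"eprint_id": "1003", "isbn13": "9780000000026", "date": date(2015, 6, 30)},
]

mappings = filter_by_cutoff(rows, cutoff_date=date(2015, 6, 30))
print(mappings)
```

In this sketch an item published exactly on the cutoff date is kept, since the documented rule only excludes items published after it.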

oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.download_discovery_stats(eprint_id, start_date, end_date)[source]

Downloads the discovery stats for a given eprint ID within a specified date range.

Parameters:
  • eprint_id (str) – The eprint ID of the item to get the stats for.

  • start_date (pendulum.DateTime) – The start date of the date range.

  • end_date (pendulum.DateTime) – The end date of the date range.

Returns:

A tuple containing the country statistics and the total downloads statistics.

oaebu_workflows.ucl_discovery_telescope.ucl_discovery_telescope.transform_discovery_stats(country_record, totals_record, isbn, title)[source]

Transforms the discovery stats for a single set of records

Parameters:
  • country_record (dict) – The country record

  • totals_record (dict) – The totals record

  • isbn (str) – The ISBN that matches the eprint ID

  • title (str) – The title of the item

Returns:

The transformed stats

Return type:

dict