oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope

Module Contents

Classes

IrusOapenRelease

Create an IrusOapenRelease instance.

IrusOapenTelescope

The OAPEN IRUS-UK telescope.

Functions

upload_source_code_to_bucket(source_url, project_id, ...)

Upload the source code of the cloud function to a storage bucket.

cloud_function_exists(service, full_name)

Check if a cloud function with a given name already exists.

create_cloud_function(service, location, full_name, ...)

Create cloud function.

call_cloud_function(function_uri, release_date, ...)

Iteratively call cloud function, until it has finished processing all publishers.

class oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.IrusOapenRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date)[source]

Bases: observatory.platform.workflows.workflow.PartitionRelease

Create an IrusOapenRelease instance.

Parameters:
  • dag_id (str) – The ID of the DAG

  • run_id (str) – The Airflow run ID

  • partition_date (pendulum.DateTime) – The date of the partition/release

  • data_interval_start (pendulum.DateTime) – The start of the data interval

  • data_interval_end (pendulum.DateTime) – The end of the data interval

class oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.IrusOapenTelescope(dag_id, cloud_workspace, publisher_name_v4, publisher_uuid_v5, data_partner='irus_oapen', bq_dataset_description='IRUS dataset', bq_table_description=None, api_dataset_id='oapen', max_cloud_function_instances=0, observatory_api_conn_id=AirflowConns.OBSERVATORY_API, geoip_license_conn_id='geoip_license_key', irus_oapen_api_conn_id='irus_api', irus_oapen_login_conn_id='irus_login', catchup=True, start_date=pendulum.datetime(2015, 6, 1), schedule='0 0 4 * *', max_active_runs=5)[source]

Bases: observatory.platform.workflows.workflow.Workflow

The OAPEN IRUS-UK telescope.

Parameters:
  • dag_id (str) – The ID of the DAG

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG

  • publisher_name_v4 (str) – The publisher’s name for version 4

  • publisher_uuid_v5 (str) – The publisher’s UUID for version 5

  • data_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The data partner

  • bq_dataset_description (str) – Description for the BigQuery dataset

  • bq_table_description (str) – Description for the BigQuery table

  • api_dataset_id (str) – The ID to store the dataset release in the API

  • max_cloud_function_instances (int) –

  • observatory_api_conn_id (str) – The Airflow connection ID for the Observatory API

  • geoip_license_conn_id (str) – The Airflow connection ID for the GeoIP license

  • irus_oapen_api_conn_id (str) – The Airflow connection ID for the IRUS API, used for counter 5

  • irus_oapen_login_conn_id (str) – The Airflow connection ID for the IRUS API (login), used for counter 4

  • catchup (bool) – Whether to catch up the DAG or not

  • start_date (pendulum.DateTime) – The start date of the DAG

  • schedule (str) – The schedule interval of the DAG

  • max_active_runs (int) – The maximum number of concurrent DAG instances

OAPEN_PROJECT_ID = 'oapen-usage-data-gdpr-proof'[source]
OAPEN_BUCKET[source]
FUNCTION_NAME = 'oapen-access-stats'[source]
FUNCTION_REGION = 'europe-west1'[source]
FUNCTION_SOURCE_URL = 'https://github.com/The-Academic-Observatory/oapen-irus-uk-cloud-function/releases/download/v1.1.9...'[source]
FUNCTION_MD5_HASH = '946bb4d7ca229b15aba36ad7b5ed56d0'[source]
FUNCTION_BLOB_NAME = 'cloud_function_source_code.zip'[source]
FUNCTION_TIMEOUT = 1500[source]
make_release(**kwargs)[source]

Create a list of IrusOapenRelease instances for a given month. For example, if the DAG is scheduled to run on 2022-04-07:

  • interval_start will be 2022-03-01

  • interval_end will be 2022-04-01

  • partition_date will be 2022-03-31

Parameters:

kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.

Returns:

A list of IrusOapenRelease instances

Return type:

List[IrusOapenRelease]
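The date arithmetic in the example above can be sketched with the standard library. This is only an illustration under stated assumptions: the helper name month_release_dates is hypothetical, it uses datetime and calendar rather than pendulum, and it does not read the Airflow context the way the real task does.

```python
import calendar
from datetime import date
from typing import Tuple


def month_release_dates(run_date: date) -> Tuple[date, date, date]:
    """Return (interval_start, interval_end, partition_date) for the month before run_date.

    Hypothetical helper that mirrors the documented example; not the module's own code.
    """
    # interval_end is the first day of the month in which the run happens.
    interval_end = run_date.replace(day=1)
    # interval_start is the first day of the previous month (handle the year boundary).
    if interval_end.month == 1:
        interval_start = date(interval_end.year - 1, 12, 1)
    else:
        interval_start = date(interval_end.year, interval_end.month - 1, 1)
    # partition_date is the last day of the month that begins at interval_start.
    last_day = calendar.monthrange(interval_start.year, interval_start.month)[1]
    partition_date = interval_start.replace(day=last_day)
    return interval_start, interval_end, partition_date


start, end, partition = month_release_dates(date(2022, 4, 7))
print(start, end, partition)
```

For a run on 2022-04-07 this yields the three dates listed in the description of make_release.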

transfer(releases, **kwargs)[source]

Task to transfer the file for each release.

Parameters:

releases (List[IrusOapenRelease]) – the list of IrusOapenRelease instances.

download_transform(releases, **kwargs)[source]

Task to download the access stats to a local file for each release.

Parameters:

releases (List[IrusOapenRelease]) –

create_cloud_function(releases, **kwargs)[source]

Task to create the cloud function for each release.

Parameters:

releases (List[IrusOapenRelease]) –

call_cloud_function(releases, **kwargs)[source]

Task to call the cloud function for each release.

Parameters:

releases (List[IrusOapenRelease]) –

upload_transformed(releases, **kwargs)[source]

Uploads the transformed files to GCS for each release.

Parameters:

releases (List[IrusOapenRelease]) –

Return type:

None

bq_load(releases, **kwargs)[source]

Loads the transformed access stats data into BigQuery.

Parameters:

releases (List[IrusOapenRelease]) –

Return type:

None

add_new_dataset_releases(releases, **kwargs)[source]

Adds release information to API.

Parameters:

releases (List[IrusOapenRelease]) –

Return type:

None

cleanup(releases, **kwargs)[source]

Delete all files, folders and XComs associated with this release.

Parameters:

releases (List[IrusOapenRelease]) –

Return type:

None

oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.upload_source_code_to_bucket(source_url, project_id, bucket_name, blob_name, cloud_function_path)[source]

Upload the source code of the cloud function to a storage bucket.

Parameters:
  • source_url (str) – The url to the zip file with source code

  • project_id (str) – The project id with the bucket

  • bucket_name (str) – The bucket name

  • blob_name (str) – The blob name

  • cloud_function_path (str) – The local path to the cloud function

Returns:

Whether task was successful and whether file was uploaded

Return type:

Tuple[bool, bool]

oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.cloud_function_exists(service, full_name)[source]

Check if a cloud function with a given name already exists.

Parameters:
  • service (googleapiclient.discovery.Resource) – Cloud function service

  • full_name (str) – Name of the cloud function

Returns:

URI if cloud function exists, else None

Return type:

Optional[str]

oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.create_cloud_function(service, location, full_name, source_bucket, blob_name, max_active_runs, update)[source]

Create cloud function.

Parameters:
  • service (googleapiclient.discovery.Resource) – Cloud function service

  • location (str) – Location of the cloud function

  • full_name (str) – Name of the cloud function

  • source_bucket (str) – Name of bucket where the source code is stored

  • blob_name (str) – Blob name of source code inside bucket

  • max_active_runs (int) – The limit on the maximum number of function instances that may coexist at a given time

  • update (bool) – Whether a new function is created or an existing one is updated

Returns:

Status of the cloud function and error/success message

Return type:

Tuple[bool, dict]
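The location and full_name arguments are plain strings. A small sketch of how they might be composed from the class constants follows, assuming they use the standard Google Cloud Functions resource-name format (projects/.../locations/.../functions/...); the source does not state this, and the helper function_resource_names is hypothetical.

```python
# Values copied from the IrusOapenTelescope class attributes documented above.
OAPEN_PROJECT_ID = "oapen-usage-data-gdpr-proof"
FUNCTION_REGION = "europe-west1"
FUNCTION_NAME = "oapen-access-stats"


def function_resource_names(project_id, region, function_name):
    """Return (location, full_name) in Cloud Functions resource-name format.

    Assumption: the module passes names in this format; this is not confirmed by the docs.
    """
    location = f"projects/{project_id}/locations/{region}"
    full_name = f"{location}/functions/{function_name}"
    return location, full_name


location, full_name = function_resource_names(OAPEN_PROJECT_ID, FUNCTION_REGION, FUNCTION_NAME)
print(location)
print(full_name)
```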

oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.call_cloud_function(function_uri, release_date, username, password, geoip_license_key, publisher_name_v4, publisher_uuid_v5, bucket_name, blob_name)[source]

Iteratively call the cloud function until it has finished processing all publishers. When a publisher name/UUID is given, only that one publisher is processed; if it is empty, the cloud function processes all available publishers. In that case, data from the new platform can be downloaded in one iteration. For the old platform, however, two files have to be downloaded separately for each publisher, which might take longer than the cloud function's timeout, so the process is split across multiple calls.

Parameters:
  • function_uri (str) – URI of the cloud function

  • release_date (str) – The release date in YYYY-MM

  • username (str) – OAPEN username (email or requestor_id)

  • password (str) – OAPEN password (password or api_key)

  • geoip_license_key (str) – License key of geoip database

  • publisher_name_v4 (str) – URL encoded name of the publisher (used for counter version 4)

  • publisher_uuid_v5 (str) – UUID of the publisher (used for counter version 5)

  • bucket_name (str) – Name of the bucket to store OAPEN access stats data

  • blob_name (str) – Blob name to store OAPEN access stats data

Return type:

None
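The "call until all publishers are processed" pattern described for call_cloud_function can be sketched as a simple loop. This is an illustration only: the response field unprocessed_publishers, the helper call_until_done and the stand-in fake_cloud_function are all assumptions made for the example, and the real function sends authenticated HTTP requests to function_uri rather than calling a local callable.

```python
from typing import Callable, Dict


def call_until_done(call: Callable[[Dict], Dict], payload: Dict, max_calls: int = 100) -> int:
    """Call `call` repeatedly until its response reports no remaining publishers.

    Returns the number of calls made. The "unprocessed_publishers" key is hypothetical.
    """
    data = dict(payload)
    for n_calls in range(1, max_calls + 1):
        response = call(data)
        remaining = response.get("unprocessed_publishers") or []
        if not remaining:
            return n_calls
        # Pass the leftover publishers to the next call so it resumes where this one stopped.
        data["unprocessed_publishers"] = remaining
    raise RuntimeError(f"Cloud function did not finish within {max_calls} calls")


def fake_cloud_function(data: Dict) -> Dict:
    """Stand-in for the HTTP request: handles at most two publishers per call."""
    pending = data.get("unprocessed_publishers")
    if pending is None:
        # First call: no publisher given, so all available publishers are processed.
        pending = ["a", "b", "c", "d", "e"]
    return {"unprocessed_publishers": pending[2:]}


print(call_until_done(fake_cloud_function, {"release_date": "2022-03"}))
```

Capping the loop with max_calls is a design choice of this sketch so that a function that never reports completion cannot loop forever.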