oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope
Module Contents
Classes
IrusOapenRelease | Create an IrusOapenRelease instance.
IrusOapenTelescope | The OAPEN IRUS-UK telescope.
Functions
upload_source_code_to_bucket | Upload source code of cloud function to storage bucket
cloud_function_exists | Check if cloud function with a given name already exists
create_cloud_function | Create cloud function.
call_cloud_function | Iteratively call cloud function, until it has finished processing all publishers.
- class oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.IrusOapenRelease(dag_id, run_id, data_interval_start, data_interval_end, partition_date)[source]
Bases:
observatory.platform.workflows.workflow.PartitionRelease
Create an IrusOapenRelease instance.
- Parameters:
dag_id (str) – The ID of the DAG
run_id (str) – The Airflow run ID
partition_date (pendulum.DateTime) – The date of the partition/release
data_interval_start (pendulum.DateTime) – The start of the data interval
data_interval_end (pendulum.DateTime) – The end of the data interval
- class oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.IrusOapenTelescope(dag_id, cloud_workspace, publisher_name_v4, publisher_uuid_v5, data_partner='irus_oapen', bq_dataset_description='IRUS dataset', bq_table_description=None, api_dataset_id='oapen', max_cloud_function_instances=0, observatory_api_conn_id=AirflowConns.OBSERVATORY_API, geoip_license_conn_id='geoip_license_key', irus_oapen_api_conn_id='irus_api', irus_oapen_login_conn_id='irus_login', catchup=True, start_date=pendulum.datetime(2015, 6, 1), schedule='0 0 4 * *', max_active_runs=5)[source]
Bases:
observatory.platform.workflows.workflow.Workflow
The OAPEN IRUS-UK telescope.
- Parameters:
dag_id (str) – The ID of the DAG
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
publisher_name_v4 (str) – The publisher’s name, used for COUNTER version 4
publisher_uuid_v5 (str) – The publisher’s UUID, used for COUNTER version 5
data_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The data partner
bq_dataset_description (str) – Description for the BigQuery dataset
bq_table_description (str) – Description for the BigQuery table
api_dataset_id (str) – The ID to store the dataset release in the API
max_cloud_function_instances (int) –
observatory_api_conn_id (str) – The Airflow connection ID for the Observatory API
geoip_license_conn_id (str) – The Airflow connection ID for the GeoIP license
irus_oapen_api_conn_id (str) – The Airflow connection ID for the IRUS API, used for COUNTER 5
irus_oapen_login_conn_id (str) – The Airflow connection ID for the IRUS API (login), used for COUNTER 4
catchup (bool) – Whether to catch up the DAG or not
start_date (pendulum.DateTime) – The start date of the DAG
schedule (str) – The schedule interval of the DAG
max_active_runs (int) – The maximum number of concurrent DAG instances
- FUNCTION_SOURCE_URL = 'https://github.com/The-Academic-Observatory/oapen-irus-uk-cloud-function/releases/download/v1.1.9...'[source]
- make_release(**kwargs)[source]
Create a list of IrusOapenRelease instances for a given month. For example, if the DAG is scheduled to run on 2022-04-07:
data_interval_start will be 2022-03-01
data_interval_end will be 2022-04-01
partition_date will be 2022-03-31
See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.
- Parameters:
kwargs – the context passed from the PythonOperator.
- Returns:
A list of IrusOapenRelease instances
- Return type:
List[IrusOapenRelease]
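The date arithmetic in the example above can be sketched with the standard library. This is an illustration only: `ReleaseDates` and `release_dates_for_run` are hypothetical names, and the snippet reproduces the documented example rather than the telescope's actual implementation (which works with pendulum objects).

```python
from datetime import date, timedelta
from typing import NamedTuple


class ReleaseDates(NamedTuple):
    """Hypothetical container for the three dates in the worked example."""

    data_interval_start: date
    data_interval_end: date
    partition_date: date


def release_dates_for_run(run_date: date) -> ReleaseDates:
    """Derive the release dates for the month preceding `run_date`."""
    # First day of the month in which the DAG run happens.
    data_interval_end = run_date.replace(day=1)
    # The partition date is the last day of the previous month.
    partition_date = data_interval_end - timedelta(days=1)
    # First day of the previous month.
    data_interval_start = partition_date.replace(day=1)
    return ReleaseDates(data_interval_start, data_interval_end, partition_date)


dates = release_dates_for_run(date(2022, 4, 7))
print(dates.data_interval_start, dates.data_interval_end, dates.partition_date)
# prints: 2022-03-01 2022-04-01 2022-03-31
```

Subtracting one day from the first of the month handles month lengths and year boundaries without any special cases.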
- transfer(releases, **kwargs)[source]
Task to transfer the file for each release.
- Parameters:
releases (List[IrusOapenRelease]) – the list of IrusOapenRelease instances.
- download_transform(releases, **kwargs)[source]
Task to download the access stats to a local file for each release.
- Parameters:
releases (List[IrusOapenRelease]) –
- create_cloud_function(releases, **kwargs)[source]
Task to create the cloud function for each release.
- Parameters:
releases (List[IrusOapenRelease]) –
- call_cloud_function(releases, **kwargs)[source]
Task to call the cloud function for each release.
- Parameters:
releases (List[IrusOapenRelease]) –
- upload_transformed(releases, **kwargs)[source]
Uploads the transformed files to GCS for each release.
- Parameters:
releases (List[IrusOapenRelease]) –
- Return type:
None
- bq_load(releases, **kwargs)[source]
Loads the access stats data into BigQuery.
- Parameters:
releases (List[IrusOapenRelease]) –
- Return type:
None
- add_new_dataset_releases(releases, **kwargs)[source]
Adds release information to API.
- Parameters:
releases (List[IrusOapenRelease]) –
- Return type:
None
- cleanup(releases, **kwargs)[source]
Delete all files, folders and XComs associated with this release.
- Parameters:
releases (List[IrusOapenRelease]) –
- Return type:
None
- oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.upload_source_code_to_bucket(source_url, project_id, bucket_name, blob_name, cloud_function_path)[source]
Upload source code of cloud function to storage bucket
- Parameters:
source_url (str) – The url to the zip file with source code
project_id (str) – The project id with the bucket
bucket_name (str) – The bucket name
blob_name (str) – The blob name
cloud_function_path (str) – The local path to the cloud function
- Returns:
Whether task was successful and whether file was uploaded
- Return type:
Tuple[bool, bool]
- oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.cloud_function_exists(service, full_name)[source]
Check if cloud function with a given name already exists
- Parameters:
service (googleapiclient.discovery.Resource) – Cloud function service
full_name (str) – Name of the cloud function
- Returns:
URI if cloud function exists, else None
- Return type:
Optional[str]
- oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.create_cloud_function(service, location, full_name, source_bucket, blob_name, max_active_runs, update)[source]
Create cloud function.
- Parameters:
service (googleapiclient.discovery.Resource) – Cloud function service
location (str) – Location of the cloud function
full_name (str) – Name of the cloud function
source_bucket (str) – Name of bucket where the source code is stored
blob_name (str) – Blob name of source code inside bucket
max_active_runs (int) – The limit on the maximum number of function instances that may coexist at a given time
update (bool) – Whether a new function is created or an existing one is updated
- Returns:
Status of the cloud function and error/success message
- Return type:
Tuple[bool, dict]
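One way a caller might combine the return values of the helper functions documented here is sketched below. It assumes that the `Optional[str]` from cloud_function_exists and the "file was uploaded" flag from upload_source_code_to_bucket together decide the `update` argument of create_cloud_function. `plan_deployment` is a hypothetical helper, not part of this module, and the telescope's real control flow may differ.

```python
from typing import Optional


def plan_deployment(existing_uri: Optional[str], source_uploaded: bool) -> str:
    """Return 'create', 'update' or 'reuse' (hypothetical decision helper)."""
    if existing_uri is None:
        # No function with this name yet: create it (update=False).
        return "create"
    if source_uploaded:
        # Function exists but new source code was uploaded: update it (update=True).
        return "update"
    # Function exists and the source is unchanged: nothing to deploy.
    return "reuse"


print(plan_deployment(None, True))                   # prints: create
print(plan_deployment("https://example.test/fn", True))   # prints: update
print(plan_deployment("https://example.test/fn", False))  # prints: reuse
```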
- oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.call_cloud_function(function_uri, release_date, username, password, geoip_license_key, publisher_name_v4, publisher_uuid_v5, bucket_name, blob_name)[source]
Iteratively call the cloud function until it has finished processing all publishers. When a publisher name/UUID is given, only that one publisher is processed; when it is empty, the cloud function processes all available publishers. In that case, data from the new platform can be downloaded in a single iteration. For the old platform, however, two files have to be downloaded separately for each publisher, which might take longer than the timeout of the cloud function, so the process is split across multiple calls.
- Parameters:
function_uri (str) – URI of the cloud function
release_date (str) – The release date in YYYY-MM
username (str) – OAPEN username (email or requestor_id)
password (str) – OAPEN password (password or api_key)
geoip_license_key (str) – License key of the GeoIP database
publisher_name_v4 (str) – URL-encoded name of the publisher (used for COUNTER version 4)
publisher_uuid_v5 (str) – UUID of the publisher (used for COUNTER version 5)
bucket_name (str) – Name of the bucket to store OAPEN access stats data
blob_name (str) – Blob name to store OAPEN access stats data
- Return type:
None
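The iterative calling pattern described for call_cloud_function can be sketched as a loop that repeats the request until nothing is left to process. This is a minimal sketch under stated assumptions: the `unprocessed_publishers` response key, the `invoke` callable and the fake function are all hypothetical stand-ins, and no real HTTP request or authentication is performed.

```python
from typing import Callable, Dict, List


def call_until_finished(invoke: Callable[[Dict], Dict], payload: Dict, max_calls: int = 100) -> int:
    """Call `invoke` until it reports no unprocessed publishers; return the number of calls."""
    data = dict(payload)  # copy so the caller's payload is not modified
    for call_count in range(1, max_calls + 1):
        response = invoke(data)
        # Assumed response shape: a list of publishers still to be processed.
        remaining = response.get("unprocessed_publishers", [])
        if not remaining:
            return call_count
        # Pass the remaining publishers back in on the next call.
        data["unprocessed_publishers"] = remaining
    raise RuntimeError(f"Not finished after {max_calls} calls")


def make_fake_function(publishers: List[str], batch_size: int) -> Callable[[Dict], Dict]:
    """Stand-in for the cloud function: processes `batch_size` publishers per call."""

    def fake(data: Dict) -> Dict:
        todo = data.get("unprocessed_publishers", publishers)
        return {"unprocessed_publishers": list(todo[batch_size:])}

    return fake


fake_function = make_fake_function(["a", "b", "c", "d", "e"], batch_size=2)
calls = call_until_finished(fake_function, {"release_date": "2022-03"})
print(calls)  # prints: 3
```

The `max_calls` guard is an addition for the sketch, so that a function that never finishes cannot loop forever.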