oaebu_workflows.onix_telescope.onix_telescope
Module Contents
Classes
OnixRelease | Construct an OnixRelease.
OnixTelescope | Construct an OnixTelescope instance.
- class oaebu_workflows.onix_telescope.onix_telescope.OnixRelease(*, dag_id, run_id, snapshot_date, onix_file_name)[source]
Bases:
observatory.platform.workflows.workflow.SnapshotRelease
Construct an OnixRelease.
- Parameters:
dag_id (str) – The ID of the DAG
run_id (str) – The Airflow run ID
snapshot_date (pendulum.DateTime) – The date of the snapshot/release
onix_file_name (str) – The ONIX file name.
- class oaebu_workflows.onix_telescope.onix_telescope.OnixTelescope(*, dag_id, cloud_workspace, date_regex, sftp_root='/', metadata_partner='onix', bq_dataset_description='ONIX data provided by Org', bq_table_description=None, api_dataset_id='onix', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, sftp_service_conn_id='sftp_service', catchup=False, schedule='@weekly', start_date=pendulum.datetime(2021, 3, 28))[source]
Bases:
observatory.platform.workflows.workflow.Workflow
Construct an OnixTelescope instance.
- Parameters:
dag_id (str) – The ID of the DAG
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
date_regex (str) – Regular expression for extracting a date string from an ONIX file name
sftp_root (str) – The working root of the SFTP server, passed to the SftpFolders class
metadata_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The metadata partner name
bq_dataset_description (str) – Description for the BigQuery dataset
bq_table_description (str) – Description for the BigQuery table
api_dataset_id (str) – The ID to store the dataset release in the API
observatory_api_conn_id (str) – Airflow connection ID for the Observatory API
sftp_service_conn_id (str) – Airflow connection ID for the SFTP service
catchup (bool) – Whether to catch up the DAG or not
schedule (str) – The schedule interval of the DAG
start_date (pendulum.DateTime) – The start date of the DAG
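The date_regex parameter is described only as a pattern that extracts a date string from an ONIX file name. The following is a minimal sketch of that idea, not the workflow's own code: the pattern `\d{8}`, the file name `onix_20210328.xml`, and the `%Y%m%d` date format are all assumptions chosen for illustration, and the standard-library `datetime` stands in for `pendulum.DateTime` to keep the example dependency-free.

```python
import re
from datetime import datetime

# Hypothetical pattern and file name, chosen only for illustration.
date_regex = r"\d{8}"
file_name = "onix_20210328.xml"

# Search the file name for the first substring matching the pattern.
match = re.search(date_regex, file_name)
if match is None:
    raise ValueError(f"No date found in {file_name!r}")

# The matched substring is the date string; parsing it assumes a YYYYMMDD layout.
date_str = match.group(0)
snapshot_date = datetime.strptime(date_str, "%Y%m%d")
print(date_str, snapshot_date.date())
```

A pattern that is too loose (for example one that also matches an ISBN in the file name) would yield a wrong date, so in practice the expression should be as specific as the partner's naming scheme allows.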
- list_release_info(**kwargs)[source]
Lists all ONIX releases and publishes their file names as an XCom.
- Parameters:
kwargs – The context passed from the BranchPythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.
- Returns:
The identifier of the task to execute next.
- make_release(**kwargs)[source]
Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.
- Returns:
a list of OnixRelease instances.
- Return type:
List[OnixRelease]
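To illustrate how a list of releases might be assembled from previously listed file information, here is a rough sketch. It does not use the real OnixRelease class: `ReleaseSketch` is a hypothetical stand-in that carries only the four documented constructor fields, and the record keys `release_date` and `file_name`, the DAG ID, and the run ID are assumptions made for this example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List


@dataclass
class ReleaseSketch:
    """Hypothetical stand-in holding the four documented OnixRelease fields."""

    dag_id: str
    run_id: str
    snapshot_date: datetime
    onix_file_name: str


def make_releases_sketch(
    dag_id: str, run_id: str, records: List[Dict[str, str]]
) -> List[ReleaseSketch]:
    """Build one release object per record (record layout is assumed)."""
    return [
        ReleaseSketch(
            dag_id=dag_id,
            run_id=run_id,
            snapshot_date=datetime.strptime(record["release_date"], "%Y%m%d"),
            onix_file_name=record["file_name"],
        )
        for record in records
    ]


# Hypothetical records, as an earlier listing step might have published them.
records = [
    {"release_date": "20210321", "file_name": "onix_20210321.xml"},
    {"release_date": "20210328", "file_name": "onix_20210328.xml"},
]
releases = make_releases_sketch("onix_telescope", "scheduled__2021-03-28", records)
for release in releases:
    print(release.onix_file_name, release.snapshot_date.date())
```

Every later task in the class receives this same list as its `releases` argument, which is why each release carries both its snapshot date and its file name.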
- move_files_to_in_progress(releases, **kwargs)[source]
Move ONIX files to SFTP in-progress folder.
- Parameters:
releases (List[OnixRelease]) – a list of Onix release instances
- download(releases, **kwargs)[source]
Task to download the ONIX releases.
- Parameters:
releases (List[OnixRelease]) –
- upload_downloaded(releases, **kwargs)[source]
Uploads the downloaded ONIX file to GCS.
- Parameters:
releases (List[OnixRelease]) –
- transform(releases, **kwargs)[source]
Task to transform the ONIX releases.
- Parameters:
releases (List[OnixRelease]) –
- upload_transformed(releases, **kwargs)[source]
Uploads the transformed file to GCS.
- Parameters:
releases (List[OnixRelease]) –
- bq_load(releases, **kwargs)[source]
Task to load each transformed release to BigQuery.
- Parameters:
releases (List[OnixRelease]) –
- move_files_to_finished(releases, **kwargs)[source]
Move ONIX files to SFTP finished folder.
- Parameters:
releases (List[OnixRelease]) –
- add_new_dataset_releases(releases, **kwargs)[source]
Adds release information to API.
- Parameters:
releases (List[OnixRelease]) –
- Return type:
None
- cleanup(releases, **kwargs)[source]
Delete all files, folders and XComs associated with this release.
- Parameters:
releases (List[OnixRelease]) –
- Return type:
None