oaebu_workflows.onix_telescope.onix_telescope

Module Contents

Classes

  • OnixRelease – Construct an OnixRelease.

  • OnixTelescope – Construct an OnixTelescope instance.

class oaebu_workflows.onix_telescope.onix_telescope.OnixRelease(*, dag_id, run_id, snapshot_date, onix_file_name)

Bases: observatory.platform.workflows.workflow.SnapshotRelease

Construct an OnixRelease.

Parameters:
  • dag_id (str) – The ID of the DAG

  • run_id (str) – The Airflow run ID

  • snapshot_date (pendulum.DateTime) – The date of the snapshot/release

  • onix_file_name (str) – The ONIX file name.

class oaebu_workflows.onix_telescope.onix_telescope.OnixTelescope(*, dag_id, cloud_workspace, date_regex, sftp_root='/', metadata_partner='onix', bq_dataset_description='ONIX data provided by Org', bq_table_description=None, api_dataset_id='onix', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, sftp_service_conn_id='sftp_service', catchup=False, schedule='@weekly', start_date=pendulum.datetime(2021, 3, 28))

Bases: observatory.platform.workflows.workflow.Workflow

Construct an OnixTelescope instance.

Parameters:
  • dag_id (str) – The ID of the DAG.

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG.

  • date_regex (str) – Regular expression for extracting a date string from an ONIX file name.

  • sftp_root (str) – The working root of the SFTP server, passed to the SftpFolders class.

  • metadata_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The metadata partner, given by name or as an OaebuPartner instance.

  • bq_dataset_description (str) – Description for the BigQuery dataset.

  • bq_table_description (str) – Description for the BigQuery table.

  • api_dataset_id (str) – The ID under which the dataset release is stored in the API.

  • observatory_api_conn_id (str) – Airflow connection ID for the Observatory API.

  • sftp_service_conn_id (str) – Airflow connection ID for the SFTP service.

  • catchup (bool) – Whether to catch up the DAG or not.

  • schedule (str) – The schedule interval of the DAG.

  • start_date (pendulum.DateTime) – The start date of the DAG.
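The date_regex parameter is only described in prose above. As a rough sketch, assuming the pattern is applied with re.search and that the matched text uses a YYYYMMDD layout (both assumptions, not confirmed by this reference), extracting a snapshot date from an ONIX file name could look like this. extract_snapshot_date is a hypothetical helper, not part of the telescope.

```python
import re
from datetime import date, datetime


def extract_snapshot_date(file_name: str, date_regex: str, date_format: str = "%Y%m%d") -> date:
    """Return the snapshot date embedded in an ONIX file name.

    Hypothetical helper: searches the file name with date_regex and parses the
    first match using an assumed date_format.
    """
    match = re.search(date_regex, file_name)
    if match is None:
        raise ValueError(f"No date matching {date_regex!r} in {file_name!r}")
    return datetime.strptime(match.group(0), date_format).date()


print(extract_snapshot_date("ONIX_20210328.xml", r"\d{8}"))  # 2021-03-28
```

Raising on a missing match keeps malformed file names from silently producing releases without a snapshot date.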

list_release_info(**kwargs)

Lists all ONIX releases and publishes their file names as an XCom.

Parameters:

kwargs – the context passed from the BranchPythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.

Returns:

the identifier of the task to execute next.
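Because list_release_info is the callable of a BranchPythonOperator, its return value selects which downstream task runs. A minimal sketch of that branching decision, assuming ONIX files are recognised by an .xml extension and using placeholder task IDs (choose_next_task and both task IDs are hypothetical):

```python
from typing import List


def choose_next_task(file_names: List[str], continue_task_id: str, skip_task_id: str) -> str:
    """Pick the branch to follow, as a BranchPythonOperator callable would.

    Hypothetical: only files ending in .xml are treated as ONIX releases.
    """
    onix_files = [name for name in file_names if name.lower().endswith(".xml")]
    # Continue the pipeline only when there is at least one file to process.
    return continue_task_id if onix_files else skip_task_id
```

In the real task the file names would also be pushed to XCom (for example via the task instance's xcom_push) so that later tasks can read them.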

make_release(**kwargs)

Make the release instances. Each release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.

Returns:

a list of OnixRelease instances.

Return type:

List[OnixRelease]
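A sketch of how make_release might turn the file information published by list_release_info into release objects. Release is a hypothetical stand-in carrying the same four fields as OnixRelease, and the XCom record shape (dicts with file_name and snapshot_date keys) is an assumption:

```python
from dataclasses import dataclass
from datetime import date
from typing import Dict, List


@dataclass(frozen=True)
class Release:
    """Hypothetical stand-in for OnixRelease with the same constructor fields."""

    dag_id: str
    run_id: str
    snapshot_date: date
    onix_file_name: str


def make_releases(release_info: List[Dict[str, str]], dag_id: str, run_id: str) -> List[Release]:
    """Build one Release per file record pulled from XCom (assumed record shape)."""
    return [
        Release(
            dag_id=dag_id,
            run_id=run_id,
            snapshot_date=date.fromisoformat(info["snapshot_date"]),
            onix_file_name=info["file_name"],
        )
        for info in release_info
    ]
```

One release per file keeps every later task (download, transform, load) operating on a self-describing unit.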

move_files_to_in_progress(releases, **kwargs)

Move ONIX files to the SFTP in-progress folder.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.
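Files move through an upload, in-progress and finished lifecycle on the SFTP server. Assuming a <root>/<stage>/<file_name> layout (the real layout is defined by the SftpFolders class and may differ), the path changes for both this move and the later move to the finished folder can be sketched with posixpath. move_to_stage is hypothetical and only computes paths; it does not contact a server.

```python
import posixpath

STAGES = ("upload", "in_progress", "finished")


def move_to_stage(path: str, stage: str) -> str:
    """Return the path a file would have after moving to another stage folder.

    Assumes the layout <root>/<stage>/<file_name> on the SFTP server.
    """
    if stage not in STAGES:
        raise ValueError(f"Unknown stage: {stage}")
    folder, file_name = posixpath.split(path)
    root = posixpath.dirname(folder)
    return posixpath.join(root, stage, file_name)


upload_path = "/telescopes/onix/upload/ONIX_20210328.xml"  # hypothetical path
in_progress_path = move_to_stage(upload_path, "in_progress")
finished_path = move_to_stage(in_progress_path, "finished")
```

An SFTP client would then rename each file from the old path to the new one; keeping the move separate from processing means a failed run leaves its files visibly in the in-progress folder.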

download(releases, **kwargs)

Task to download the ONIX releases.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

upload_downloaded(releases, **kwargs)

Uploads the downloaded ONIX files to Google Cloud Storage (GCS).

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

transform(releases, **kwargs)

Task to transform the ONIX releases.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.
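The internals of the transform step are not documented here. As a simplified, illustrative stand-in (not the telescope's actual transform), the following flattens ONIX 3.0 Product records into JSON Lines using only the standard library. It assumes reference tag names without an XML namespace; files that declare the ONIX 3.0 namespace would need namespaced tag lookups.

```python
import json
import xml.etree.ElementTree as ET
from typing import List

ISBN13_TYPE = "15"  # ONIX code list 5: ProductIDType 15 is ISBN-13


def onix_to_jsonl(xml_text: str) -> List[str]:
    """Flatten ONIX 3.0 <Product> records into JSON Lines (simplified sketch)."""
    root = ET.fromstring(xml_text)
    lines = []
    for product in root.iter("Product"):
        isbn = None
        # A product may carry several identifiers; keep the ISBN-13 one.
        for identifier in product.findall("ProductIdentifier"):
            if identifier.findtext("ProductIDType") == ISBN13_TYPE:
                isbn = identifier.findtext("IDValue")
        record = {
            "RecordReference": product.findtext("RecordReference"),
            "ISBN13": isbn,
        }
        lines.append(json.dumps(record))
    return lines


sample = """<ONIXMessage release="3.0">
  <Product>
    <RecordReference>rec-1</RecordReference>
    <ProductIdentifier><ProductIDType>15</ProductIDType><IDValue>9780000000002</IDValue></ProductIdentifier>
  </Product>
</ONIXMessage>"""
print("\n".join(onix_to_jsonl(sample)))
```

Newline-delimited JSON is a format BigQuery can load directly, which is why a flat, one-record-per-line output suits the later bq_load step.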

upload_transformed(releases, **kwargs)

Uploads the transformed files to GCS.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

bq_load(releases, **kwargs)

Task to load each transformed release into BigQuery.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.
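Each release is loaded separately. Assuming the common BigQuery convention of date-sharded tables with a YYYYMMDD suffix (an assumption about this telescope, not confirmed by this reference), the destination table ID for a release could be built as follows. sharded_table_id is hypothetical, and the project and dataset names are placeholders.

```python
from datetime import date


def sharded_table_id(project_id: str, dataset_id: str, table_name: str, shard_date: date) -> str:
    """Build a date-sharded BigQuery table ID (assumed naming convention)."""
    return f"{project_id}.{dataset_id}.{table_name}{shard_date:%Y%m%d}"


table_id = sharded_table_id("my-project", "onix", "onix", date(2021, 3, 28))
# table_id == "my-project.onix.onix20210328"
```

Sharding by snapshot date keeps each weekly ONIX snapshot in its own table, so a reload of one release does not disturb earlier ones.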

move_files_to_finished(releases, **kwargs)

Move ONIX files to the SFTP finished folder.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

add_new_dataset_releases(releases, **kwargs)

Adds release information to the Observatory API.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

Return type:

None

cleanup(releases, **kwargs)

Delete all files, folders and XComs associated with these releases.

Parameters:

releases (List[OnixRelease]) – a list of OnixRelease instances.

Return type:

None