oaebu_workflows.google_books_telescope.google_books_telescope

Module Contents

Classes

GoogleBooksRelease

Construct a GoogleBooksRelease.

GoogleBooksTelescope

The Google Books telescope.

Functions

gb_transform(download_files, sales_path, traffic_path, ...)

Transforms sales and traffic reports. For both reports, it transforms the CSV into a JSONL file and replaces spaces in the keys with underscores.

class oaebu_workflows.google_books_telescope.google_books_telescope.GoogleBooksRelease(dag_id, run_id, partition_date, sftp_files)

Bases: observatory.platform.workflows.workflow.PartitionRelease

Construct a GoogleBooksRelease.

Parameters:
  • dag_id (str) – The ID of the DAG

  • run_id (str) – The Airflow run ID

  • partition_date (pendulum.DateTime) – The partition date, which corresponds to the last day of the month being processed

  • sftp_files (List[str]) – List of full file paths to download from the SFTP service (including the in_progress folder)
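Because partition_date is the last day of the month being processed, it can be derived from any date inside that month. The following is a minimal standard-library sketch of that derivation; the helper name `month_partition_date` is hypothetical and not part of oaebu_workflows, and it uses `datetime.date` where the telescope itself uses `pendulum.DateTime`.

```python
import calendar
from datetime import date


def month_partition_date(day: date) -> date:
    """Return the last day of the month containing `day`.

    Hypothetical helper: illustrates a monthly partition date, not the telescope's code.
    """
    # calendar.monthrange returns (weekday of the first day, number of days in the month)
    _, days_in_month = calendar.monthrange(day.year, day.month)
    return date(day.year, day.month, days_in_month)
```

For example, `month_partition_date(date(2024, 2, 10))` gives `date(2024, 2, 29)`, since 2024 is a leap year.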

class oaebu_workflows.google_books_telescope.google_books_telescope.GoogleBooksTelescope(dag_id, cloud_workspace, sftp_root='/', sales_partner='google_books_sales', traffic_partner='google_books_traffic', bq_dataset_description='Data from Google sources', bq_sales_table_description=None, bq_traffic_table_description=None, api_dataset_id='google_books', sftp_service_conn_id='sftp_service', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, catchup=False, schedule='@weekly', start_date=pendulum.datetime(2018, 1, 1))

Bases: observatory.platform.workflows.workflow.Workflow

The Google Books telescope.

Construct a GoogleBooksTelescope instance.

Parameters:
  • dag_id (str) – The ID of the DAG

  • cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG

  • sftp_root (str) – The root of the SFTP filesystem to work with

  • sales_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The sales partner, given as an OaebuPartner or its name

  • traffic_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The traffic partner, given as an OaebuPartner or its name

  • bq_dataset_description (str) – Description for the BigQuery dataset

  • bq_sales_table_description (str) – Description for the BigQuery Google Books Sales table

  • bq_traffic_table_description (str) – Description for the BigQuery Google Books Traffic table

  • api_dataset_id (str) – The ID under which the dataset release is stored in the API

  • sftp_service_conn_id (str) – Airflow connection ID for the SFTP service

  • observatory_api_conn_id (str) – Airflow connection ID for the Observatory API

  • catchup (bool) – Whether the DAG should catch up on missed runs

  • schedule (str) – The schedule interval of the DAG

  • start_date (pendulum.DateTime) – The start date of the DAG

make_release(**kwargs)

Make release instances. The releases are passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.

Parameters:

kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.

Returns:

A list of Google Books release instances.

Return type:

List[GoogleBooksRelease]
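make_release turns the release information published by list_release_info (SFTP file paths and release dates) into release objects. The sketch below is minimal and rests on one assumption: the published information is a mapping from an ISO-formatted partition date to a list of SFTP file paths. That shape, the `ReleaseSketch` dataclass and `make_releases` are hypothetical stand-ins for the real XCom payload and GoogleBooksRelease, which may differ.

```python
from dataclasses import dataclass
from datetime import date
from typing import Dict, List


@dataclass
class ReleaseSketch:
    """Hypothetical stand-in mirroring GoogleBooksRelease's constructor arguments."""

    dag_id: str
    run_id: str
    partition_date: date
    sftp_files: List[str]


def make_releases(dag_id: str, run_id: str, release_info: Dict[str, List[str]]) -> List[ReleaseSketch]:
    """Build one release per partition date, oldest first (assumed XCom shape)."""
    releases = []
    # ISO date strings (YYYY-MM-DD) sort chronologically as plain strings
    for partition_date_str, sftp_files in sorted(release_info.items()):
        releases.append(
            ReleaseSketch(
                dag_id=dag_id,
                run_id=run_id,
                partition_date=date.fromisoformat(partition_date_str),
                sftp_files=list(sftp_files),
            )
        )
    return releases
```

Sorting by partition date means that, if several months are waiting on the server, they are handled in chronological order.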

list_release_info(**kwargs)

Lists all Google Books releases available on the SFTP server and publishes their SFTP file paths and release dates as an XCom.

Returns:

True if the DAG should continue on to the next task, otherwise False.

Return type:

bool
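list_release_info groups the files found on the SFTP server into monthly releases and returns a bool. Below is a minimal sketch of the grouping step. It assumes, hypothetically, that each report file name ends in `_YYYY_MM.csv`. The real Google Books file naming, the XCom key and the name `group_release_files` are all assumptions, not the telescope's implementation. Returning False when nothing is found is one common way to let an Airflow short-circuit task skip the downstream tasks.

```python
import calendar
import posixpath
import re
from datetime import date
from typing import Dict, List, Tuple

# Hypothetical naming convention: <report name>_<YYYY>_<MM>.csv
FILE_PATTERN = re.compile(r"^(?P<report>.+)_(?P<year>\d{4})_(?P<month>\d{2})\.csv$")


def group_release_files(paths: List[str]) -> Tuple[Dict[str, List[str]], bool]:
    """Group SFTP paths by the last day of the month encoded in their file names.

    Returns the grouping (keyed by ISO date string, so it is XCom-serialisable)
    and whether any release was found.
    """
    releases: Dict[str, List[str]] = {}
    for path in paths:
        match = FILE_PATTERN.match(posixpath.basename(path))
        if match is None:
            continue  # ignore files that do not follow the assumed naming
        year, month = int(match["year"]), int(match["month"])
        if not 1 <= month <= 12:
            continue  # skip names with an impossible month
        last_day = calendar.monthrange(year, month)[1]
        key = date(year, month, last_day).isoformat()
        releases.setdefault(key, []).append(path)
    return releases, bool(releases)
```

Both the sales and the traffic report for the same month end up under a single key, matching the idea of one release per month.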

move_files_to_in_progress(releases, **kwargs)

Move Google Books files to SFTP in-progress folder.

Parameters:

releases (List[GoogleBooksRelease]) – The releases whose SFTP files are moved

Return type:

None

download(releases, **kwargs)

Task to download the Google Books releases for a given month.

Parameters:

releases (List[GoogleBooksRelease]) – The releases to download

upload_downloaded(releases, **kwargs)

Uploads the downloaded files to GCS for each release.

Parameters:

releases (List[GoogleBooksRelease]) – The releases whose downloaded files are uploaded

Return type:

None

transform(releases, **kwargs)

Task to transform the Google Books releases for a given month.

Parameters:

releases (List[GoogleBooksRelease]) – The releases to transform

Return type:

None

upload_transformed(releases, **kwargs)

Uploads the transformed files to GCS for each release.

Parameters:

releases (List[GoogleBooksRelease]) – The releases whose transformed files are uploaded

Return type:

None

move_files_to_finished(releases, **kwargs)

Move Google Books files to SFTP finished folder.

Parameters:

releases (List[GoogleBooksRelease]) – The releases whose SFTP files are moved

Return type:

None
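move_files_to_in_progress and move_files_to_finished step each release's files through successive SFTP folders. The sketch below covers only the path mapping, not the SFTP transfer. It assumes a hypothetical layout in which files arrive in an `upload` folder that sits beside the `in_progress` and `finished` folders. The `upload` folder name, the example paths and `move_target` are illustrative assumptions.

```python
import posixpath

# Hypothetical stage folders: only "in_progress" and "finished" are named in this
# reference; "upload" as the arrival folder is an assumption.
STAGES = ("upload", "in_progress", "finished")


def move_target(path: str, stage: str) -> str:
    """Return the path `path` would have after being moved into `stage`.

    The file name is kept; only the stage folder it sits in changes.
    """
    if stage not in STAGES:
        raise ValueError(f"unknown stage: {stage!r}")
    stage_dir = posixpath.dirname(path)
    if posixpath.basename(stage_dir) not in STAGES:
        raise ValueError(f"{path!r} is not inside a stage folder")
    partner_dir = posixpath.dirname(stage_dir)
    return posixpath.join(partner_dir, stage, posixpath.basename(path))
```

Keeping the file name fixed and swapping only the parent folder makes the move reversible and lets a rerun find files that a failed run left in the in-progress folder.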

bq_load(releases, **kwargs)

Loads the sales and traffic data into BigQuery.

Parameters:

releases (List[GoogleBooksRelease]) – The releases whose data is loaded

Return type:

None

add_new_dataset_releases(releases, **kwargs)

Adds dataset release information to the Observatory API.

Parameters:

releases (List[GoogleBooksRelease]) – The releases to record in the API

Return type:

None

cleanup(releases, **kwargs)

Deletes all files, folders and XComs associated with the given releases.

Parameters:

releases (List[GoogleBooksRelease]) – The releases to clean up

Return type:

None

oaebu_workflows.google_books_telescope.google_books_telescope.gb_transform(download_files, sales_path, traffic_path, release_date)

Transforms sales and traffic reports. For both reports, it transforms the CSV into a JSONL file and replaces spaces in the keys with underscores.

Parameters:
  • download_files (Tuple[str, str]) – The Google Books Sales and Traffic files

  • sales_path (str) – The file path to save the transformed sales data to

  • traffic_path (str) – The file path to save the transformed traffic data to

  • release_date (pendulum.DateTime) – The release date to use as a partitioning date

Return type:

None
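gb_transform converts each CSV report into JSONL, replaces spaces in the column names with underscores, and attaches the release date used for partitioning. Below is a minimal standard-library sketch of that per-report transformation. It assumes comma-separated UTF-8 input and writes the date to a hypothetical `release_date` field. The real reports' delimiter and encoding, and the field name the telescope writes, may differ. `transform_report` and `transform_file` are illustrative names, not part of the module.

```python
import csv
import json
from datetime import date
from typing import Iterable, List


def transform_report(csv_lines: Iterable[str], release_date: date) -> List[str]:
    """Turn CSV lines into JSON lines, replacing spaces in keys with underscores."""
    json_lines = []
    for row in csv.DictReader(csv_lines):
        # e.g. "Book Title" -> "Book_Title"
        record = {key.replace(" ", "_"): value for key, value in row.items()}
        # Hypothetical partition column carrying the release date
        record["release_date"] = release_date.isoformat()
        json_lines.append(json.dumps(record))
    return json_lines


def transform_file(csv_path: str, jsonl_path: str, release_date: date) -> None:
    """File-based wrapper; gb_transform would do this once for sales and once for traffic."""
    with open(csv_path, newline="", encoding="utf-8") as src:
        json_lines = transform_report(src, release_date)
    with open(jsonl_path, "w", encoding="utf-8") as dst:
        for line in json_lines:
            dst.write(line + "\n")
```

Keeping the row conversion separate from file handling makes the key-renaming logic easy to test without touching the filesystem.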