oaebu_workflows.google_books_telescope.google_books_telescope
Module Contents
Classes
GoogleBooksRelease – Construct a GoogleBooksRelease.
GoogleBooksTelescope – The Google Books telescope.
Functions
gb_transform – Transforms sales and traffic reports. For both reports it transforms the csv into a jsonl file and replaces spaces in the keys with underscores.
- class oaebu_workflows.google_books_telescope.google_books_telescope.GoogleBooksRelease(dag_id, run_id, partition_date, sftp_files)
Bases:
observatory.platform.workflows.workflow.PartitionRelease
Construct a GoogleBooksRelease.
- Parameters:
dag_id (str) – The ID of the DAG
run_id (str) – The Airflow run ID
partition_date (pendulum.DateTime) – The partition date, which corresponds to the last day of the month being processed.
sftp_files (List[str]) – List of full file paths to download from the SFTP service (including the in_progress folder)
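As an illustration of the partition_date convention described above (the last day of the month being processed), the following is a minimal sketch. It uses only the standard library rather than pendulum so that it stays self-contained, and the helper name last_day_of_month is hypothetical, not part of this module.

```python
import calendar
from datetime import date


def last_day_of_month(year: int, month: int) -> date:
    """Return the last calendar day of the given month.

    Hypothetical helper for illustration only; the telescope itself
    works with pendulum.DateTime values.
    """
    # monthrange returns (weekday of the first day, number of days in the month)
    _, days_in_month = calendar.monthrange(year, month)
    return date(year, month, days_in_month)


# A report covering February 2024 would be partitioned on 2024-02-29.
partition_date = last_day_of_month(2024, 2)
print(partition_date.isoformat())
```

With pendulum, an equivalent value could presumably be obtained by taking the end of the month of any date within the reporting period.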
- class oaebu_workflows.google_books_telescope.google_books_telescope.GoogleBooksTelescope(dag_id, cloud_workspace, sftp_root='/', sales_partner='google_books_sales', traffic_partner='google_books_traffic', bq_dataset_description='Data from Google sources', bq_sales_table_description=None, bq_traffic_table_description=None, api_dataset_id='google_books', sftp_service_conn_id='sftp_service', observatory_api_conn_id=AirflowConns.OBSERVATORY_API, catchup=False, schedule='@weekly', start_date=pendulum.datetime(2018, 1, 1))
Bases:
observatory.platform.workflows.workflow.Workflow
The Google Books telescope.
Construct a GoogleBooksTelescope instance.
- Parameters:
dag_id (str) – The ID of the DAG
cloud_workspace (observatory.platform.observatory_config.CloudWorkspace) – The CloudWorkspace object for this DAG
sftp_root (str) – The root of the SFTP filesystem to work with
sales_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the sales partner
traffic_partner (Union[str, oaebu_workflows.oaebu_partners.OaebuPartner]) – The name of the traffic partner
bq_dataset_description (str) – Description for the BigQuery dataset
bq_sales_table_description (str) – Description for the BigQuery Google Books Sales table
bq_traffic_table_description (str) – Description for the BigQuery Google Books Traffic table
api_dataset_id (str) – The ID to store the dataset release in the API
sftp_service_conn_id (str) – Airflow connection ID for the SFTP service
observatory_api_conn_id (str) – Airflow connection ID for the Observatory API
catchup (bool) – Whether to catch up the DAG or not
schedule (str) – The schedule interval of the DAG
start_date (pendulum.DateTime) – The start date of the DAG
- make_release(**kwargs)
Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’. See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed.
- Parameters:
kwargs – the context passed from the PythonOperator.
- Returns:
A list of Google Books release instances.
- Return type:
List[GoogleBooksRelease]
- list_release_info(**kwargs)
Lists all Google Books releases available on the SFTP server and publishes the SFTP file paths and release dates as an XCom.
- Returns:
the identifier of the task to execute next.
- Return type:
bool
- move_files_to_in_progress(releases, **kwargs)
Move Google Books files to SFTP in-progress folder.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- download(releases, **kwargs)
Task to download the Google Books releases for a given month.
- Parameters:
releases (List[GoogleBooksRelease]) –
- upload_downloaded(releases, **kwargs)
Uploads the downloaded files to GCS for each release.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- transform(releases, **kwargs)
Task to transform the Google Books releases for a given month.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- upload_transformed(releases, **kwargs)
Uploads the transformed files to GCS for each release.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- move_files_to_finished(releases, **kwargs)
Move Google Books files to SFTP finished folder.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- bq_load(releases, **kwargs)
Loads the sales and traffic data into BigQuery.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- add_new_dataset_releases(releases, **kwargs)
Adds release information to API.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- cleanup(releases, **kwargs)
Delete all files, folders and XComs associated with this release.
- Parameters:
releases (List[GoogleBooksRelease]) –
- Return type:
None
- oaebu_workflows.google_books_telescope.google_books_telescope.gb_transform(download_files, sales_path, traffic_path, release_date)
Transforms sales and traffic reports. For both reports it transforms the csv into a jsonl file and replaces spaces in the keys with underscores.
- Parameters:
download_files (Tuple[str, str]) – The Google Books Sales and Traffic files
sales_path (str) – The file path to save the transformed sales data to
traffic_path (str) – The file path to save the transformed traffic data to
release_date (pendulum.DateTime) – The release date to use as a partitioning date
- Return type:
None
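The transformation described for gb_transform (csv rows to jsonl records, with spaces in the keys replaced by underscores) can be sketched as follows. This is not the module's implementation: the function name rows_to_jsonl, the sample column names, the comma delimiter, and the added release_date field are all assumptions made for illustration, and the sketch works on in-memory strings rather than on the file paths that gb_transform takes.

```python
import csv
import io
import json


def rows_to_jsonl(csv_text: str, release_date: str) -> str:
    """Convert csv text to jsonl, replacing spaces in keys with underscores.

    Hypothetical helper for illustration; the comma delimiter and the
    release_date field name are assumptions, not taken from the module.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    lines = []
    for row in reader:
        # Rename each key: "Book Visits" becomes "Book_Visits"
        record = {key.strip().replace(" ", "_"): value for key, value in row.items()}
        # Assumed: attach the release date so records can be partitioned on it
        record["release_date"] = release_date
        lines.append(json.dumps(record))
    # One JSON object per line
    return "\n".join(lines)


# Made-up sample data with hypothetical column names
sample = "Primary ISBN,Book Visits\n9780000000002,12\n9780000000019,3\n"
jsonl = rows_to_jsonl(sample, "2024-02-29")
print(jsonl)
```

Values are left as strings here; any type conversion would depend on the target table schema, which this page does not describe.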