# Copyright 2020-2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Aniek Roelofs
import csv
import os
import re
from collections import OrderedDict, defaultdict
from typing import List, Tuple, Union
import pendulum
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from google.cloud.bigquery import TimePartitioningType, SourceFormat, WriteDisposition
from oaebu_workflows.oaebu_partners import OaebuPartner, partner_from_str
from observatory.api.client.model.dataset_release import DatasetRelease
from observatory.platform.api import make_observatory_api
from observatory.platform.airflow import AirflowConns
from observatory.platform.files import save_jsonl_gz
from observatory.platform.files import convert, add_partition_date
from observatory.platform.gcs import gcs_upload_files, gcs_blob_uri, gcs_blob_name_from_path
from observatory.platform.observatory_config import CloudWorkspace
from observatory.platform.bigquery import bq_load_table, bq_table_id, bq_create_dataset
from observatory.platform.sftp import SftpFolders, make_sftp_connection
from observatory.platform.workflows.workflow import (
PartitionRelease,
Workflow,
cleanup,
set_task_state,
check_workflow_inputs,
)
[docs]class GoogleBooksRelease(PartitionRelease):
def __init__(
self,
dag_id: str,
run_id: str,
partition_date: pendulum.DateTime,
sftp_files: List[str],
):
"""Construct a GoogleBooksRelease.
:param dag_id: The ID of the DAG
:param run_id: The Airflow run ID
:param partition_date: the partition date, corresponds to the last day of the month being processed.
:param sftp_files: List of full filepaths to download from sftp service (incl. in_progress folder)
"""
super().__init__(dag_id=dag_id, run_id=run_id, partition_date=partition_date)
self.download_sales_path = os.path.join(self.download_folder, "google_books_sales.csv")
self.download_traffic_path = os.path.join(self.download_folder, "google_books_traffic.csv")
self.transform_sales_path = os.path.join(self.transform_folder, "google_books_sales.jsonl.gz")
self.transform_traffic_path = os.path.join(self.transform_folder, "google_books_traffic.jsonl.gz")
self.sftp_files = sftp_files
[docs]class GoogleBooksTelescope(Workflow):
"""The Google Books telescope."""
def __init__(
self,
dag_id: str,
cloud_workspace: CloudWorkspace,
sftp_root: str = "/",
sales_partner: Union[str, OaebuPartner] = "google_books_sales",
traffic_partner: Union[str, OaebuPartner] = "google_books_traffic",
bq_dataset_description: str = "Data from Google sources",
bq_sales_table_description: str = None,
bq_traffic_table_description: str = None,
api_dataset_id: str = "google_books",
sftp_service_conn_id: str = "sftp_service",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
catchup: bool = False,
schedule: str = "@weekly",
start_date: pendulum.DateTime = pendulum.datetime(2018, 1, 1),
):
"""Construct a GoogleBooksTelescope instance.
:param dag_id: The ID of the DAG
:param cloud_workspace: The CloudWorkspace object for this DAG
:param sftp_root: The root of the SFTP filesystem to work with
:param sales_partner: The name of the sales partner
:param traffic_partner: The name of the traffic partner
:param bq_dataset_description: Description for the BigQuery dataset
:param bq_sales_table_description: Description for the BigQuery Google Books Sales table
:param bq_traffic_table_description: Description for the BigQuery Google Books Traffic table
:param api_dataset_id: The ID to store the dataset release in the API
:param sftp_service_conn_id: Airflow connection ID for the SFTP service
:param observatory_api_conn_id: Airflow connection ID for the overvatory API
:param catchup: Whether to catchup the DAG or not
:param schedule: The schedule interval of the DAG
:param start_date: The start date of the DAG
"""
super().__init__(
dag_id,
start_date,
schedule,
catchup=catchup,
airflow_conns=[sftp_service_conn_id, observatory_api_conn_id],
tags=["oaebu"],
)
self.dag_id = dag_id
self.cloud_workspace = cloud_workspace
self.sftp_root = sftp_root
self.sales_partner = partner_from_str(sales_partner)
self.traffic_partner = partner_from_str(traffic_partner)
self.bq_dataset_description = bq_dataset_description
self.bq_sales_table_description = bq_sales_table_description
self.bq_traffic_table_description = bq_traffic_table_description
self.api_dataset_id = api_dataset_id
self.sftp_service_conn_id = sftp_service_conn_id
self.observatory_api_conn_id = observatory_api_conn_id
# Extra SFTP parameters
self.sftp_folders = SftpFolders(dag_id, sftp_conn_id=sftp_service_conn_id, sftp_root=sftp_root)
self.sftp_regex = r"^Google(SalesTransaction|BooksTraffic)Report_\d{4}_\d{2}.csv$"
check_workflow_inputs(self)
self.add_setup_task(self.check_dependencies)
self.add_setup_task(self.list_release_info)
self.add_task(self.move_files_to_in_progress)
self.add_task(self.download)
self.add_task(self.upload_downloaded)
self.add_task(self.transform)
self.add_task(self.upload_transformed)
self.add_task(self.bq_load)
self.add_task(self.move_files_to_finished)
self.add_task(self.add_new_dataset_releases)
self.add_task(self.cleanup)
[docs] def make_release(self, **kwargs) -> List[GoogleBooksRelease]:
"""Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is
called in 'task_callable'.
:param kwargs: the context passed from the PythonOperator.
See https://airflow.apache.org/docs/stable/macros-ref.html for the keyword arguments that can be passed
:return: A list of google books release instances
"""
ti: TaskInstance = kwargs["ti"]
reports_info = ti.xcom_pull(
key=GoogleBooksTelescope.RELEASE_INFO, task_ids=self.list_release_info.__name__, include_prior_dates=False
)
releases = []
run_id = kwargs["run_id"]
for partition_date, sftp_files in reports_info.items():
releases.append(
GoogleBooksRelease(
self.dag_id, run_id=run_id, partition_date=pendulum.parse(partition_date), sftp_files=sftp_files
)
)
return releases
[docs] def list_release_info(self, **kwargs) -> bool:
"""Lists all Google Books releases available on the SFTP server and publishes sftp file paths and
release_date's as an XCom.
:return: the identifier of the task to execute next.
"""
reports = defaultdict(list)
# List all reports in the 'upload' folder of the organisation
with make_sftp_connection(self.sftp_service_conn_id) as sftp:
files = sftp.listdir(self.sftp_folders.upload)
for file_name in files:
match = re.match(self.sftp_regex, file_name)
if match:
# Get the release date from file name
date_str = file_name[-11:].strip(".csv")
release_date = pendulum.from_format(date_str, "YYYY_MM").end_of("month")
release_date = release_date.format("YYYYMMDD")
# Get the report type from file name
report_type = match.group(1)
# Create the full path of the file for the 'in progress' folder
sftp_file = os.path.join(self.sftp_folders.in_progress, file_name)
# Append report
reports[report_type + release_date].append(sftp_file)
# Check that for each report type + date combination there is a report available
release_info = defaultdict(list)
for report, sftp_files in reports.items():
release_date = report[-8:]
release_info[release_date] += sftp_files
continue_dag = bool(release_info)
if continue_dag:
# Push messages
ti: TaskInstance = kwargs["ti"]
ti.xcom_push(GoogleBooksTelescope.RELEASE_INFO, release_info)
return continue_dag
[docs] def move_files_to_in_progress(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Move Google Books files to SFTP in-progress folder."""
for release in releases:
self.sftp_folders.move_files_to_in_progress(release.sftp_files)
[docs] def download(self, releases: List[GoogleBooksRelease], **kwargs):
"""Task to download the Google Books releases for a given month."""
for release in releases:
with make_sftp_connection(self.sftp_service_conn_id) as sftp:
for file in release.sftp_files:
if "Traffic" in file:
sftp.get(file, localpath=release.download_traffic_path)
elif "Transaction" in file:
sftp.get(file, localpath=release.download_sales_path)
assert os.path.exists(release.download_traffic_path) and os.path.exists(release.download_sales_path)
[docs] def upload_downloaded(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Uploads the downloaded files to GCS for each release"""
for release in releases:
success = gcs_upload_files(
bucket_name=self.cloud_workspace.download_bucket,
file_paths=[release.download_sales_path, release.download_traffic_path],
)
if not success:
raise AirflowException(f"Files could not be uploaded to cloud storage bucket: {self.transform_bucket}")
[docs] def move_files_to_finished(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Move Google Books files to SFTP finished folder."""
for release in releases:
self.sftp_folders.move_files_to_finished(release.sftp_files)
[docs] def bq_load(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Loads the sales and traffic data into BigQuery"""
for release in releases:
for partner, table_description, file_path in [
[self.sales_partner, self.bq_sales_table_description, release.transform_sales_path],
[self.traffic_partner, self.bq_traffic_table_description, release.transform_traffic_path],
]:
bq_create_dataset(
project_id=self.cloud_workspace.project_id,
dataset_id=partner.bq_dataset_id,
location=self.cloud_workspace.data_location,
description=self.bq_dataset_description,
)
uri = gcs_blob_uri(self.cloud_workspace.transform_bucket, gcs_blob_name_from_path(file_path))
table_id = bq_table_id(self.cloud_workspace.project_id, partner.bq_dataset_id, partner.bq_table_name)
success = bq_load_table(
uri=uri,
table_id=table_id,
schema_file_path=partner.schema_path,
source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
partition_type=TimePartitioningType.MONTH,
partition=True,
partition_field="release_date",
write_disposition=WriteDisposition.WRITE_APPEND,
table_description=table_description,
ignore_unknown_values=True,
)
set_task_state(success, kwargs["ti"].task_id, release=release)
[docs] def add_new_dataset_releases(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Adds release information to API."""
api = make_observatory_api(observatory_api_conn_id=self.observatory_api_conn_id)
for release in releases:
dataset_release = DatasetRelease(
dag_id=self.dag_id,
dataset_id=self.api_dataset_id,
dag_run_id=release.run_id,
data_interval_start=kwargs["data_interval_start"],
data_interval_end=kwargs["data_interval_end"],
partition_date=release.partition_date,
)
api.post_dataset_release(dataset_release)
[docs] def cleanup(self, releases: List[GoogleBooksRelease], **kwargs) -> None:
"""Delete all files, folders and XComs associated with this release."""
for release in releases:
cleanup(
dag_id=self.dag_id, execution_date=kwargs["execution_date"], workflow_folder=release.workflow_folder
)