# Copyright 2020-2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Aniek Roelofs
import os
import shutil
from collections import defaultdict
from unittest.mock import patch
import pendulum
from airflow.exceptions import AirflowException
from airflow.models.connection import Connection
from airflow.utils.state import State
from click.testing import CliRunner
from oaebu_workflows.config import test_fixtures_folder
from oaebu_workflows.oaebu_partners import partner_from_str
from oaebu_workflows.google_books_telescope.google_books_telescope import (
GoogleBooksRelease,
GoogleBooksTelescope,
)
from observatory.platform.observatory_environment import (
ObservatoryEnvironment,
ObservatoryTestCase,
SftpServer,
find_free_port,
random_id,
)
from observatory.platform.bigquery import bq_table_id
from observatory.platform.observatory_config import Workflow
from observatory.platform.sftp import SftpFolders
from observatory.platform.gcs import gcs_blob_name_from_path
from observatory.platform.api import get_dataset_releases
[docs]class TestGoogleBooksTelescope(ObservatoryTestCase):
"""Tests for the GoogleBooks telescope"""
def __init__(self, *args, **kwargs):
    """Initialise the test case and the settings shared by the tests.

    :param args: positional arguments forwarded to the parent test case.
    :param kwargs: keyword arguments forwarded to the parent test case.
    """

    super().__init__(*args, **kwargs)

    # GCP settings are read from the environment; they are None when unset
    environ = os.environ
    self.project_id = environ.get("TEST_GCP_PROJECT_ID")
    self.data_location = environ.get("TEST_GCP_DATA_LOCATION")

    # Port on which the local SFTP server used by the tests will listen
    self.sftp_port = find_free_port()
def test_dag_structure(self):
    """Check that the Google Books DAG is a single linear chain of the expected tasks."""

    telescope = GoogleBooksTelescope(dag_id="test_dag", cloud_workspace=self.fake_cloud_workspace, sftp_root="/")
    dag = telescope.make_dag()

    # Tasks in execution order: each one has the next as its only downstream task
    task_order = [
        "check_dependencies",
        "list_release_info",
        "move_files_to_in_progress",
        "download",
        "upload_downloaded",
        "transform",
        "upload_transformed",
        "bq_load",
        "move_files_to_finished",
        "add_new_dataset_releases",
        "cleanup",
    ]
    expected_structure = {task: [downstream] for task, downstream in zip(task_order, task_order[1:])}
    # The final task has no downstream tasks
    expected_structure[task_order[-1]] = []

    self.assert_dag_structure(expected_structure, dag)
def test_dag_load(self):
    """Check that the Google Books DAG can be loaded from a DAG bag using a workflow config."""

    # A single workflow entry pointing at the GoogleBooksTelescope class
    workflow = Workflow(
        dag_id="google_books",
        name="My Google Books Telescope",
        class_name="oaebu_workflows.google_books_telescope.google_books_telescope.GoogleBooksTelescope",
        cloud_workspace=self.fake_cloud_workspace,
    )
    env = ObservatoryEnvironment(workflows=[workflow])

    with env.create():
        self.assert_dag_load_from_config("google_books")
def test_telescope(self):
    """Test the Google Books telescope end to end.

    Copies the traffic and sales fixture reports into the upload folder of a local SFTP
    server, runs every task of the DAG in order and checks the files, blobs, BigQuery
    tables and dataset releases produced along the way.
    """

    fixtures_folder = test_fixtures_folder(workflow_module="google_books_telescope")
    params = {
        "no_download_files": 2,
        "bq_rows": 4,
        "traffic_download_hash": "db4dca44d5231e0c4e2ad95db41b79b6",
        "traffic_transform_hash": "b8073007",
        "sales_download_hash": "6496518be1ea73694d0a8f89c0b42f20",
        "sales_transform_hash": "ebe49987",
        "test_files": {
            "GoogleBooksTrafficReport_2020_02.csv": os.path.join(
                fixtures_folder, "GoogleBooksTrafficReport_2020_02.csv"
            ),
            "GoogleSalesTransactionReport_2020_02.csv": os.path.join(
                fixtures_folder, "GoogleSalesTransactionReport_2020_02.csv"
            ),
        },
    }

    # Setup Observatory environment
    env = ObservatoryEnvironment(
        self.project_id, self.data_location, api_host="localhost", api_port=find_free_port()
    )
    sftp_server = SftpServer(host="localhost", port=self.sftp_port)
    dataset_id = env.add_dataset()

    # Create the Observatory environment and run tests
    with env.create():
        with sftp_server.create() as sftp_root:
            # Setup Telescope; both partners write to the temporary test dataset
            execution_date = pendulum.datetime(year=2021, month=3, day=31)
            sales_partner = partner_from_str("google_books_sales")
            sales_partner.bq_dataset_id = dataset_id
            traffic_partner = partner_from_str("google_books_traffic")
            traffic_partner.bq_dataset_id = dataset_id
            telescope = GoogleBooksTelescope(
                dag_id="google_books_test",
                cloud_workspace=env.cloud_workspace,
                sftp_root="/",
                sales_partner=sales_partner,
                traffic_partner=traffic_partner,
            )
            dag = telescope.make_dag()

            # Add SFTP connection
            conn = Connection(
                conn_id=telescope.sftp_service_conn_id, uri=f"ssh://:password@localhost:{self.sftp_port}"
            )
            env.add_connection(conn)

            with env.create_dag_run(dag, execution_date):
                # Test that all dependencies are specified: no error should be thrown
                ti = env.run_task(telescope.check_dependencies.__name__)
                self.assertEqual(ti.state, State.SUCCESS)

                # Add files to the upload folder of the SFTP server
                local_sftp_folders = SftpFolders(telescope.dag_id, telescope.sftp_service_conn_id, sftp_root)
                os.makedirs(local_sftp_folders.upload, exist_ok=True)
                for file_name, file_path in params["test_files"].items():
                    upload_file = os.path.join(local_sftp_folders.upload, file_name)
                    shutil.copy(file_path, upload_file)

                # Check that the correct release info is returned via Xcom
                ti = env.run_task(telescope.list_release_info.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                release_info = ti.xcom_pull(
                    key=GoogleBooksTelescope.RELEASE_INFO,
                    task_ids=telescope.list_release_info.__name__,
                    include_prior_dates=False,
                )

                # Build the expected release info from the test file names
                expected_release_info = defaultdict(list)
                for file_name in params["test_files"]:
                    # The file name stem ends in "YYYY_MM". Slice it out rather than using
                    # str.strip(".csv"), which strips a set of characters, not a suffix.
                    date_part = os.path.splitext(file_name)[0][-7:]
                    expected_release_date = pendulum.from_format(date_part, "YYYY_MM").end_of("month")
                    release_date_str = expected_release_date.format("YYYYMMDD")
                    if release_date_str == "20200229":
                        expected_release_file = os.path.join(telescope.sftp_folders.in_progress, file_name)
                        expected_release_info[release_date_str].append(expected_release_file)

                # assertTrue(1, ...) can never fail; compare the actual length instead
                self.assertEqual(1, len(release_info))
                # list.sort() returns None, so compare sorted copies to really check the file lists
                self.assertEqual(
                    sorted(expected_release_info["20200229"]),
                    sorted(release_info["20200229"]),
                )

                # Use release info for other tasks
                releases = [
                    GoogleBooksRelease(
                        dag_id=telescope.dag_id,
                        run_id=env.dag_run.run_id,
                        partition_date=pendulum.parse(release_date),
                        sftp_files=sftp_files,
                    )
                    for release_date, sftp_files in release_info.items()
                ]
                self.assertEqual(1, len(releases))
                release = releases[0]

                # Test move file to in progress
                ti = env.run_task(telescope.move_files_to_in_progress.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                for file in release.sftp_files:
                    file_name = os.path.basename(file)
                    upload_file = os.path.join(local_sftp_folders.upload, file_name)
                    self.assertFalse(os.path.isfile(upload_file))
                    in_progress_file = os.path.join(local_sftp_folders.in_progress, file_name)
                    self.assertTrue(os.path.isfile(in_progress_file))

                # Run main telescope tasks
                main_tasks = (
                    telescope.download,
                    telescope.upload_downloaded,
                    telescope.transform,
                    telescope.upload_transformed,
                    telescope.bq_load,
                )
                for task in main_tasks:
                    ti = env.run_task(task.__name__)
                    self.assertEqual(ti.state, State.SUCCESS)

                # Make assertions for the above tasks
                # Test download
                self.assertTrue(os.path.exists(release.download_traffic_path))
                self.assertTrue(os.path.exists(release.download_sales_path))
                self.assert_file_integrity(release.download_traffic_path, params["traffic_download_hash"], "md5")
                self.assert_file_integrity(release.download_sales_path, params["sales_download_hash"], "md5")

                # Test upload downloaded
                self.assert_blob_integrity(
                    env.download_bucket,
                    gcs_blob_name_from_path(release.download_traffic_path),
                    release.download_traffic_path,
                )
                self.assert_blob_integrity(
                    env.download_bucket,
                    gcs_blob_name_from_path(release.download_sales_path),
                    release.download_sales_path,
                )

                # Test that file transformed
                self.assertTrue(os.path.exists(release.transform_sales_path))
                self.assertTrue(os.path.exists(release.transform_traffic_path))
                self.assert_file_integrity(release.transform_sales_path, params["sales_transform_hash"], "gzip_crc")
                self.assert_file_integrity(
                    release.transform_traffic_path, params["traffic_transform_hash"], "gzip_crc"
                )

                # Test that transformed file uploaded
                self.assert_blob_integrity(
                    env.transform_bucket,
                    gcs_blob_name_from_path(release.transform_traffic_path),
                    release.transform_traffic_path,
                )
                self.assert_blob_integrity(
                    env.transform_bucket,
                    gcs_blob_name_from_path(release.transform_sales_path),
                    release.transform_sales_path,
                )

                # Test that data loaded into BigQuery
                table_id = bq_table_id(
                    telescope.cloud_workspace.project_id,
                    telescope.sales_partner.bq_dataset_id,
                    telescope.sales_partner.bq_table_name,
                )
                self.assert_table_integrity(table_id, params["bq_rows"])
                table_id = bq_table_id(
                    telescope.cloud_workspace.project_id,
                    telescope.traffic_partner.bq_dataset_id,
                    telescope.traffic_partner.bq_table_name,
                )
                self.assert_table_integrity(table_id, params["bq_rows"])

                # Test move files to finished
                ti = env.run_task(telescope.move_files_to_finished.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                for file in release.sftp_files:
                    file_name = os.path.basename(file)
                    in_progress_file = os.path.join(local_sftp_folders.in_progress, file_name)
                    self.assertFalse(os.path.isfile(in_progress_file))
                    finished_file = os.path.join(local_sftp_folders.finished, file_name)
                    self.assertTrue(os.path.isfile(finished_file))

                # Add_dataset_release_task: no releases before the task runs, one after
                dataset_releases = get_dataset_releases(
                    dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id
                )
                self.assertEqual(len(dataset_releases), 0)
                ti = env.run_task(telescope.add_new_dataset_releases.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                dataset_releases = get_dataset_releases(
                    dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id
                )
                self.assertEqual(len(dataset_releases), 1)

                # Test cleanup
                ti = env.run_task(telescope.cleanup.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                self.assert_cleanup(release.workflow_folder)
@patch("observatory.platform.airflow.Variable.get")