Source code for oaebu_workflows.google_analytics3_telescope.tests.test_google_analytics3_telescope

# Copyright 2020-2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Aniek Roelofs, Keegan Smith

import gzip
import json
import os
from unittest.mock import patch

import pendulum
from airflow.models import Connection
from airflow.utils.state import State
from googleapiclient.discovery import build
from googleapiclient.http import HttpMockSequence

from oaebu_workflows.google_analytics3_telescope.google_analytics3_telescope import GoogleAnalytics3Telescope
from oaebu_workflows.oaebu_partners import partner_from_str
from oaebu_workflows.config import test_fixtures_folder
from observatory.platform.api import get_dataset_releases
from observatory.platform.observatory_config import Workflow
from observatory.platform.bigquery import bq_table_id
from observatory.platform.observatory_environment import (
    ObservatoryEnvironment,
    ObservatoryTestCase,
    find_free_port,
    load_and_parse_json,
)


[docs]class TestGoogleAnalytics3Telescope(ObservatoryTestCase): """Tests for the Google Analytics telescope""" def __init__(self, *args, **kwargs): """Constructor which sets up variables used by tests. :param args: arguments. :param kwargs: keyword arguments. """ super(TestGoogleAnalytics3Telescope, self).__init__(*args, **kwargs) self.project_id = os.getenv("TEST_GCP_PROJECT_ID") self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") self.view_id = "11235141" self.pagepath_regex = r".*regex$" self.organisation_name = "UCL Press" fixtures_folder = test_fixtures_folder(workflow_module="google_analytics3_telescope") self.test_table = os.path.join(fixtures_folder, "test_table.json") self.test_table_anu = os.path.join(fixtures_folder, "test_table_anu.json")
[docs] def test_dag_structure(self): """Test that the Google Analytics DAG has the correct structure. :return: None """ cloud_workspace = self.fake_cloud_workspace dag = GoogleAnalytics3Telescope( dag_id="google_analytics_test", organisation_name="Organisation Name", cloud_workspace=cloud_workspace, view_id=self.view_id, pagepath_regex=self.pagepath_regex, ).make_dag() self.assert_dag_structure( { "check_dependencies": ["download_transform"], "download_transform": ["upload_transformed"], "upload_transformed": ["bq_load"], "bq_load": ["add_new_dataset_releases"], "add_new_dataset_releases": ["cleanup"], "cleanup": [], }, dag, )
[docs] def test_dag_load(self): """Test that the Google Analytics DAG can be loaded from a DAG bag.""" env = ObservatoryEnvironment( workflows=[ Workflow( dag_id="google_analytics3", name="My Google Analytics Workflow", class_name="oaebu_workflows.google_analytics3_telescope.google_analytics3_telescope.GoogleAnalytics3Telescope", cloud_workspace=self.fake_cloud_workspace, kwargs=dict(organisation_name="My Organisation", pagepath_regex="", view_id="123456"), ) ] ) with env.create(): self.assert_dag_load_from_config("google_analytics3") # Errors should be raised if kwargs dict not supplied env.workflows[0].kwargs = {} with env.create(): with self.assertRaises(AssertionError) as cm: self.assert_dag_load_from_config("google_analytics3") msg = cm.exception.args[0] self.assertTrue("missing 3 required positional arguments" in msg) self.assertTrue("organisation_name" in msg) self.assertTrue("pagepath_regex" in msg) self.assertTrue("view_id" in msg)
@patch("oaebu_workflows.google_analytics3_telescope.google_analytics3_telescope.build") @patch( "oaebu_workflows.google_analytics3_telescope.google_analytics3_telescope.ServiceAccountCredentials" )
[docs] def test_telescope(self, mock_account_credentials, mock_build): """Test the Google Analytics telescope end to end specifically for ANU Press, to test custom dimensions. :return: None. """ # Mock the Google Reporting Analytics API service mock_account_credentials.from_json_keyfile_dict.return_value = "" http = HttpMockSequence(create_http_mock_sequence(GoogleAnalytics3Telescope.ANU_ORG_NAME)) mock_build.return_value = build("analyticsreporting", "v4", http=http) # Setup Observatory environment env = ObservatoryEnvironment( self.project_id, self.data_location, api_host="localhost", api_port=find_free_port() ) # Setup Telescope execution_date = pendulum.datetime(year=2022, month=6, day=1) partner = partner_from_str("google_analytics3") partner.bq_dataset_id = env.add_dataset() telescope = GoogleAnalytics3Telescope( dag_id="google_analytics_test", organisation_name=GoogleAnalytics3Telescope.ANU_ORG_NAME, cloud_workspace=env.cloud_workspace, view_id=self.view_id, pagepath_regex=self.pagepath_regex, data_partner=partner, ) dag = telescope.make_dag() # Create the Observatory environment and run tests with env.create(): with env.create_dag_run(dag, execution_date): # Add OAEBU service account connection connection conn = Connection( conn_id="oaebu_service_account", uri=f"google-cloud-platform://?type=service_account&private_key_id=private_key_id" f"&private_key=private_key" f"&client_email=client_email" f"&client_id=client_id", ) env.add_connection(conn) # Test that all dependencies are specified: no error should be thrown ti = env.run_task(telescope.check_dependencies.__name__) self.assertEqual(ti.state, State.SUCCESS) # Test download_transform task ti = env.run_task(telescope.download_transform.__name__) self.assertEqual(ti.state, State.SUCCESS) # Test that transformed file uploaded ti = env.run_task(telescope.upload_transformed.__name__) self.assertEqual(ti.state, State.SUCCESS) # Test that data loaded into BigQuery ti = env.run_task(telescope.bq_load.__name__) self.assertEqual(ti.state, State.SUCCESS) # Use release to check tasks release = telescope.make_release( run_id=env.dag_run.run_id, data_interval_start=pendulum.parse(str(env.dag_run.data_interval_start)), data_interval_end=pendulum.parse(str(env.dag_run.data_interval_end)), )[0] # Test download_transform task self.assertTrue(os.path.exists(release.transform_path)) self.assertTrue(os.path.isfile(release.transform_path)) # Use frozenset to test results are as expected, many dict transformations re-order items in dict actual_list = [] with gzip.open(release.transform_path, "rb") as f: for line in f: actual_list.append(json.loads(line)) expected_list = [ { "url": "/base/path/151420", "title": "Anything public program drive north.", "start_date": "2022-06-01", "end_date": "2022-06-30", "average_time": 59.5, "unique_views": { "country": [{"name": "country 1", "value": 3}, {"name": "country 2", "value": 3}], "referrer": [{"name": "referrer 1", "value": 3}, {"name": "referrer 2", "value": 3}], "social_network": [ {"name": "social_network 1", "value": 3}, {"name": "social_network 2", "value": 3}, ], }, "page_views": { "country": [{"name": "country 1", "value": 4}, {"name": "country 2", "value": 4}], "referrer": [{"name": "referrer 1", "value": 4}, {"name": "referrer 2", "value": 4}], "social_network": [ {"name": "social_network 1", "value": 4}, {"name": "social_network 2", "value": 4}, ], }, "sessions": { "country": [{"name": "country 1", "value": 1}, {"name": "country 2", "value": 1}], "source": [{"name": "source 1", "value": 1}, {"name": "source 2", "value": 1}], }, "publication_id": "1234567890123", "publication_type": "book", "publication_imprint": "imprint", "publication_group": "group", "publication_whole_or_part": "whole", "publication_format": "PDF", "release_date": "2022-06-30", }, { "url": "/base/path/833557", "title": "Standard current never no.", "start_date": "2022-06-01", "end_date": "2022-06-30", "average_time": 49.6, "unique_views": {"country": [], "referrer": [], "social_network": []}, "page_views": {"country": [], "referrer": [], "social_network": []}, "sessions": {"country": [], "source": []}, "publication_id": "1234567891234", "publication_type": "book", "publication_imprint": "imprint", "publication_group": "(none)", "publication_whole_or_part": "part", "publication_format": "HTML", "release_date": "2022-06-30", }, { "url": "/base/path/833557?fbclid=123", "title": "Standard current never no.", "start_date": "2022-06-01", "end_date": "2022-06-30", "average_time": 38.8, "unique_views": { "country": [{"name": "country 2", "value": 2}], "referrer": [{"name": "referrer 2", "value": 2}], "social_network": [{"name": "social_network 2", "value": 2}], }, "page_views": { "country": [{"name": "country 2", "value": 4}], "referrer": [{"name": "referrer 2", "value": 4}], "social_network": [{"name": "social_network 2", "value": 4}], }, "sessions": {"country": [], "source": []}, "publication_id": "1234567891234", "publication_type": "book", "publication_imprint": "imprint", "publication_group": "(none)", "publication_whole_or_part": "part", "publication_format": "HTML", "release_date": "2022-06-30", }, ] self.assertEqual(3, len(actual_list)) self.assertEqual(frozenset(expected_list[0]), frozenset(actual_list[0])) self.assertEqual(frozenset(expected_list[1]), frozenset(actual_list[1])) self.assertEqual(frozenset(expected_list[2]), frozenset(actual_list[2])) # Test that data loaded into BigQuery table_id = bq_table_id( telescope.cloud_workspace.project_id, telescope.data_partner.bq_dataset_id, telescope.data_partner.bq_table_name, ) self.assert_table_integrity(table_id, expected_rows=3) self.assert_table_content( table_id, load_and_parse_json(self.test_table_anu, date_fields=["release_date", "start_date", "end_date"]), primary_key="url", ) # add_dataset_release_task dataset_releases = get_dataset_releases(dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id) self.assertEqual(len(dataset_releases), 0) ti = env.run_task(telescope.add_new_dataset_releases.__name__) self.assertEqual(ti.state, State.SUCCESS) dataset_releases = get_dataset_releases(dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id) self.assertEqual(len(dataset_releases), 1) # Test that all telescope data deleted ti = env.run_task(telescope.cleanup.__name__) self.assertEqual(ti.state, State.SUCCESS) self.assert_cleanup(release.workflow_folder)
[docs]def create_http_mock_sequence(organisation_name: str) -> list: """Create a list of http mock sequences for listing books and getting dimension data :param organisation_name: The organisation name (add custom dimensions for ANU) :return: A list with HttpMockSequence instances """ http_mock_sequence = [] list_books = { "reports": [ { "columnHeader": { "dimensions": ["ga:pagepath", "ga:pageTitle"], "metricHeader": {"metricHeaderEntries": [{"name": "ga:avgTimeOnPage", "type": "TIME"}]}, }, "data": { "rows": [ { "dimensions": ["/base/path/151420", "Anything public program drive north."], "metrics": [{"values": ["59.5"]}], }, { "dimensions": ["/base/path/833557", "Standard current never no."], "metrics": [{"values": ["49.6"]}], }, ], "totals": [{"values": ["109.1"]}], "rowCount": 2, "minimums": [{"values": ["49.6"]}], "maximums": [{"values": ["59.5"]}], "isDataGolden": True, }, "nextPageToken": "200", } ] } # Add custom dimensions from ANU Press if organisation_name == GoogleAnalytics3Telescope.ANU_ORG_NAME: list_books["reports"][0]["columnHeader"]["dimensions"] += [f"ga:dimension{(str(i))}" for i in range(1, 7)] list_books["reports"][0]["data"]["rows"][0]["dimensions"] += [ "1234567890123", "book", "imprint", "group", "whole", "PDF", ] list_books["reports"][0]["data"]["rows"][1]["dimensions"] += [ "1234567891234", "book", "imprint", "(none)", "part", "HTML", ] list_books_next_page = { "reports": [ { "columnHeader": { "dimensions": ["ga:pagepath", "ga:pageTitle"], "metricHeader": {"metricHeaderEntries": [{"name": "ga:avgTimeOnPage", "type": "TIME"}]}, }, "data": { "rows": [ { "dimensions": ["/base/path/833557?fbclid=123", "Standard current never no."], "metrics": [{"values": ["38.8"]}], } ], "totals": [{"values": ["38.8"]}], "rowCount": 1, "minimums": [{"values": ["38.8"]}], "maximums": [{"values": ["38.8"]}], "isDataGolden": True, }, } ] } # Add custom dimensions from ANU Press if organisation_name == GoogleAnalytics3Telescope.ANU_ORG_NAME: list_books_next_page["reports"][0]["columnHeader"]["dimensions"] += [ f"ga:dimension{(str(i))}" for i in range(1, 7) ] list_books_next_page["reports"][0]["data"]["rows"][0]["dimensions"] += [ "1234567891234", "book", "imprint", "(none)", "part", "HTML", ] http_mock_sequence.append(({"status": "200"}, json.dumps(list_books))) http_mock_sequence.append(({"status": "200"}, json.dumps(list_books_next_page))) for dimension in ["country", "referrer", "social_network", "source"]: results = { "reports": [ { "columnHeader": { "dimensions": ["ga:pagePath", "ga:country"], "metricHeader": { "metricHeaderEntries": [ {"name": "ga:uniquePageviews", "type": "INTEGER"}, {"name": "ga:Pageviews", "type": "INTEGER"}, {"name": "ga:sessions", "type": "INTEGER"}, ] }, }, "data": { "rows": [ { "dimensions": ["/base/path/151420", dimension + " 1"], "metrics": [{"values": ["3", "4", "1"]}], }, { "dimensions": ["/base/path/151420", dimension + " 2"], "metrics": [{"values": ["3", "4", "1"]}], }, { "dimensions": ["/base/path/833557", dimension + " 1"], "metrics": [{"values": ["0", "0", "0"]}], # Added a zero case for code coverage }, { "dimensions": ["/base/path/833557?fbclid=123", dimension + " 2"], "metrics": [{"values": ["2", "4", "0"]}], }, ], "totals": [{"values": ["6", "9", "1"]}], "rowCount": 3, "minimums": [{"values": ["1", "3", "0"]}], "maximums": [{"values": ["3", "4", "1"]}], "isDataGolden": True, }, } ] } http_mock_sequence.append(({"status": "200"}, json.dumps(results))) return http_mock_sequence