# Copyright 2020-2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Aniek Roelofs
import json
import os
from unittest.mock import ANY, MagicMock, patch
from urllib.parse import quote
from requests import Response
import httplib2
import httpretty
import pendulum
from airflow.models import Connection
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.utils.state import State
from click.testing import CliRunner
from googleapiclient.discovery import build
from googleapiclient.http import RequestMockBuilder
from oaebu_workflows.config import test_fixtures_folder
from oaebu_workflows.oaebu_partners import partner_from_str
from oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope import (
IrusOapenRelease,
IrusOapenTelescope,
call_cloud_function,
cloud_function_exists,
create_cloud_function,
upload_source_code_to_bucket,
)
from observatory.platform.api import get_dataset_releases
from observatory.platform.observatory_config import CloudWorkspace, Workflow
from observatory.platform.gcs import gcs_blob_name_from_path, gcs_upload_file
from observatory.platform.bigquery import bq_table_id
from observatory.platform.observatory_environment import (
ObservatoryEnvironment,
ObservatoryTestCase,
find_free_port,
random_id,
)
class TestIrusOapenTelescope(ObservatoryTestCase):
    """Tests for the OAPEN IRUS-UK telescope and its cloud function helpers."""

    def __init__(self, *args, **kwargs):
        """Constructor which sets up variables used by tests.

        :param args: arguments.
        :param kwargs: keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self.project_id = os.getenv("TEST_GCP_PROJECT_ID")
        self.data_location = os.getenv("TEST_GCP_DATA_LOCATION")
        # Plain strings: trailing commas previously turned these into 1-tuples, whereas the telescope is
        # constructed with string publisher values elsewhere in this class.
        self.publisher_name_v4 = quote("UCL Press")
        self.publisher_uuid_v5 = "df73bf94-b818-494c-a8dd-6775b0573bc2"
        self.download_path = os.path.join(
            test_fixtures_folder(workflow_module="irus_oapen_telescope"), "download.jsonl.gz"
        )

    def test_dag_structure(self):
        """Test that the OAPEN IRUS-UK DAG has the correct structure."""
        dag = IrusOapenTelescope(
            dag_id="irus_oapen_test_dag",
            cloud_workspace=self.fake_cloud_workspace,
            publisher_name_v4=self.publisher_name_v4,
            publisher_uuid_v5=self.publisher_uuid_v5,
        ).make_dag()
        self.assert_dag_structure(
            {
                "check_dependencies": ["create_cloud_function"],
                "create_cloud_function": ["call_cloud_function"],
                "call_cloud_function": ["transfer"],
                "transfer": ["download_transform"],
                "download_transform": ["upload_transformed"],
                "upload_transformed": ["bq_load"],
                "bq_load": ["add_new_dataset_releases"],
                "add_new_dataset_releases": ["cleanup"],
                "cleanup": [],
            },
            dag,
        )

    def test_dag_load(self):
        """Test that the OAPEN IRUS-UK DAG can be loaded from a DAG bag."""
        env = ObservatoryEnvironment(
            workflows=[
                Workflow(
                    dag_id="irus_oapen_test",
                    name="My Oapen Irus UK Workflow",
                    class_name="oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.IrusOapenTelescope",
                    cloud_workspace=self.fake_cloud_workspace,
                    kwargs=dict(publisher_name_v4=self.publisher_name_v4, publisher_uuid_v5=self.publisher_uuid_v5),
                )
            ],
        )
        with env.create():
            self.assert_dag_load_from_config("irus_oapen_test")

    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.build")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.ServiceAccountCredentials")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.AuthorizedSession.post")
    def test_telescope(self, mock_authorized_session, mock_account_credentials, mock_build):
        """Test the OAPEN IRUS-UK telescope end to end.

        :param mock_authorized_session: mock of AuthorizedSession.post, used when the cloud function is called.
        :param mock_account_credentials: mock of ServiceAccountCredentials.
        :param mock_build: mock of the discovery ``build`` call; returns a service backed by canned responses.
        """
        # Setup Observatory environment
        env = ObservatoryEnvironment(
            self.project_id, self.data_location, api_host="localhost", api_port=find_free_port()
        )

        # Setup Telescope
        execution_date = pendulum.datetime(year=2021, month=2, day=14)
        partner = partner_from_str("irus_oapen")
        partner.bq_dataset_id = env.add_dataset()
        telescope = IrusOapenTelescope(
            dag_id="irus_oapen_test",
            cloud_workspace=env.cloud_workspace,
            publisher_name_v4=self.publisher_name_v4,
            publisher_uuid_v5=self.publisher_uuid_v5,
            data_partner=partner,
        )

        # Fake OAPEN project and bucket. patch.object restores the original class attributes once the test
        # finishes, so these fake values do not leak into other tests.
        for attribute, value in (("OAPEN_PROJECT_ID", env.project_id), ("OAPEN_BUCKET", random_id())):
            patcher = patch.object(IrusOapenTelescope, attribute, value)
            patcher.start()
            self.addCleanup(patcher.stop)

        # Mock the Google Cloud Functions API service
        mock_account_credentials.from_json_keyfile_dict.return_value = ""
        request_builder = RequestMockBuilder(
            {
                "cloudfunctions.projects.locations.functions.get": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/functions/function-2",
                            "serviceConfig": {"uri": "https://oapen-access-stats-kkinbzaigla-ew.a.run.app"},
                        }
                    ),
                ),
                "cloudfunctions.projects.locations.functions.patch": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/operations/d29ya2Z",
                            "done": True,
                            "response": {"message": "response"},
                        }
                    ),
                ),
            },
            check_unexpected=True,
        )
        mock_build.return_value = build(
            "cloudfunctions",
            "v2beta",
            cache_discovery=False,
            static_discovery=False,
            requestBuilder=request_builder,
        )
        dag = telescope.make_dag()

        # Create the Observatory environment and run tests
        with env.create(task_logging=True):
            with env.create_dag_run(dag, execution_date):
                # Use release to check results from tasks
                release = telescope.make_release(
                    run_id=env.dag_run.run_id,
                    data_interval_start=pendulum.parse(str(env.dag_run.data_interval_start)),
                    data_interval_end=pendulum.parse(str(env.dag_run.data_interval_end)),
                )[0]

                # Add airflow connections
                conn = Connection(conn_id=telescope.geoip_license_conn_id, uri="http://email_address:password@")
                env.add_connection(conn)
                conn = Connection(conn_id=telescope.irus_oapen_api_conn_id, uri="mysql://requestor_id:api_key@")
                env.add_connection(conn)
                conn = Connection(conn_id=telescope.irus_oapen_login_conn_id, uri="mysql://user_id:license_key@")
                env.add_connection(conn)

                # Test that all dependencies are specified: no error should be thrown
                ti = env.run_task(telescope.check_dependencies.__name__)
                self.assertEqual(ti.state, State.SUCCESS)

                # Test create cloud function task: no error should be thrown
                ti = env.run_task(telescope.create_cloud_function.__name__)
                self.assertEqual(ti.state, State.SUCCESS)

                # Test call cloud function task: no error should be thrown
                with httpretty.enabled():
                    # mock response of getting publisher uuid
                    url = "https://library.oapen.org/rest/search?query=publisher.name:ucl_press&expand=metadata"
                    httpretty.register_uri(httpretty.GET, url, body='[{"uuid":"df73bf94-b818-494c-a8dd-6775b0573bc2"}]')
                    # mock response of cloud function
                    mock_authorized_session.return_value = MagicMock(
                        spec=Response,
                        status_code=200,
                        json=lambda: {"entries": 100, "unprocessed_publishers": None},
                        reason="unit test",
                    )
                    url = "https://oapen-access-stats-kkinbzaigla-ew.a.run.app"
                    httpretty.register_uri(httpretty.POST, url, body="")
                    ti = env.run_task(telescope.call_cloud_function.__name__)
                    self.assertEqual(ti.state, State.SUCCESS)

                # Test transfer task
                gcs_upload_file(
                    bucket_name=IrusOapenTelescope.OAPEN_BUCKET,
                    blob_name=release.blob_name,
                    file_path=self.download_path,
                )
                ti = env.run_task(telescope.transfer.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                self.assert_blob_integrity(env.download_bucket, release.blob_name, self.download_path)

                # Test download_transform task
                ti = env.run_task(telescope.download_transform.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                self.assertTrue(os.path.exists(release.transform_path))
                self.assert_file_integrity(release.transform_path, "0b111b2f", "gzip_crc")

                # Test that transformed file uploaded
                ti = env.run_task(telescope.upload_transformed.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                self.assert_blob_integrity(
                    env.transform_bucket, gcs_blob_name_from_path(release.transform_path), release.transform_path
                )

                # Test that data loaded into BigQuery
                ti = env.run_task(telescope.bq_load.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                table_id = bq_table_id(
                    project_id=telescope.cloud_workspace.project_id,
                    dataset_id=telescope.data_partner.bq_dataset_id,
                    table_id=telescope.data_partner.bq_table_name,
                )
                self.assert_table_integrity(table_id, 2)

                # Delete oapen bucket
                env._delete_bucket(IrusOapenTelescope.OAPEN_BUCKET)

                # Add_dataset_release_task
                dataset_releases = get_dataset_releases(dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id)
                self.assertEqual(len(dataset_releases), 0)
                ti = env.run_task(telescope.add_new_dataset_releases.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                dataset_releases = get_dataset_releases(dag_id=telescope.dag_id, dataset_id=telescope.api_dataset_id)
                self.assertEqual(len(dataset_releases), 1)

                # Test that all telescope data deleted
                ti = env.run_task(telescope.cleanup.__name__)
                self.assertEqual(ti.state, State.SUCCESS)
                self.assert_cleanup(release.workflow_folder)

    @patch("observatory.platform.airflow.Variable.get")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.upload_source_code_to_bucket")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.cloud_function_exists")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.create_cloud_function")
    def test_telescope_create_cloud_function(
        self, mock_create_function, mock_function_exists, mock_upload, mock_variable_get
    ):
        """Test the create_cloud_function task of IrusOapenTelescope.

        This test has its own name, distinct from the test of the module-level create_cloud_function
        helper below, so that neither definition shadows the other.

        :param mock_create_function: mock of the module-level create_cloud_function helper.
        :param mock_function_exists: mock of cloud_function_exists.
        :param mock_upload: mock of upload_source_code_to_bucket.
        :param mock_variable_get: Mock Airflow Variable 'data'.
        """

        def reset_mocks():
            """Clear the call history of the mocked helpers between scenarios."""
            mock_upload.reset_mock()
            mock_function_exists.reset_mock()
            mock_create_function.reset_mock()

        def assert_mocks(create: bool, update: bool):
            """Check the helper calls made by a single create_cloud_function run.

            :param create: whether a new cloud function is expected to be created.
            :param update: whether an existing cloud function is expected to be updated.
            """
            mock_upload.assert_called_once_with(
                telescope.FUNCTION_SOURCE_URL,
                telescope.OAPEN_PROJECT_ID,
                telescope.OAPEN_BUCKET,
                telescope.FUNCTION_BLOB_NAME,
                release.cloud_function_path,
            )
            mock_function_exists.assert_called_once_with(ANY, full_name)
            if create or update:
                mock_create_function.assert_called_once_with(
                    ANY,
                    location,
                    full_name,
                    telescope.OAPEN_BUCKET,
                    telescope.FUNCTION_BLOB_NAME,
                    telescope.max_active_runs,
                    update,
                )
            else:
                mock_create_function.assert_not_called()

        with CliRunner().isolated_filesystem():
            mock_variable_get.return_value = os.path.join(os.getcwd(), "data")

            # Create release instance
            cloud_workspace = CloudWorkspace(
                project_id="test-project",
                download_bucket="download_bucket",
                transform_bucket="transform_bucket",
                data_location="us",
            )
            telescope = IrusOapenTelescope(
                dag_id="irus_oapen_test",
                cloud_workspace=cloud_workspace,
                publisher_name_v4="publisher",
                publisher_uuid_v5="publisherUUID",
            )
            release = IrusOapenRelease(
                dag_id=telescope.dag_id, run_id=random_id(), partition_date=pendulum.parse("2020-02-01")
            )
            location = f"projects/{telescope.OAPEN_PROJECT_ID}/locations/{telescope.FUNCTION_REGION}"
            full_name = f"{location}/functions/{telescope.FUNCTION_NAME}"
            task_instance = MagicMock()

            # Test when source code upload was unsuccessful
            mock_upload.return_value = False, False
            with self.assertRaises(AirflowException):
                telescope.create_cloud_function(releases=[release], ti=task_instance)

            # Test when cloud function does not exist
            reset_mocks()
            mock_upload.return_value = True, True
            mock_function_exists.return_value = False
            mock_create_function.return_value = True, "response"
            telescope.create_cloud_function(releases=[release], ti=task_instance)
            assert_mocks(create=False if False else True, update=False)

            # Test when cloud function exists, but source code has changed
            reset_mocks()
            mock_upload.return_value = True, True
            mock_function_exists.return_value = True
            mock_create_function.return_value = True, "response"
            # Same call signature as every other scenario (previously max_active_runs was passed as releases)
            telescope.create_cloud_function(releases=[release], ti=task_instance)
            assert_mocks(create=False, update=True)

            # Test when cloud function exists and source code has not changed
            reset_mocks()
            mock_upload.return_value = True, False
            mock_function_exists.return_value = True
            telescope.create_cloud_function(releases=[release], ti=task_instance)
            assert_mocks(create=False, update=False)

            # Test when create cloud function was unsuccessful
            reset_mocks()
            mock_upload.return_value = True, True
            mock_function_exists.return_value = True
            mock_create_function.return_value = False, "response"
            with self.assertRaises(AirflowException):
                telescope.create_cloud_function(releases=[release], ti=task_instance)

    @patch("observatory.platform.airflow.Variable.get")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.BaseHook.get_connection")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.call_cloud_function")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.cloud_function_exists")
    def test_telescope_call_cloud_function(
        self, mock_function_exists, mock_call_function, mock_conn_get, mock_variable_get
    ):
        """Test the call_cloud_function task of IrusOapenTelescope.

        This test has its own name, distinct from the test of the module-level call_cloud_function
        helper below, so that neither definition shadows the other.

        :param mock_function_exists: mock of cloud_function_exists; returns the function URL.
        :param mock_call_function: mock of the module-level call_cloud_function helper.
        :param mock_conn_get: mock of BaseHook.get_connection; serves the connections defined below.
        :param mock_variable_get: Mock Airflow Variable 'data'.
        """
        connections = {
            "geoip_license_key": Connection("geoip_license_key", uri="http://user_id:key@"),
            "irus_oapen_api": Connection("irus_oapen_api", uri="http://requestor_id:api_key@"),
            "irus_oapen_login": Connection("irus_oapen_login", uri="http://email:password@"),
        }
        mock_conn_get.side_effect = lambda x: connections[x]

        # Set URI to function url
        function_url = "https://oapen-access-stats-kkinbzfjal-ew.a.run.app"
        mock_function_exists.return_value = function_url

        with CliRunner().isolated_filesystem():
            mock_variable_get.return_value = os.path.join(os.getcwd(), "data")
            cloud_workspace = CloudWorkspace(
                project_id=self.project_id,
                download_bucket="download_bucket",
                transform_bucket="transform_bucket",
                data_location="us",
            )

            # Test new platform and old platform
            for date in ["2020-03", "2020-04"]:
                # Test for a given publisher name and the 'oapen' publisher
                for publisher_name, publisher_uuid in [("publisher", "uuid1"), ("oapen", "uuid2")]:
                    mock_call_function.reset_mock()
                    telescope = IrusOapenTelescope(
                        dag_id="irus_oapen_test",
                        cloud_workspace=cloud_workspace,
                        publisher_name_v4=publisher_name,
                        publisher_uuid_v5=publisher_uuid,
                    )
                    release = IrusOapenRelease(
                        dag_id=telescope.dag_id, run_id=random_id(), partition_date=pendulum.parse(date + "-01")
                    )
                    telescope.call_cloud_function(releases=[release])

                    # 2020-04 uses the API connection credentials; 2020-03 uses the login connection
                    if date == "2020-04":
                        username, password = "requestor_id", "api_key"
                    else:
                        username, password = "email", "password"
                    mock_call_function.assert_called_once_with(
                        function_url,
                        date,
                        username,
                        password,
                        "key",
                        telescope.publisher_name_v4,
                        telescope.publisher_uuid_v5,
                        telescope.OAPEN_BUCKET,
                        release.blob_name,
                    )

    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.gcs_upload_file")
    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.gcs_create_bucket")
    def test_upload_source_code_to_bucket(self, mock_create_bucket, mock_upload_to_bucket):
        """Test getting source code from oapen irus uk release and uploading to storage bucket.

        Test expected results both when md5 hashes match and when they don't.

        :param mock_create_bucket: mock of gcs_create_bucket.
        :param mock_upload_to_bucket: mock of gcs_upload_file.
        """
        mock_create_bucket.return_value = True
        mock_upload_to_bucket.return_value = True, True
        with CliRunner().isolated_filesystem():
            cloud_function_path = os.path.join(os.getcwd(), "cloud_function.zip")
            success, upload = upload_source_code_to_bucket(
                IrusOapenTelescope.FUNCTION_SOURCE_URL,
                IrusOapenTelescope.OAPEN_PROJECT_ID,
                IrusOapenTelescope.OAPEN_BUCKET,
                IrusOapenTelescope.FUNCTION_BLOB_NAME,
                cloud_function_path,
            )
            self.assertEqual(success, True)
            self.assertEqual(upload, True)

            # Use a mismatching hash only for this call; patch.object restores the real value afterwards so
            # other tests are not affected.
            with patch.object(IrusOapenTelescope, "FUNCTION_MD5_HASH", "different"):
                with self.assertRaises(AirflowException):
                    upload_source_code_to_bucket(
                        IrusOapenTelescope.FUNCTION_SOURCE_URL,
                        IrusOapenTelescope.OAPEN_PROJECT_ID,
                        IrusOapenTelescope.OAPEN_BUCKET,
                        IrusOapenTelescope.FUNCTION_BLOB_NAME,
                        cloud_function_path,
                    )

    def test_cloud_function_exists(self):
        """Test the function that checks whether the cloud function exists."""
        requests = [
            # Cloud function exists
            {
                "response": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/functions/function-2",
                            "serviceConfig": {"uri": "https://oapen-access-stats-kkinbzaigla-ew.a.run.app"},
                        }
                    ),
                ),
                "uri": "https://oapen-access-stats-kkinbzaigla-ew.a.run.app",
            },
            # Cloud function does not exist
            {"response": (httplib2.Response({"status": 404, "reason": "HttpError"}), b"{}"), "uri": None},
        ]
        for request in requests:
            with self.subTest(request=request):
                request_builder = RequestMockBuilder(
                    {
                        "cloudfunctions.projects.locations.functions.get": request["response"],
                    },
                    check_unexpected=True,
                )
                service = build(
                    "cloudfunctions",
                    "v2beta",
                    cache_discovery=False,
                    static_discovery=False,
                    requestBuilder=request_builder,
                )
                full_name = "projects/project-id/locations/us-central1/functions/function-2"
                uri = cloud_function_exists(service, full_name=full_name)
                self.assertEqual(request["uri"], uri)

    def test_create_cloud_function(self):
        """Test the module-level function that creates or updates the cloud function."""
        location = "projects/project-id/locations/us-central1"
        full_name = "projects/project-id/locations/us-central1/functions/function-2"
        source_bucket = "oapen_bucket"
        blob_name = "source_code.zip"
        max_active_runs = 1
        requests = [
            # Creating cloud function, no error
            {
                "method_id": "cloudfunctions.projects.locations.functions.create",
                "response": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/operations/d29ya2Z",
                            "done": True,
                            "response": {"message": "response"},
                        }
                    ),
                ),
                "success": True,
                "msg": {"message": "response"},
            },
            # Updating/patching cloud function, no error
            {
                "method_id": "cloudfunctions.projects.locations.functions.patch",
                "response": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/operations/d29ya2Z",
                            "done": True,
                            "response": {"message": "response"},
                        }
                    ),
                ),
                "success": True,
                "msg": {"message": "response"},
            },
            # Creating cloud function, error
            {
                "method_id": "cloudfunctions.projects.locations.functions.create",
                "response": (
                    None,
                    json.dumps(
                        {
                            "name": "projects/project-id/locations/us-central1/operations/d29ya2Z",
                            "done": True,
                            "error": {"message": "error"},
                        }
                    ),
                ),
                "success": False,
                "msg": {"message": "error"},
            },
        ]
        for request in requests:
            with self.subTest(request=request):
                # The create/patch call returns an unfinished operation; the operations.get poll returns the
                # final state for this scenario.
                request_builder = RequestMockBuilder(
                    {
                        request["method_id"]: (
                            None,
                            json.dumps(
                                {
                                    "name": "projects/project-id/locations/us-central1/operations/d29ya2Z",
                                    "done": False,
                                }
                            ),
                        ),
                        "cloudfunctions.projects.locations.operations.get": request["response"],
                    },
                    check_unexpected=True,
                )
                service = build(
                    "cloudfunctions",
                    "v2beta",
                    cache_discovery=False,
                    static_discovery=False,
                    requestBuilder=request_builder,
                )
                update = request["method_id"] == "cloudfunctions.projects.locations.functions.patch"
                success, msg = create_cloud_function(
                    service, location, full_name, source_bucket, blob_name, max_active_runs, update=update
                )
                self.assertEqual(request["success"], success)
                self.assertDictEqual(request["msg"], msg)

    @patch("oaebu_workflows.irus_oapen_telescope.irus_oapen_telescope.AuthorizedSession.post")
    def test_call_cloud_function(self, mock_authorized_session):
        """Test the module-level function that calls the cloud function.

        :param mock_authorized_session: mock of AuthorizedSession.post; its side effects are consumed in order.
        """
        # Positional arguments shared by every call below: function_url, release_date, username, password,
        # geoip_license_key, publisher_name, publisher_uuid, bucket_name, blob_name.
        call_args = (
            "function_url",
            "2020-01-01",
            "username",
            "password",
            "key",
            "publisher_name",
            "publisher_uuid",
            "bucket",
            "blob",
        )

        # Set responses for consequential calls
        mock_authorized_session.side_effect = [
            MagicMock(
                spec=Response,
                status_code=200,
                reason="unit test",
                json=lambda: {"entries": 100, "unprocessed_publishers": ["publisher1", "publisher2"]},
            ),
            MagicMock(
                spec=Response,
                status_code=200,
                reason="unit test",
                json=lambda: {"entries": 200, "unprocessed_publishers": None},
            ),
            MagicMock(
                spec=Response,
                status_code=200,
                reason="unit test",
                json=lambda: {"entries": 0, "unprocessed_publishers": None},
            ),
            MagicMock(spec=Response, status_code=400, reason="unit test"),
        ]

        # Test when there are unprocessed publishers (first 2 responses from side effect)
        call_cloud_function(*call_args)
        self.assertEqual(2, mock_authorized_session.call_count)

        # Test when entries is 0 (3rd response from side effect)
        with self.assertRaises(AirflowSkipException):
            call_cloud_function(*call_args)

        # Test when response status code is not 200 (last response from side effect)
        with self.assertRaises(AirflowException):
            call_cloud_function(*call_args)