import os
import shutil
import time
from functools import wraps
from tempfile import (
    mkdtemp,
    mkstemp,
)
from unittest.mock import patch
from uuid import uuid4

import pytest
from requests import get

from galaxy.exceptions import ObjectInvalid
from galaxy.objectstore import persist_extra_files_for_dataset
from galaxy.objectstore.azure_blob import AzureBlobObjectStore
from galaxy.objectstore.caching import (
    CacheTarget,
    check_cache,
    InProcessCacheMonitor,
    reset_cache,
)
from galaxy.objectstore.cloud import Cloud
from galaxy.objectstore.examples import get_example
from galaxy.objectstore.pithos import PithosObjectStore
from galaxy.objectstore.s3 import S3ObjectStore
from galaxy.objectstore.s3_boto3 import S3ObjectStore as Boto3ObjectStore
from galaxy.objectstore.unittest_utils import (
    Config as TestConfig,
    DISK_TEST_CONFIG,
    DISK_TEST_CONFIG_YAML,
)
from galaxy.util import (
    directory_hash_id,
    unlink,
)
from galaxy.util.unittest_utils import skip_unless_environ

# Unit testing the cloud and advanced infrastructure object stores is difficult, but
# we can at least stub out initializing and test the configuration of these things from
# XML and dicts.
class UninitializedPithosObjectStore(PithosObjectStore):
    def _initialize(self):
        pass


class UninitializedS3ObjectStore(S3ObjectStore):
    def _initialize(self):
        pass


class UninitializedBoto3ObjectStore(Boto3ObjectStore):
    def _initialize(self):
        pass


class UninitializedAzureBlobObjectStore(AzureBlobObjectStore):
    def _initialize(self):
        pass


class UninitializedCloudObjectStore(Cloud):
    def _initialize(self):
        pass


def patch_object_stores_to_skip_initialize(f):
    """Decorator: swap in object store subclasses whose ``_initialize`` is a no-op.

    Lets configuration parsing be exercised without contacting any real
    cloud/remote backend.
    """

    @wraps(f)
    @patch("galaxy.objectstore.s3.S3ObjectStore", UninitializedS3ObjectStore)
    @patch("galaxy.objectstore.s3_boto3.S3ObjectStore", UninitializedBoto3ObjectStore)
    @patch("galaxy.objectstore.pithos.PithosObjectStore", UninitializedPithosObjectStore)
    @patch("galaxy.objectstore.cloud.Cloud", UninitializedCloudObjectStore)
    @patch("galaxy.objectstore.azure_blob.AzureBlobObjectStore", UninitializedAzureBlobObjectStore)
    def wrapper(*args, **kwd):
        f(*args, **kwd)

    return wrapper


def test_unlink_path():
    with pytest.raises(FileNotFoundError):
        unlink(uuid4().hex)
    unlink(uuid4().hex, ignore_errors=True)
    fd, path = mkstemp()
    os.close(fd)
    assert os.path.exists(path)
    unlink(path)
    assert not os.path.exists(path)


def test_disk_store():
    for config_str in [DISK_TEST_CONFIG, DISK_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            # Test no dataset with id 1 exists.
            absent_dataset = MockDataset(1)
            assert not object_store.exists(absent_dataset)

            # Write empty dataset 2 in second backend, ensure it is empty and
            # exists.
            empty_dataset = MockDataset(2)
            directory.write("", "files1/000/dataset_2.dat")
            assert object_store.exists(empty_dataset)
            assert object_store.empty(empty_dataset)

            # Write non-empty dataset in backend 1, test it is not empty & exists.
            hello_world_dataset = MockDataset(3)
            directory.write("Hello World!", "files1/000/dataset_3.dat")
            assert object_store.exists(hello_world_dataset)
            assert not object_store.empty(hello_world_dataset)

            # Test get_data
            data = object_store.get_data(hello_world_dataset)
            assert data == "Hello World!"
            data = object_store.get_data(hello_world_dataset, start=1, count=6)
            assert data == "ello W"

            # Test Size
            # Test absent and empty datasets yield size of 0.
            assert object_store.size(absent_dataset) == 0
            assert object_store.size(empty_dataset) == 0
            # Elsewise
            assert object_store.size(hello_world_dataset) > 0  # Should this always be the number of bytes?

            # Test percent used (to some degree)
            percent_store_used = object_store.get_store_usage_percent()
            assert percent_store_used > 0.0
            assert percent_store_used < 100.0

            # Test update_from_file test
            output_dataset = MockDataset(4)
            output_real_path = os.path.join(directory.temp_directory, "files1", "000", "dataset_4.dat")
            assert not os.path.exists(output_real_path)
            output_working_path = directory.write("NEW CONTENTS", "job_working_directory1/example_output")
            object_store.update_from_file(output_dataset, file_name=output_working_path, create=True)
            assert os.path.exists(output_real_path)

            # Test delete
            to_delete_dataset = MockDataset(5)
            to_delete_real_path = directory.write("content to be deleted!", "files1/000/dataset_5.dat")
            assert object_store.exists(to_delete_dataset)
            assert object_store.delete(to_delete_dataset)
            assert not object_store.exists(to_delete_dataset)
            assert not os.path.exists(to_delete_real_path)


DISK_TEST_CONFIG_BY_UUID_YAML = """
type: disk
files_dir: "${temp_directory}/files1"
store_by: uuid
extra_dirs:
  - type: temp
    path: "${temp_directory}/tmp1"
  - type: job_work
    path: "${temp_directory}/job_working_directory1"
"""


def test_disk_store_by_uuid():
    for config_str in [DISK_TEST_CONFIG_BY_UUID_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            # Test no dataset with id 1 exists.
            absent_dataset = MockDataset(1)
            assert not object_store.exists(absent_dataset)

            # Write empty dataset 2 in second backend, ensure it is empty and
            # exists.
            empty_dataset = MockDataset(2)
            directory.write("", f"files1/{empty_dataset.rel_path_for_uuid_test()}/dataset_{empty_dataset.uuid}.dat")
            assert object_store.exists(empty_dataset)
            assert object_store.empty(empty_dataset)

            # Write non-empty dataset in backend 1, test it is not empty & exists.
            hello_world_dataset = MockDataset(3)
            directory.write(
                "Hello World!",
                f"files1/{hello_world_dataset.rel_path_for_uuid_test()}/dataset_{hello_world_dataset.uuid}.dat",
            )
            assert object_store.exists(hello_world_dataset)
            assert not object_store.empty(hello_world_dataset)

            # Test get_data
            data = object_store.get_data(hello_world_dataset)
            assert data == "Hello World!"
            data = object_store.get_data(hello_world_dataset, start=1, count=6)
            assert data == "ello W"

            # Test Size
            # Test absent and empty datasets yield size of 0.
            assert object_store.size(absent_dataset) == 0
            assert object_store.size(empty_dataset) == 0
            # Elsewise
            assert object_store.size(hello_world_dataset) > 0  # Should this always be the number of bytes?

            # Test percent used (to some degree)
            percent_store_used = object_store.get_store_usage_percent()
            assert percent_store_used > 0.0
            assert percent_store_used < 100.0

            # Test update_from_file test
            output_dataset = MockDataset(4)
            output_real_path = os.path.join(
                directory.temp_directory,
                "files1",
                output_dataset.rel_path_for_uuid_test(),
                f"dataset_{output_dataset.uuid}.dat",
            )
            assert not os.path.exists(output_real_path)
            output_working_path = directory.write("NEW CONTENTS", "job_working_directory1/example_output")
            object_store.update_from_file(output_dataset, file_name=output_working_path, create=True)
            assert os.path.exists(output_real_path)

            # Test delete
            to_delete_dataset = MockDataset(5)
            to_delete_real_path = directory.write(
                "content to be deleted!",
                f"files1/{to_delete_dataset.rel_path_for_uuid_test()}/dataset_{to_delete_dataset.uuid}.dat",
            )
            assert object_store.exists(to_delete_dataset)
            assert object_store.delete(to_delete_dataset)
            assert not object_store.exists(to_delete_dataset)
            assert not os.path.exists(to_delete_real_path)


def test_disk_store_alt_name_relpath():
    """Test that alt_name cannot be used to access arbitrary paths using a
    relative path
    """
    with TestConfig(DISK_TEST_CONFIG) as (directory, object_store):
        empty_dataset = MockDataset(1)
        directory.write("", "files1/000/dataset_1.dat")
        directory.write("foo", "foo.txt")
        try:
            assert (
                object_store.get_data(empty_dataset, extra_dir="dataset_1_files", alt_name="../../../foo.txt")
                != "foo"
            )
        except ObjectInvalid:
            pass


def test_disk_store_alt_name_abspath():
    """Test that alt_name cannot be used to access arbitrary paths using a
    absolute path
    """
    with TestConfig(DISK_TEST_CONFIG) as (directory, object_store):
        empty_dataset = MockDataset(1)
        directory.write("", "files1/000/dataset_1.dat")
        absfoo = os.path.abspath(os.path.join(directory.temp_directory, "foo.txt"))
        with open(absfoo, "w") as f:
            f.write("foo")
        try:
            assert object_store.get_data(empty_dataset, extra_dir="dataset_1_files", alt_name=absfoo) != "foo"
        except ObjectInvalid:
            pass


HIERARCHICAL_TEST_CONFIG = get_example("hierarchical_simple.xml")
HIERARCHICAL_TEST_CONFIG_YAML = get_example("hierarchical_simple.yml")


def test_hierarchical_store():
    for config_str in [HIERARCHICAL_TEST_CONFIG, HIERARCHICAL_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            # Test no dataset with id 1 exists.
            assert not object_store.exists(MockDataset(1))

            # Write empty dataset 2 in second backend, ensure it is empty and
            # exists.
            directory.write("", "files2/000/dataset_2.dat")
            assert object_store.exists(MockDataset(2))
            assert object_store.empty(MockDataset(2))

            # Write non-empty dataset in backend 1, test it is not empty & exists.
            directory.write("Hello World!", "files1/000/dataset_3.dat")
            assert object_store.exists(MockDataset(3))
            assert not object_store.empty(MockDataset(3))

            # check and description routed correctly
            files1_desc = object_store.get_concrete_store_description_markdown(MockDataset(3))
            files1_name = object_store.get_concrete_store_name(MockDataset(3))
            files2_desc = object_store.get_concrete_store_description_markdown(MockDataset(2))
            files2_name = object_store.get_concrete_store_name(MockDataset(2))
            assert "fancy" in files1_desc
            assert "Newer Cool" in files1_name
            assert "older" in files2_desc
            assert "Legacy" in files2_name

            # Assert creation always happens in first backend.
            for i in range(100):
                dataset = MockDataset(100 + i)
                object_store.create(dataset)
                assert object_store.get_filename(dataset).find("files1") > 0

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["backends", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "hierarchical")


def test_concrete_name_without_objectstore_id():
    for config_str in [HIERARCHICAL_TEST_CONFIG, HIERARCHICAL_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            files1_desc = object_store.get_concrete_store_description_markdown(MockDataset(3))
            files1_name = object_store.get_concrete_store_name(MockDataset(3))
            assert files1_desc is None
            assert files1_name is None


# NOTE(review): these two config strings are whitespace-only in this copy of the
# file — the backend definitions the tests below assert against appear to have
# been lost; confirm against the canonical source.
MIXED_STORE_BY_DISTRIBUTED_TEST_CONFIG = """ """

MIXED_STORE_BY_HIERARCHICAL_TEST_CONFIG = """ """


def test_mixed_store_by():
    with TestConfig(MIXED_STORE_BY_DISTRIBUTED_TEST_CONFIG) as (directory, object_store):
        as_dict = object_store.to_dict()
        assert as_dict["backends"][0]["store_by"] == "id"
        assert as_dict["backends"][1]["store_by"] == "uuid"
    with TestConfig(MIXED_STORE_BY_HIERARCHICAL_TEST_CONFIG) as (directory, object_store):
        as_dict = object_store.to_dict()
        assert as_dict["backends"][0]["store_by"] == "id"
        assert as_dict["backends"][1]["store_by"] == "uuid"


def test_mixed_private():
    # Distributed object store can combine private and non-private concrete objectstores
    with TestConfig(MIXED_STORE_BY_DISTRIBUTED_TEST_CONFIG) as (directory, object_store):
        ids = object_store.object_store_ids()
        assert len(ids) == 2
        ids = object_store.object_store_ids(private=True)
        assert len(ids) == 1
        assert ids[0] == "files2"
        ids = object_store.object_store_ids(private=False)
        assert len(ids) == 1
        assert ids[0] == "files1"
        as_dict = object_store.to_dict()
        assert not as_dict["backends"][0]["private"]
        assert as_dict["backends"][1]["private"]
    with TestConfig(MIXED_STORE_BY_HIERARCHICAL_TEST_CONFIG) as (directory, object_store):
        as_dict = object_store.to_dict()
        assert as_dict["backends"][0]["private"]
        assert as_dict["backends"][1]["private"]
        assert object_store.private
        assert as_dict["private"] is True


def test_empty_cache_targets_for_disk_nested_stores():
    with TestConfig(MIXED_STORE_BY_DISTRIBUTED_TEST_CONFIG) as (directory, object_store):
        assert len(object_store.cache_targets()) == 0
    with TestConfig(MIXED_STORE_BY_HIERARCHICAL_TEST_CONFIG) as (directory, object_store):
        assert len(object_store.cache_targets()) == 0


BADGES_TEST_1_CONFIG_XML = get_example("disk_badges.xml")
BADGES_TEST_1_CONFIG_YAML = get_example("disk_badges.yml")


def test_badges_parsing():
    for config_str in [BADGES_TEST_1_CONFIG_XML, BADGES_TEST_1_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            badges = object_store.to_dict()["badges"]
            assert len(badges) == 6
            badge_1 = badges[0]
            assert badge_1["type"] == "short_term"
            assert badge_1["message"] is None
            badge_2 = badges[1]
            assert badge_2["type"] == "faster"
            assert badge_2["message"] == "Fast interconnects."
            badge_3 = badges[2]
            assert badge_3["type"] == "less_stable"
            assert badge_3["message"] is None
            badge_4 = badges[3]
            assert badge_4["type"] == "more_secure"
            assert badge_4["message"] is None


BADGES_TEST_CONFLICTS_1_CONFIG_YAML = """
type: disk
files_dir: "${temp_directory}/files1"
badges:
  - type: slower
  - type: faster
"""

BADGES_TEST_CONFLICTS_2_CONFIG_YAML = """
type: disk
files_dir: "${temp_directory}/files1"
badges:
  - type: more_secure
  - type: less_secure
"""


def test_badges_parsing_conflicts():
    # Conflicting badge pairs must be rejected at configuration time.
    for config_str in [BADGES_TEST_CONFLICTS_1_CONFIG_YAML]:
        exception_raised = False
        try:
            with TestConfig(config_str) as (directory, object_store):
                pass
        except Exception as e:
            assert "faster" in str(e)
            assert "slower" in str(e)
            exception_raised = True
        assert exception_raised
    for config_str in [BADGES_TEST_CONFLICTS_2_CONFIG_YAML]:
        exception_raised = False
        try:
            with TestConfig(config_str) as (directory, object_store):
                pass
        except Exception as e:
            assert "more_secure" in str(e)
            assert "less_secure" in str(e)
            exception_raised = True
        assert exception_raised


DISTRIBUTED_TEST_CONFIG = get_example("distributed_disk.xml")
DISTRIBUTED_TEST_CONFIG_YAML = get_example("distributed_disk.yml")


def test_distributed_store():
    for config_str in [DISTRIBUTED_TEST_CONFIG, DISTRIBUTED_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            persisted_ids = []
            for i in range(100):
                dataset = MockDataset(100 + i)
                object_store.create(dataset)
                persisted_ids.append(dataset.object_store_id)

            # Test distributes datasets between backends according to weights
            backend_1_count = sum(1 for v in persisted_ids if v == "files1")
            backend_2_count = sum(1 for v in persisted_ids if v == "files2")

            assert backend_1_count > 0
            assert backend_2_count > 0
            assert backend_1_count > backend_2_count

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["backends", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "distributed")

            backends = as_dict["backends"]
            assert len(backends)
            assert backends[0]["quota"]["source"] == "1files"
            assert backends[1]["quota"]["source"] == "2files"

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2

            device_source_map = object_store.get_device_source_map()
            assert device_source_map
            assert device_source_map.get_device_id("files1") == "primary_disk"
            assert device_source_map.get_device_id("files2") == "primary_disk"


def test_distributed_store_empty_cache_targets():
    for config_str in [DISTRIBUTED_TEST_CONFIG, DISTRIBUTED_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            assert len(object_store.cache_targets()) == 0


@patch_object_stores_to_skip_initialize
def test_distributed_store_with_cache_targets():
    for config_str in [get_example("distributed_s3.yml")]:
        with TestConfig(config_str) as (_, object_store):
            assert len(object_store.cache_targets()) == 2


# NOTE(review): whitespace-only in this copy — the hierarchical config with
# mismatched quota sources appears to have been lost; confirm against source.
HIERARCHICAL_MUST_HAVE_UNIFIED_QUOTA_SOURCE = """ """


def test_hiercachical_backend_must_share_quota_source():
    # (name typo "hiercachical" kept — renaming would change the collected test id)
    the_exception = None
    for config_str in [HIERARCHICAL_MUST_HAVE_UNIFIED_QUOTA_SOURCE]:
        try:
            with TestConfig(config_str) as (directory, object_store):
                pass
        except Exception as e:
            the_exception = e
    assert the_exception is not None


PITHOS_TEST_CONFIG = get_example("pithos_simple.xml")
PITHOS_TEST_CONFIG_YAML = get_example("pithos_simple.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_pithos():
    for config_str in [PITHOS_TEST_CONFIG, PITHOS_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            configured_config_dict = object_store.config_dict
            _assert_has_keys(configured_config_dict, ["auth", "container", "extra_dirs"])

            auth_dict = configured_config_dict["auth"]
            _assert_key_has_value(auth_dict, "url", "http://example.org/")
            _assert_key_has_value(auth_dict, "token", "extoken123")

            container_dict = configured_config_dict["container"]
            _assert_key_has_value(container_dict, "name", "foo")
            _assert_key_has_value(container_dict, "project", "cow")

            assert object_store.extra_dirs["job_work"] == "database/working_pithos"
            assert object_store.extra_dirs["temp"] == "database/tmp_pithos"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["auth", "container", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "pithos")

            auth_dict = as_dict["auth"]
            _assert_key_has_value(auth_dict, "url", "http://example.org/")
            _assert_key_has_value(auth_dict, "token", "extoken123")

            container_dict = as_dict["container"]
            _assert_key_has_value(container_dict, "name", "foo")
            _assert_key_has_value(container_dict, "project", "cow")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


S3_TEST_CONFIG = get_example("s3_simple.xml")
S3_TEST_CONFIG_YAML = get_example("s3_simple.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_s3():
    for config_str in [S3_TEST_CONFIG, S3_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.private
            assert object_store.access_key == "access_moo"
            assert object_store.secret_key == "secret_cow"

            assert object_store.bucket == "unique_bucket_name_all_lowercase"
            assert object_store.use_rr is False

            assert object_store.host is None
            assert object_store.port == 6000
            assert object_store.multipart is True
            assert object_store.is_secure is True
            assert object_store.conn_path == "/"

            cache_target = object_store.cache_target
            assert cache_target.size == 1000
            assert cache_target.path == "database/object_store_cache"
            assert object_store.extra_dirs["job_work"] == "database/job_working_directory_s3"
            assert object_store.extra_dirs["temp"] == "database/tmp_s3"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["auth", "bucket", "connection", "cache", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "aws_s3")

            auth_dict = as_dict["auth"]
            bucket_dict = as_dict["bucket"]
            connection_dict = as_dict["connection"]
            cache_dict = as_dict["cache"]

            _assert_key_has_value(auth_dict, "access_key", "access_moo")
            _assert_key_has_value(auth_dict, "secret_key", "secret_cow")

            _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
            _assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)

            _assert_key_has_value(connection_dict, "host", None)
            _assert_key_has_value(connection_dict, "port", 6000)
            _assert_key_has_value(connection_dict, "multipart", True)
            _assert_key_has_value(connection_dict, "is_secure", True)

            _assert_key_has_value(cache_dict, "size", 1000)
            _assert_key_has_value(cache_dict, "path", "database/object_store_cache")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


S3_DEFAULT_CACHE_TEST_CONFIG = get_example("s3_global_cache.xml")
S3_DEFAULT_CACHE_TEST_CONFIG_YAML = get_example("s3_global_cache.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_s3_with_default_cache():
    for config_str in [S3_DEFAULT_CACHE_TEST_CONFIG, S3_DEFAULT_CACHE_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.cache_size == -1
            assert object_store.staging_path == directory.global_config.object_store_cache_path
@patch_object_stores_to_skip_initialize
def test_config_parse_boto3():
    for config_str in [get_example("boto3_simple.xml"), get_example("boto3_simple.yml")]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.access_key == "access_moo"
            assert object_store.secret_key == "secret_cow"

            assert object_store.bucket == "unique_bucket_name_all_lowercase"

            # defaults to AWS
            assert object_store.endpoint_url is None

            cache_target = object_store.cache_target
            assert cache_target.size == 1000
            assert cache_target.path == "database/object_store_cache"
            assert object_store.extra_dirs["job_work"] == "database/job_working_directory_s3"
            assert object_store.extra_dirs["temp"] == "database/tmp_s3"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["auth", "bucket", "connection", "cache", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "boto3")

            auth_dict = as_dict["auth"]
            bucket_dict = as_dict["bucket"]
            cache_dict = as_dict["cache"]

            _assert_key_has_value(auth_dict, "access_key", "access_moo")
            _assert_key_has_value(auth_dict, "secret_key", "secret_cow")

            _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")

            _assert_key_has_value(cache_dict, "size", 1000)
            _assert_key_has_value(cache_dict, "path", "database/object_store_cache")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


@patch_object_stores_to_skip_initialize
def test_config_parse_boto3_custom_connection():
    for config_str in [get_example("boto3_custom_connection.xml"), get_example("boto3_custom_connection.yml")]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.endpoint_url == "https://s3.example.org/"
            assert object_store.region == "the_example_region"


@patch_object_stores_to_skip_initialize
def test_config_parse_boto3_merged_transfer_options():
    for config_str in [
        get_example("boto3_merged_transfer_options.xml"),
        get_example("boto3_merged_transfer_options.yml"),
    ]:
        with TestConfig(config_str) as (directory, object_store):
            as_dict = object_store.to_dict()
            transfer_dict = as_dict["transfer"]
            assert transfer_dict["multipart_threshold"] == 13
            assert transfer_dict["max_concurrency"] == 13
            assert transfer_dict["multipart_chunksize"] == 13
            assert transfer_dict["num_download_attempts"] == 13
            assert transfer_dict["max_io_queue"] == 13
            assert transfer_dict["io_chunksize"] == 13
            assert transfer_dict["use_threads"] is False
            assert transfer_dict["max_bandwidth"] == 13

            # a merged config applies identically to uploads and downloads
            for transfer_type in ["upload", "download"]:
                transfer_config = object_store._transfer_config(transfer_type)
                assert transfer_config.multipart_threshold == 13
                assert transfer_config.max_concurrency == 13
                assert transfer_config.multipart_chunksize == 13
                assert transfer_config.num_download_attempts == 13
                assert transfer_config.max_io_queue == 13
                assert transfer_config.io_chunksize == 13
                assert transfer_config.use_threads is False
                assert transfer_config.max_bandwidth == 13


@patch_object_stores_to_skip_initialize
def test_config_parse_boto3_separated_transfer_options():
    for config_str in [
        get_example("boto3_separated_transfer_options.xml"),
        get_example("boto3_separated_transfer_options.yml"),
    ]:
        with TestConfig(config_str) as (directory, object_store):
            transfer_config = object_store._transfer_config("upload")
            assert transfer_config.multipart_threshold == 13
            assert transfer_config.max_concurrency == 13
            assert transfer_config.multipart_chunksize == 13
            assert transfer_config.num_download_attempts == 13
            assert transfer_config.max_io_queue == 13
            assert transfer_config.io_chunksize == 13
            assert transfer_config.use_threads is False
            assert transfer_config.max_bandwidth == 13

            transfer_config = object_store._transfer_config("download")
            assert transfer_config.multipart_threshold == 14
            assert transfer_config.max_concurrency == 14
            assert transfer_config.multipart_chunksize == 14
            assert transfer_config.num_download_attempts == 14
            assert transfer_config.max_io_queue == 14
            assert transfer_config.io_chunksize == 14
            assert transfer_config.use_threads is True
            assert transfer_config.max_bandwidth == 14


CLOUD_AWS_TEST_CONFIG = get_example("cloud_aws_simple.xml")
CLOUD_AWS_TEST_CONFIG_YAML = get_example("cloud_aws_simple.yml")
CLOUD_AZURE_TEST_CONFIG = get_example("cloud_azure_simple.xml")
CLOUD_AZURE_TEST_CONFIG_YAML = get_example("cloud_azure_simple.yml")
CLOUD_GOOGLE_TEST_CONFIG = get_example("cloud_gcp_simple.xml")
CLOUD_GOOGLE_TEST_CONFIG_YAML = get_example("cloud_gcp_simple.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_cloud():
    for config_str in [
        CLOUD_AWS_TEST_CONFIG,
        CLOUD_AWS_TEST_CONFIG_YAML,
        CLOUD_AZURE_TEST_CONFIG,
        CLOUD_AZURE_TEST_CONFIG_YAML,
        CLOUD_GOOGLE_TEST_CONFIG,
        CLOUD_GOOGLE_TEST_CONFIG_YAML,
    ]:
        if "google" in config_str:
            # the GCP config needs a credentials file on disk to point at
            tmpdir = mkdtemp()
            if not os.path.exists(tmpdir):
                os.makedirs(tmpdir)
            path = os.path.join(tmpdir, "gcp.config")
            # was open(path, "w").write(...) which leaked the file handle
            with open(path, "w") as gcp_config:
                gcp_config.write("some_gcp_config")
            config_str = config_str.replace("gcp.config", path)
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
            assert object_store.use_rr is False

            cache_target = object_store.cache_target
            assert cache_target.size == 1000.0
            assert cache_target.path == "database/object_store_cache"
            assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
            assert object_store.extra_dirs["temp"] == "database/tmp_cloud"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "cloud")

            auth_dict = as_dict["auth"]
            bucket_dict = as_dict["bucket"]
            cache_dict = as_dict["cache"]

            provider = as_dict["provider"]
            if provider == "aws":
                _assert_key_has_value(auth_dict, "access_key", "access_moo")
                _assert_key_has_value(auth_dict, "secret_key", "secret_cow")
            elif provider == "azure":
                _assert_key_has_value(auth_dict, "subscription_id", "a_sub_id")
                _assert_key_has_value(auth_dict, "client_id", "and_a_client_id")
                _assert_key_has_value(auth_dict, "secret", "and_a_secret_key")
                _assert_key_has_value(auth_dict, "tenant", "and_some_tenant_info")
            elif provider == "google":
                _assert_key_has_value(auth_dict, "credentials_file", path)

            _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
            _assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)

            _assert_key_has_value(cache_dict, "size", 1000.0)
            _assert_key_has_value(cache_dict, "path", "database/object_store_cache")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


CLOUD_AWS_NO_AUTH_TEST_CONFIG = get_example("cloud_aws_no_auth.xml")


@patch_object_stores_to_skip_initialize
def test_config_parse_cloud_noauth_for_aws():
    for config_str in [CLOUD_AWS_NO_AUTH_TEST_CONFIG]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
            assert object_store.use_rr is False

            cache_target = object_store.cache_target
            assert cache_target.size == 1000.0
            assert cache_target.path == "database/object_store_cache"
            assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
            assert object_store.extra_dirs["temp"] == "database/tmp_cloud"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "cloud")

            auth_dict = as_dict["auth"]
            bucket_dict = as_dict["bucket"]
            cache_dict = as_dict["cache"]

            provider = as_dict["provider"]
            assert provider == "aws"
            _assert_key_has_value(auth_dict, "access_key", None)
            _assert_key_has_value(auth_dict, "secret_key", None)

            _assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
            _assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)

            _assert_key_has_value(cache_dict, "size", 1000.0)
            _assert_key_has_value(cache_dict, "path", "database/object_store_cache")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


CLOUD_AWS_NO_CACHE_TEST_CONFIG = get_example("cloud_aws_default_cache.xml")
@patch_object_stores_to_skip_initialize
def test_config_parse_cloud_no_cache_for_aws():
    """An AWS cloud config without a cache block falls back to global cache defaults."""
    for config_str in [CLOUD_AWS_NO_CACHE_TEST_CONFIG]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.staging_path == directory.global_config.object_store_cache_path
            # -1 is the sentinel for "unbounded" cache size
            assert object_store.cache_size == -1


AZURE_BLOB_TEST_CONFIG = get_example("azure_simple.xml")
AZURE_BLOB_TEST_CONFIG_YAML = get_example("azure_simple.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_azure():
    """Parse the simple Azure blob store config (XML and YAML) and check to_dict round-trip."""
    for config_str in [AZURE_BLOB_TEST_CONFIG, AZURE_BLOB_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.account_name == "azureact"
            assert object_store.account_key == "password123"
            assert object_store.container_name == "unique_container_name"
            cache_target = object_store.cache_target
            assert cache_target.size == 100
            assert cache_target.path == "database/object_store_cache"
            assert object_store.extra_dirs["job_work"] == "database/job_working_directory_azure"
            assert object_store.extra_dirs["temp"] == "database/tmp_azure"

            as_dict = object_store.to_dict()
            _assert_has_keys(as_dict, ["auth", "container", "cache", "extra_dirs", "type"])
            _assert_key_has_value(as_dict, "type", "azure_blob")

            auth_dict = as_dict["auth"]
            container_dict = as_dict["container"]
            cache_dict = as_dict["cache"]

            _assert_key_has_value(auth_dict, "account_name", "azureact")
            _assert_key_has_value(auth_dict, "account_key", "password123")
            _assert_key_has_value(container_dict, "name", "unique_container_name")
            _assert_key_has_value(cache_dict, "size", 100)
            _assert_key_has_value(cache_dict, "path", "database/object_store_cache")

            extra_dirs = as_dict["extra_dirs"]
            assert len(extra_dirs) == 2


@patch_object_stores_to_skip_initialize
def test_config_parse_azure_transfer():
    """Transfer tuning options from the Azure config should survive to_dict()."""
    for config_str in [get_example("azure_transfer.xml"), get_example("azure_transfer.yml")]:
        with TestConfig(config_str) as (directory, object_store):
            as_dict = object_store.to_dict()["transfer"]
            assert as_dict["download_max_concurrency"] == 1
            assert as_dict["upload_max_concurrency"] == 2
            assert as_dict["max_single_put_size"] == 10
            assert as_dict["max_single_get_size"] == 20
            assert as_dict["max_block_size"] == 3


def test_cache_monitor_thread(tmp_path):
    """The in-process cache monitor should evict files from an (effectively) zero-size cache."""
    cache_dir = tmp_path
    path = cache_dir / "a_file_0"
    path.write_text("this is an example file")
    # tiny limit fraction -> any file exceeds the allowed cache usage
    cache_target = CacheTarget(cache_dir, 1, 0.000000001)
    monitor = InProcessCacheMonitor(cache_target, 30, 0)
    path_cleaned = False
    # poll for up to ~10s; the monitor thread cleans asynchronously
    for _ in range(100):
        time.sleep(0.1)
        path_cleaned = not path.exists()
        if path_cleaned:
            break
    monitor.shutdown()
    assert path_cleaned

    # just verify things cleaned up okay also
    assert not monitor.cache_monitor_thread.is_alive()
    assert monitor.stop_cache_monitor_event.is_set()


def test_check_cache_sanity(tmp_path):
    # sanity check the caching code - create a 1 gig cache with a single file.
    # when the cache is allowed to be 20% full the file will exist but when the
    # cache is only allowed to be a very small fraction full headed toward zero
    # the file will be deleted
    cache_dir = tmp_path
    path = cache_dir / "a_file_0"
    path.write_text("this is an example file")
    big_cache_target = CacheTarget(cache_dir, 1, 0.2)
    check_cache(big_cache_target)
    assert path.exists()
    small_cache_target = CacheTarget(cache_dir, 1, 0.000000001)
    check_cache(small_cache_target)
    assert not path.exists()


def test_fits_in_cache_check(tmp_path):
    """fits_in_cache honors the size limit; a negative cache size disables the check."""
    cache_dir = tmp_path
    big_cache_target = CacheTarget(cache_dir, 1, 0.2)
    # 1 GB cache limited to 20%: 0.3 GB doesn't fit, 0.1 GB does
    assert not big_cache_target.fits_in_cache(int(1024 * 1024 * 1024 * 0.3))
    assert big_cache_target.fits_in_cache(int(1024 * 1024 * 1024 * 0.1))

    # size of -1 means "no limit" - everything fits
    noop_cache_target = CacheTarget(cache_dir, -1, 0.2)
    assert noop_cache_target.fits_in_cache(1024 * 1024 * 1024 * 100)


AZURE_BLOB_NO_CACHE_TEST_CONFIG = get_example("azure_default_cache.xml")
AZURE_BLOB_NO_CACHE_TEST_CONFIG_YAML = get_example("azure_default_cache.yml")


@patch_object_stores_to_skip_initialize
def test_config_parse_azure_no_cache():
    """An Azure config without a cache block falls back to global cache defaults."""
    for config_str in [AZURE_BLOB_NO_CACHE_TEST_CONFIG, AZURE_BLOB_NO_CACHE_TEST_CONFIG_YAML]:
        with TestConfig(config_str) as (directory, object_store):
            assert object_store.cache_size == -1
            assert object_store.staging_path == directory.global_config.object_store_cache_path


def verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=True):
    """Exercise the full object store contract for caching stores (exists/create/
    update_from_file/get_data/get_filename/size/delete/extra files/get_object_url),
    both with a warm cache and after reset_cache() empties it.

    Set check_get_url=False for stores whose generated URLs aren't fetchable in
    the test environment.
    """
    # Test no dataset with id 1 exists.
    absent_dataset = MockDataset(1)
    assert not object_store.exists(absent_dataset)

    # Write empty dataset 2 in second backend, ensure it is empty and
    # exists.
    empty_dataset = MockDataset(2)
    object_store.create(empty_dataset)
    assert object_store.exists(empty_dataset)
    assert object_store.empty(empty_dataset)

    # Write non-empty dataset in backend 1, test it is not empty & exists.
    # with cache...
    hello_world_dataset = MockDataset(3)
    hello_path = tmp_path / "hello.txt"
    hello_path.write_text("Hello World!")
    object_store.update_from_file(hello_world_dataset, file_name=hello_path, create=True)
    assert object_store.exists(hello_world_dataset)
    assert not object_store.empty(hello_world_dataset)

    # Test get_data
    data = object_store.get_data(hello_world_dataset)
    assert data == "Hello World!"

    data = object_store.get_data(hello_world_dataset, start=1, count=6)
    assert data == "ello W"

    path = object_store.get_filename(hello_world_dataset)
    assert open(path).read() == "Hello World!"

    # Write non-empty dataset in backend 1, test it is not empty & exists.
    # without cache...
    hello_world_dataset_2 = MockDataset(10)
    object_store.update_from_file(hello_world_dataset_2, file_name=hello_path, create=True)
    reset_cache(object_store.cache_target)
    assert object_store.exists(hello_world_dataset_2)
    reset_cache(object_store.cache_target)
    assert not object_store.empty(hello_world_dataset_2)
    reset_cache(object_store.cache_target)
    data = object_store.get_data(hello_world_dataset_2)
    assert data == "Hello World!"
    reset_cache(object_store.cache_target)
    data = object_store.get_data(hello_world_dataset_2, start=1, count=6)
    assert data == "ello W"
    reset_cache(object_store.cache_target)
    path = object_store.get_filename(hello_world_dataset_2)
    assert open(path).read() == "Hello World!"

    # Test Size

    # Test absent and empty datasets yield size of 0.
    assert object_store.size(absent_dataset) == 0
    assert object_store.size(empty_dataset) == 0

    # Elsewise
    assert object_store.size(hello_world_dataset) == 12

    # Test percent used (to some degree)
    percent_store_used = object_store.get_store_usage_percent()
    assert percent_store_used >= 0.0
    assert percent_store_used < 100.0

    # Test delete
    to_delete_dataset = MockDataset(5)
    object_store.create(to_delete_dataset)
    assert object_store.exists(to_delete_dataset)
    assert object_store.delete(to_delete_dataset)
    assert not object_store.exists(to_delete_dataset)

    # Test delete no cache
    to_delete_dataset = MockDataset(5)
    object_store.create(to_delete_dataset)
    assert object_store.exists(to_delete_dataset)
    reset_cache(object_store.cache_target)
    assert object_store.delete(to_delete_dataset)
    reset_cache(object_store.cache_target)
    assert not object_store.exists(to_delete_dataset)

    # Test bigger file to force multi-process.
    big_file_dataset = MockDataset(6)
    size = 1024
    path = tmp_path / "big_file.bytes"
    with path.open("wb") as f:
        f.write(os.urandom(size))
    # BUGFIX: previously passed hello_path here, so the randomly generated
    # big file was never uploaded and this branch exercised nothing new.
    object_store.update_from_file(big_file_dataset, file_name=path, create=True)

    extra_files_dataset = MockDataset(7)
    object_store.create(extra_files_dataset)
    extra = tmp_path / "extra"
    extra.mkdir()
    extra_file = extra / "new_value.txt"
    extra_file.write_text("My new value")
    persist_extra_files_for_dataset(
        object_store,
        extra,
        extra_files_dataset,  # type: ignore[arg-type,unused-ignore]
        extra_files_dataset._extra_files_rel_path,
    )

    # The following checks used to exhibit different behavior depending
    # on how the cache was cleaned - removing the whole directory vs
    # just cleaning up files the way Galaxy's internal caching works with
    # reset_cache. So we test both here.

    # hard reset
    shutil.rmtree(object_store.cache_target.path)
    os.makedirs(object_store.cache_target.path)

    extra_path = _extra_file_path(object_store, extra_files_dataset)
    assert os.path.exists(extra_path)
    expected_extra_file = os.path.join(extra_path, "new_value.txt")
    assert os.path.exists(expected_extra_file)
    assert open(expected_extra_file).read() == "My new value"

    # Redo the above test with Galaxy's reset_cache which leaves empty directories
    # around.
    reset_cache(object_store.cache_target)
    extra_path = _extra_file_path(object_store, extra_files_dataset)
    assert os.path.exists(extra_path)
    expected_extra_file = os.path.join(extra_path, "new_value.txt")
    assert os.path.exists(expected_extra_file)
    assert open(expected_extra_file).read() == "My new value"

    # Test get_object_url returns a read-only URL
    url = object_store.get_object_url(hello_world_dataset)
    if check_get_url:
        response = get(url)
        response.raise_for_status()
        assert response.text == "Hello World!"


def _extra_file_path(object_store, dataset):
    # invoke the magic calls the model layer would invoke here...
    if object_store.exists(dataset, dir_only=True, extra_dir=dataset._extra_files_rel_path):
        return object_store.get_filename(dataset, dir_only=True, extra_dir=dataset._extra_files_rel_path)
    return object_store.construct_path(dataset, dir_only=True, extra_dir=dataset._extra_files_rel_path, in_cache=True)


def verify_object_store_functionality(tmp_path, object_store, check_get_url=True):
    """Exercise the basic (non-caching) object store contract: exists/create/
    update_from_file/get_data/get_filename/size/delete/get_object_url."""
    # Test no dataset with id 1 exists.
    absent_dataset = MockDataset(1)
    assert not object_store.exists(absent_dataset)

    # Write empty dataset 2 in second backend, ensure it is empty and
    # exists.
    empty_dataset = MockDataset(2)
    object_store.create(empty_dataset)
    assert object_store.exists(empty_dataset)
    assert object_store.empty(empty_dataset)

    # Write non-empty dataset in backend 1, test it is not empty & exists.
    # with cache...
    hello_world_dataset = MockDataset(3)
    hello_path = tmp_path / "hello.txt"
    hello_path.write_text("Hello World!")
    object_store.update_from_file(hello_world_dataset, file_name=hello_path, create=True)
    assert object_store.exists(hello_world_dataset)
    assert not object_store.empty(hello_world_dataset)

    # Test get_data
    data = object_store.get_data(hello_world_dataset)
    assert data == "Hello World!"

    data = object_store.get_data(hello_world_dataset, start=1, count=6)
    assert data == "ello W"

    path = object_store.get_filename(hello_world_dataset)
    assert open(path).read() == "Hello World!"

    # Test Size

    # Test absent and empty datasets yield size of 0.
    assert object_store.size(absent_dataset) == 0
    assert object_store.size(empty_dataset) == 0

    # Elsewise
    assert object_store.size(hello_world_dataset) == 12

    # Test delete
    to_delete_dataset = MockDataset(5)
    object_store.create(to_delete_dataset)
    assert object_store.exists(to_delete_dataset)
    assert object_store.delete(to_delete_dataset)
    assert not object_store.exists(to_delete_dataset)

    # Test get_object_url returns a read-only URL
    url = object_store.get_object_url(hello_world_dataset)
    if check_get_url:
        response = get(url)
        response.raise_for_status()
        assert response.text == "Hello World!"


def integration_test_config(example_filename: str):
    """Build a TestConfig from a packaged example, injecting GALAXY_TEST_* env values."""
    return TestConfig(get_example(example_filename), inject_galaxy_test_env=True)


@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME")
def test_real_azure_blob_store(tmp_path):
    with integration_test_config("azure_integration_test.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_URL")
def test_real_azure_blob_store_with_account_url(tmp_path):
    with integration_test_config("azure_integration_test_with_account_url.yml") as (
        _,
        object_store,
    ):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY")
@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME")
def test_real_azure_blob_store_in_hierarchical(tmp_path):
    with integration_test_config("azure_integration_test_distributed.yml") as (_, object_store):
        verify_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
@skip_unless_environ("GALAXY_TEST_AWS_REGION")
def test_real_aws_s3_store(tmp_path):
    with integration_test_config("aws_s3_integration_test.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
def test_real_aws_s3_store_boto3(tmp_path):
    with integration_test_config("boto3_integration_test_aws.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
def test_real_aws_s3_store_boto3_multipart(tmp_path):
    with integration_test_config("boto3_integration_test_multithreaded.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
def test_real_aws_s3_store_boto3_new_bucket(tmp_path):
    with integration_test_config("boto3_integration_test_aws_new_bucket.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


# this test fails if you have axel installed because axel requires URLs to work and that requires
# setting a region with the cloudbridge store.
@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
def test_aws_via_cloudbridge_store(tmp_path):
    with integration_test_config("cloud_integration_test_aws.yml") as (_, object_store):
        # disabling get_object_url check - cloudbridge in this config assumes the region
        # is us-east-1 and generates a URL for that region. This functionality works and can
        # be tested if a region is specified in the configuration (see next config and test case).
        verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=False)


@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
@skip_unless_environ("GALAXY_TEST_AWS_REGION")
def test_aws_via_cloudbridge_store_with_region(tmp_path):
    with integration_test_config("cloud_integration_test_aws_with_region.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET")
def test_gcp_via_s3_interop(tmp_path):
    with integration_test_config("gcp_s3_integration_test.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET")
def test_gcp_via_s3_interop_and_boto3(tmp_path):
    with integration_test_config("gcp_boto3_integration_test.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


# Ensures boto3 will use legacy connection parameters that the generic_s3 object store
# would consume.
@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY")
@skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET")
def test_gcp_via_s3_interop_and_boto3_with_legacy_params(tmp_path):
    with integration_test_config("gcp_boto3_integration_test_legacy_params.yml") as (_, object_store):
        verify_caching_object_store_functionality(tmp_path, object_store)


class MockDataset:
    """Minimal stand-in for a Galaxy dataset model object as seen by object stores."""

    def __init__(self, id):
        self.id = id
        # set by distributed/hierarchical stores when a backend is chosen
        self.object_store_id = None
        self.uuid = uuid4()
        self.tags = []

    def rel_path_for_uuid_test(self):
        """Relative storage path derived from the dataset UUID (hash-bucketed)."""
        rel_path = os.path.join(*directory_hash_id(self.uuid))
        return rel_path

    @property
    def _extra_files_rel_path(self):
        # mirrors the model layer's "dataset_<uuid>_files" extra-files directory name
        return f"dataset_{self.uuid}_files"


def _assert_has_keys(the_dict, keys):
    """Assert every key in ``keys`` is present in ``the_dict``."""
    for key in keys:
        assert key in the_dict, f"key [{key}] not in [{the_dict}]"


def _assert_key_has_value(the_dict, key, value):
    """Assert ``the_dict[key]`` exists and equals ``value``."""
    # BUGFIX: message previously swapped the dict/key placeholders and
    # read "doesn't container" - it now names the missing key correctly.
    assert key in the_dict, f"dict [{the_dict}] doesn't contain expected key [{key}]"
    assert the_dict[key] == value, f"{the_dict[key]} != {value}"