import os
import random
import string
import subprocess
import time
from galaxy_test.base.populators import DatasetPopulator
from galaxy_test.driver import integration_util
from galaxy_test.driver.integration_util import (
docker_exec,
docker_ip_address,
docker_rm,
docker_run,
)
# Host/port and credentials for the external object-store services used by
# these tests; each value can be overridden with a GALAXY_INTEGRATION_*
# environment variable.
OBJECT_STORE_HOST = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_HOST", "127.0.0.1")
OBJECT_STORE_PORT = int(os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_PORT", 9000))
OBJECT_STORE_ACCESS_KEY = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_ACCESS_KEY", "minioadmin")
OBJECT_STORE_SECRET_KEY = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_SECRET_KEY", "minioadmin")
# Rucio credentials used when substituting into RUCIO_OBJECT_STORE_CONFIG.
OBJECT_STORE_RUCIO_ACCOUNT = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_RUCIO_ACCOUNT", "root")
OBJECT_STORE_RUCIO_USERNAME = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_RUCIO_USERNAME", "rucio")
# RSE name that wait_rucio_ready() polls for in the Rucio container's output.
OBJECT_STORE_RUCIO_RSE_NAME = "TEST"
OBJECT_STORE_RUCIO_ACCESS = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_RUCIO_ACCESS", "rucio")
# NOTE(review): this template is empty as seen here (possibly truncated in
# this view); BaseSwiftObjectStoreIntegrationTestCase substitutes host/port/
# credential placeholders into it — confirm against the full file.
OBJECT_STORE_CONFIG = string.Template("""
""")
# Object-store configuration for the Rucio backend; placeholders are filled
# by BaseRucioObjectStoreIntegrationTestCase.handle_galaxy_config_kwds.
RUCIO_OBJECT_STORE_CONFIG = string.Template("""
type: rucio
upload_rse_name: ${rucio_rse}
upload_scheme: file
register_only: false
download_schemes:
- rse: ${rucio_rse}
scheme: file
ignore_checksum: false
scope: galaxy
host: http://${host}:${port}
account: ${rucio_account}
auth_host: http://${host}:${port}
username: ${rucio_username}
password: ${rucio_password}
auth_type: userpass
cache:
path: ${temp_directory}/object_store_cache
size: 1000
cache_updated_data: ${cache_updated_data}
""")
# Distributed object-store configuration with two Azure Blob backends that
# share one account/container; placeholders are filled by
# BaseAzureObjectStoreIntegrationTestCase.handle_galaxy_config_kwds.
AZURE_OBJECT_STORE_CONFIG = string.Template("""
type: distributed
backends:
- type: azure_blob
id: azure1
name: Azure Store 1
allow_selection: true
weight: 1
auth:
account_name: ${account_name}
account_key: ${account_key}
container:
name: ${container_name}
extra_dirs:
- type: job_work
path: "${temp_directory}/database/job_working_directory_azure_1"
- type: temp
path: "${temp_directory}/database/tmp_azure_1"
- type: azure_blob
id: azure2
name: Azure Store 2
allow_selection: true
weight: 1
auth:
account_name: ${account_name}
account_key: ${account_key}
container:
name: ${container_name}
extra_dirs:
- type: job_work
path: "${temp_directory}/database/job_working_directory_azure_2"
- type: temp
path: "${temp_directory}/database/tmp_azure_2"
""")
# Onedata setup for the test is done according to this documentation:
# https://onedata.org/#/home/documentation/topic/stable/demo-mode
ONEDATA_DEMO_SPACE_NAME = "demo-space"
# NOTE(review): this template is empty as seen here (possibly truncated in
# this view); BaseOnedataObjectStoreIntegrationTestCase substitutes access
# token / domain / space placeholders into it — confirm against the full file.
ONEDATA_OBJECT_STORE_CONFIG = string.Template("""
""")
def wait_rucio_ready(container_name):
    """Poll the Rucio container until ``rucio list-rses`` reports the test RSE.

    Retries once per second for up to 30 seconds; failed ``docker exec``
    invocations are treated as "not ready yet".

    :raises TimeoutError: if the RSE does not appear within the timeout.
    """
    deadline = time.time() + 30
    listing = None
    while True:
        try:
            listing = docker_exec(container_name, "rucio", "list-rses").decode("utf-8").strip()
        except subprocess.CalledProcessError:
            # Container not accepting commands yet — keep polling.
            pass
        else:
            if listing == OBJECT_STORE_RUCIO_RSE_NAME:
                return
        if time.time() >= deadline:
            raise TimeoutError(f"cannot start Rucio {listing}")
        time.sleep(1)
def start_rucio(container_name):
    """Launch the Rucio test container and block until it is ready to serve."""
    docker_run(
        "savannah.ornl.gov/ndip/public-docker/rucio:1.29.8",
        container_name,
        ports=[(OBJECT_STORE_PORT, 80)],
    )
    wait_rucio_ready(container_name)
class BaseObjectStoreIntegrationTestCase(integration_util.IntegrationTestCase, integration_util.ConfiguresObjectStores):
    """Common base class for object-store integration tests.

    Enables the framework tool/datatype set and attaches a
    ``DatasetPopulator`` to each test instance.
    """

    dataset_populator: DatasetPopulator
    framework_tool_and_types = True

    def setUp(self):
        """Run the standard setup, then bind a dataset populator."""
        super().setUp()
        interactor = self.galaxy_interactor
        self.dataset_populator = DatasetPopulator(interactor)
def get_files(directory):
    """Yield the path of every file under *directory*, walking recursively."""
    for dirpath, _dirnames, filenames in os.walk(directory):
        yield from (os.path.join(dirpath, name) for name in filenames)
def files_count(directory):
    """Return how many files exist under *directory*, walking recursively."""
    total = 0
    for _path in get_files(directory):
        total += 1
    return total
@integration_util.skip_unless_docker()
class BaseSwiftObjectStoreIntegrationTestCase(BaseObjectStoreIntegrationTestCase):
    """Base case running Galaxy against a MinIO-backed object store.

    A MinIO container is started once per test class in ``setUpClass`` and
    removed in ``tearDownClass``.
    """

    # Local cache directory used by the configured object store.
    object_store_cache_path: str

    @classmethod
    def setUpClass(cls):
        # One container per test class so concurrent classes do not collide.
        cls.container_name = f"{cls.__name__}_container"
        start_minio(cls.container_name)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        docker_rm(cls.container_name)
        super().tearDownClass()

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        """Render OBJECT_STORE_CONFIG to a file and point Galaxy at it.

        Also forces uuid-keyed storage, extended metadata, and
        outputs-to-working-directory, which the object-store tests rely on.
        """
        super().handle_galaxy_config_kwds(config)
        temp_directory = cls._test_driver.mkdtemp()
        cls.object_stores_parent = temp_directory
        cls.object_store_cache_path = os.path.join(temp_directory, "object_store_cache")
        config_path = os.path.join(temp_directory, "object_store_conf.xml")
        config["object_store_store_by"] = "uuid"
        config["metadata_strategy"] = "extended"
        config["outputs_to_working_directory"] = True
        config["retry_metadata_internally"] = False
        with open(config_path, "w") as f:
            f.write(
                OBJECT_STORE_CONFIG.safe_substitute(
                    {
                        "temp_directory": temp_directory,
                        "host": OBJECT_STORE_HOST,
                        "port": OBJECT_STORE_PORT,
                        "access_key": OBJECT_STORE_ACCESS_KEY,
                        "secret_key": OBJECT_STORE_SECRET_KEY,
                        "cache_updated_data": cls.updateCacheData(),
                    }
                )
            )
        config["object_store_config_file"] = config_path

    # NOTE: setUp is inherited from BaseObjectStoreIntegrationTestCase, which
    # already creates self.dataset_populator; no override is needed here.

    @classmethod
    def updateCacheData(cls):
        # Value substituted into the template's cache_updated_data
        # placeholder; subclasses may override to return False.
        return True
class BaseAzureObjectStoreIntegrationTestCase(
    BaseObjectStoreIntegrationTestCase, integration_util.ConfiguresWorkflowScheduling
):
    """Base case running Galaxy against a distributed Azure Blob object store.

    Credentials and container name are read from the GALAXY_TEST_AZURE_*
    environment variables; a missing variable raises ``KeyError`` during
    configuration.
    """

    # Local cache directory used by the configured object store.
    object_store_cache_path: str

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        """Render AZURE_OBJECT_STORE_CONFIG to a file and point Galaxy at it."""
        super().handle_galaxy_config_kwds(config)
        # disabling workflow scheduling to limit database locking when
        # testing without postgres.
        cls._disable_workflow_scheduling(config)
        temp_directory = cls._test_driver.mkdtemp()
        cls.object_stores_parent = temp_directory
        cls.object_store_cache_path = os.path.join(temp_directory, "object_store_cache")
        config_path = os.path.join(temp_directory, "object_store_conf.yml")
        config["object_store_store_by"] = "uuid"
        config["metadata_strategy"] = "extended"
        config["outputs_to_working_directory"] = True
        config["retry_metadata_internally"] = False
        with open(config_path, "w") as f:
            f.write(
                AZURE_OBJECT_STORE_CONFIG.safe_substitute(
                    {
                        "temp_directory": temp_directory,
                        "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"],
                        "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"],
                        "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"],
                    }
                )
            )
        config["object_store_config_file"] = config_path

    # NOTE: setUp is inherited from BaseObjectStoreIntegrationTestCase, which
    # already creates self.dataset_populator; no override is needed here.

    @classmethod
    def updateCacheData(cls):
        # NOTE(review): the Azure template has no ${cache_updated_data}
        # placeholder, so this value is unused here; kept for interface
        # parity with the other object-store base classes.
        return True
@integration_util.skip_unless_docker()
class BaseRucioObjectStoreIntegrationTestCase(BaseObjectStoreIntegrationTestCase):
    """Base case running Galaxy against a Rucio-backed object store.

    A Rucio test container is started once per test class in ``setUpClass``
    and removed in ``tearDownClass``.
    """

    # Local cache directory used by the configured object store.
    object_store_cache_path: str

    @classmethod
    def setUpClass(cls):
        # One container per test class so concurrent classes do not collide.
        cls.container_name = f"{cls.__name__}_container"
        start_rucio(cls.container_name)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        docker_rm(cls.container_name)
        super().tearDownClass()

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        """Render RUCIO_OBJECT_STORE_CONFIG to a file and point Galaxy at it."""
        super().handle_galaxy_config_kwds(config)
        temp_directory = cls._test_driver.mkdtemp()
        cls.object_stores_parent = temp_directory
        cls.object_store_cache_path = os.path.join(temp_directory, "object_store_cache")
        config_path = os.path.join(temp_directory, "object_store_conf.yml")
        config["object_store_store_by"] = "uuid"
        config["metadata_strategy"] = "extended"
        config["outputs_to_working_directory"] = True
        config["retry_metadata_internally"] = False
        with open(config_path, "w") as f:
            f.write(
                RUCIO_OBJECT_STORE_CONFIG.safe_substitute(
                    {
                        "temp_directory": temp_directory,
                        "host": OBJECT_STORE_HOST,
                        "port": OBJECT_STORE_PORT,
                        "rucio_account": OBJECT_STORE_RUCIO_ACCOUNT,
                        "rucio_username": OBJECT_STORE_RUCIO_USERNAME,
                        "rucio_password": OBJECT_STORE_RUCIO_ACCESS,
                        "rucio_rse": OBJECT_STORE_RUCIO_RSE_NAME,
                        "cache_updated_data": cls.updateCacheData(),
                    }
                )
            )
        config["object_store_config_file"] = config_path

    # NOTE: setUp is inherited from BaseObjectStoreIntegrationTestCase, which
    # already creates self.dataset_populator; no override is needed here.

    @classmethod
    def updateCacheData(cls):
        # Value substituted into the template's cache_updated_data
        # placeholder; subclasses may override to return False.
        return True
@integration_util.skip_unless_docker()
class BaseOnedataObjectStoreIntegrationTestCase(BaseObjectStoreIntegrationTestCase):
    """Base case running Galaxy against a demo-mode Onedata deployment.

    A Onezone and a Oneprovider container are started once per test class
    (see https://onedata.org/#/home/documentation/topic/stable/demo-mode)
    and removed in ``tearDownClass``.
    """

    # Local cache directory used by the configured object store.
    object_store_cache_path: str

    @classmethod
    def setUpClass(cls):
        cls.oz_container_name = f"{cls.__name__}_oz_container"
        cls.op_container_name = f"{cls.__name__}_op_container"
        # The provider is pointed at the zone's IP, so the zone starts first.
        start_onezone(cls.oz_container_name)
        oz_ip_address = docker_ip_address(cls.oz_container_name)
        start_oneprovider(cls.op_container_name, oz_ip_address)
        await_oneprovider_demo_readiness(cls.op_container_name)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        docker_rm(cls.op_container_name)
        docker_rm(cls.oz_container_name)
        super().tearDownClass()

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        """Render ONEDATA_OBJECT_STORE_CONFIG to a file and point Galaxy at it."""
        super().handle_galaxy_config_kwds(config)
        temp_directory = cls._test_driver.mkdtemp()
        cls.object_stores_parent = temp_directory
        cls.object_store_cache_path = os.path.join(temp_directory, "object_store_cache")
        config_path = os.path.join(temp_directory, "object_store_conf.xml")
        config["object_store_store_by"] = "uuid"
        config["metadata_strategy"] = "extended"
        config["outputs_to_working_directory"] = True
        config["retry_metadata_internally"] = False
        with open(config_path, "w") as f:
            f.write(
                ONEDATA_OBJECT_STORE_CONFIG.safe_substitute(
                    {
                        "temp_directory": temp_directory,
                        "access_token": get_onedata_access_token(cls.oz_container_name),
                        "onezone_domain": docker_ip_address(cls.oz_container_name),
                        "space_name": ONEDATA_DEMO_SPACE_NAME,
                        # Randomly exercise the no-param, empty-path, and
                        # nested-path space-parameter variants.
                        "optional_space_params": random.choice(["", 'path=""', 'path="a/b/c/d"']),
                        "cache_updated_data": cls.updateCacheData(),
                    }
                )
            )
        config["object_store_config_file"] = config_path

    # NOTE: setUp is inherited from BaseObjectStoreIntegrationTestCase, which
    # already creates self.dataset_populator; no override is needed here.

    @classmethod
    def updateCacheData(cls):
        # Value substituted into the template's cache_updated_data
        # placeholder; subclasses may override to return False.
        return True
def start_minio(container_name):
    """Launch a MinIO server container, publishing OBJECT_STORE_PORT -> 9000."""
    port_mappings = [(OBJECT_STORE_PORT, 9000)]
    docker_run("minio/minio:latest", container_name, "server", "/data", ports=port_mappings)
def start_onezone(oz_container_name):
    """Launch a Onezone container in demo mode."""
    image = "onedata/onezone:21.02.5-dev"
    docker_run(image, oz_container_name, "demo")
def start_oneprovider(op_container_name, oz_ip_address):
    """Launch a Oneprovider container in demo mode, pointed at the given Onezone IP."""
    image = "onedata/oneprovider:21.02.5-dev"
    docker_run(image, op_container_name, "demo", oz_ip_address)
def await_oneprovider_demo_readiness(op_container_name):
    """Run the container's ``await-demo`` command, blocking until it exits."""
    # output=False: only the command's exit status matters, not its output.
    docker_exec(op_container_name, "await-demo", output=False)
def get_onedata_access_token(oz_container_name):
    """Return the demo access token reported by the Onezone container."""
    raw_output = docker_exec(oz_container_name, "demo-access-token")
    return raw_output.decode("utf-8").strip()