"""Integration tests for maximum workflow invocation duration configuration option."""
import os
import re
import string
import tempfile
import time
from json import dumps
from galaxy import model
from galaxy_test.base.populators import (
DatasetPopulator,
WorkflowPopulator,
)
from galaxy_test.driver import integration_util
SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
WORKFLOW_HANDLER_CONFIGURATION_JOB_CONF = os.path.join(SCRIPT_DIRECTORY, "workflow_handler_configuration_job_conf.xml")
WORKFLOW_HANDLER_JOB_CONFIG_TEMPLATE = string.Template("""
""")
POOL_JOB_CONFIG_TEMPLATE = string.Template("""
""")
WORKFLOW_SCHEDULERS_CONFIG_TEMPLATE = string.Template("""
""")
JOB_HANDLER_PATTERN = re.compile(r"handler\d")
WORKFLOW_SCHEDULER_HANDLER_PATTERN = re.compile(r"work\d")
PAUSE_WORKFLOW = """
class: GalaxyWorkflow
steps:
- label: test_input
type: input
- label: the_pause
type: pause
connect:
input:
- test_input
"""
def config_file(template, assign_with=""):
with tempfile.NamedTemporaryFile(
mode="w", suffix=".xml", prefix="workflow_handler_config_", delete=False
) as config:
if assign_with:
assign_with = f'assign_with="{assign_with}"'
config.write(template.substitute(assign_with=assign_with))
return config.name
class BaseWorkflowHandlerConfigurationTestCase(integration_util.IntegrationTestCase):
dataset_populator: DatasetPopulator
framework_tool_and_types = True
assign_with = ""
def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
@classmethod
def handle_galaxy_config_kwds(cls, config):
config["job_config_file"] = config_file(WORKFLOW_HANDLER_JOB_CONFIG_TEMPLATE, assign_with=cls.assign_with)
def _invoke_n_workflows(self, n, history_id: str):
workflow_id = self.workflow_populator.upload_yaml_workflow(PAUSE_WORKFLOW)
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
index_map = {"0": dict(src="hda", id=hda1["id"])}
request = {}
request["history"] = f"hist_id={history_id}"
request["inputs"] = dumps(index_map)
request["inputs_by"] = "step_index"
url = f"workflows/{workflow_id}/invocations"
for _ in range(n):
self._post(url, data=request, json=True)
def _get_workflow_invocations(self, history_id: str):
# Consider exposing handler via the API to reduce breaking
# into Galaxy's internal state.
app = self._app
history_id = app.security.decode_id(history_id)
history = app.model.session.get(model.History, history_id)
assert history is not None
workflow_invocations = history.workflow_invocations
return workflow_invocations
@property
def is_app_workflow_scheduler(self):
return self._app.workflow_scheduling_manager.request_monitor is not None
class TestHistoryRestrictionConfiguration(BaseWorkflowHandlerConfigurationTestCase):
# Assign with db-preassign. Would also work with grabbing assignment, but we don't start grabber.
assign_with = "db-preassign"
def test_history_to_handler_restriction(self, history_id: str):
self._invoke_n_workflows(10, history_id)
workflow_invocations = self._get_workflow_invocations(history_id)
assert len(workflow_invocations) == 10
# Verify all 10 assigned to same handler - there would be a
# 1 in 10^10 chance for this to occur randomly.
for workflow_invocation in workflow_invocations:
assert workflow_invocation.handler == workflow_invocations[0].handler
assert JOB_HANDLER_PATTERN.match(workflow_invocations[0].handler)
class TestHistoryParallelConfiguration(BaseWorkflowHandlerConfigurationTestCase):
# Assign with db-preassign. Would also work with grabbing assignment, but we don't start grabber.
assign_with = "db-preassign"
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["parallelize_workflow_scheduling_within_histories"] = True
def test_workflows_spread_across_multiple_handlers(self, history_id: str):
self._invoke_n_workflows(20, history_id)
workflow_invocations = self._get_workflow_invocations(history_id)
assert len(workflow_invocations) == 20
handlers = set()
for workflow_invocation in workflow_invocations:
handlers.add(workflow_invocation.handler)
assert JOB_HANDLER_PATTERN.match(workflow_invocation.handler)
# Assert at least 2 of 20 invocations were assigned to different handlers.
assert len(handlers) >= 1, handlers
# Setup an explicit workflow handler and make sure this is assigned to that.
class TestWorkflowSchedulerHandlerAssignment(BaseWorkflowHandlerConfigurationTestCase):
# Assign with db-preassign. Would also work with grabbing assignment, but we don't start grabber.
assign_with = "db-preassign"
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["workflow_schedulers_config_file"] = config_file(
WORKFLOW_SCHEDULERS_CONFIG_TEMPLATE, assign_with=cls.assign_with
)
def test_handler_assignment(self, history_id: str):
self._invoke_n_workflows(1, history_id)
workflow_invocations = self._get_workflow_invocations(history_id)
assert WORKFLOW_SCHEDULER_HANDLER_PATTERN.match(workflow_invocations[0].handler)
# Following 8 classes test 8 different ways Galaxy processes can be workflow schedulers or not.
# - In single process mode, the process is a workflow scheduler.
# - If no workflow schedulers conf is configured and the process is a job handler, it is a workflow scheduler as well.
# - If no workflow schedulers conf is configured and the process is a job handler, it is a workflow scheduler as well (with db-dkip-locked).
# - If no workflow schedulers conf is configured and the process is not a job handler, it is not a workflow scheduler as well.
# - If a workflow scheduler conf is defined and the process is listed as a handler, it is workflow scheduler.
# - If a workflow scheduler conf is defined and assign_with is set to db-skip-locked, invocation handler is correctly set
# - If a workflow scheduler conf is defined and assign_with is set to db-transaction-isolation, invocation handler is correctly set
# - If a workflow scheduler conf is defined and the process is not listed as a handler, it is not workflow scheduler.
class TestDefaultWorkflowHandlerOn(BaseWorkflowHandlerConfigurationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
# Override so we don't setup a job conf like in the base class.
pass
def test_default_main_process_is_handler(self):
assert self.is_app_workflow_scheduler
class TestDefaultWorkflowHandlerIfJobHandlerOn(BaseWorkflowHandlerConfigurationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["server_name"] = "handler0"
def test_default_job_handler_is_workflow_handler(self):
assert self.is_app_workflow_scheduler
class TestJobHandlerAsWorkflowHandlerWithDbSkipLocked(BaseWorkflowHandlerConfigurationTestCase):
assign_with = "db-skip-locked"
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["server_name"] = "handler0"
def test_handler_assignment(self, history_id: str):
self._invoke_n_workflows(1, history_id)
time.sleep(2)
workflow_invocations = self._get_workflow_invocations(history_id)
assert JOB_HANDLER_PATTERN.match(workflow_invocations[0].handler)
def test_default_job_handler_is_workflow_handler(self):
assert self.is_app_workflow_scheduler
class TestJobHandlerAsWorkflowHandlerWithDbSkipLockedAttachToPool(TestJobHandlerAsWorkflowHandlerWithDbSkipLocked):
@classmethod
def handle_galaxy_config_kwds(cls, config):
config["job_config_file"] = config_file(POOL_JOB_CONFIG_TEMPLATE, assign_with=cls.assign_with)
config["server_name"] = "handler0"
config["attach_to_pools"] = ["job-handlers"]
class TestDefaultWorkflowHandlerIfJobHandlerOff(BaseWorkflowHandlerConfigurationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["server_name"] = "web0"
def test_default_job_handler_is_not_workflow_handler(self):
assert not self.is_app_workflow_scheduler
class TestExplicitWorkflowHandlersOn(BaseWorkflowHandlerConfigurationTestCase):
assign_with = ""
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["workflow_schedulers_config_file"] = config_file(
WORKFLOW_SCHEDULERS_CONFIG_TEMPLATE, assign_with=cls.assign_with
)
config["server_name"] = "work1"
def test_app_is_workflow_scheduler(self):
assert self.is_app_workflow_scheduler
@integration_util.skip_unless_postgres()
class TestWorkflowSchedulerHandlerAssignmentDbSkipLocked(TestExplicitWorkflowHandlersOn):
assign_with = "db-skip-locked"
def test_handler_assignment(self, history_id: str):
self._invoke_n_workflows(1, history_id)
time.sleep(2)
workflow_invocations = self._get_workflow_invocations(history_id)
assert WORKFLOW_SCHEDULER_HANDLER_PATTERN.match(workflow_invocations[0].handler)
@integration_util.skip_unless_postgres()
class TestWorkflowSchedulerHandlerAssignmentDbTransactionIsolation(TestWorkflowSchedulerHandlerAssignmentDbSkipLocked):
assign_with = "db-transaction-isolation"
class TestExplicitWorkflowHandlersOff(BaseWorkflowHandlerConfigurationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["workflow_schedulers_config_file"] = config_file(
WORKFLOW_SCHEDULERS_CONFIG_TEMPLATE, assign_with=cls.assign_with
)
config["server_name"] = "handler0" # Configured as a job handler but not a workflow handler.
def test_app_is_not_workflow_scheduler(self):
assert not self.is_app_workflow_scheduler
class TestExplicitWorkflowHandlersOffPool(BaseWorkflowHandlerConfigurationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["workflow_schedulers_config_file"] = config_file(
WORKFLOW_SCHEDULERS_CONFIG_TEMPLATE, assign_with=cls.assign_with
)
config["server_name"] = "handler0" # Configured as a job handler but not a workflow handler.
config["attach_to_pools"] = ["job-handlers"]
def test_app_is_not_workflow_scheduler(self):
assert not self.is_app_workflow_scheduler