"""Test Tool execution and state handling logic."""
from collections import OrderedDict
from typing import cast
import webob.exc
from sqlalchemy import select
import galaxy.model
from galaxy.app_unittest_utils import tools_support
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.util.bunch import Bunch
from galaxy.util.unittest import TestCase
BASE_REPEAT_TOOL_CONTENTS = """
echo "$param1" #for $r in $repeat# "$r.param2" #end for# < $out1
%s
"""
# Tool with a repeat parameter, to test state update.
REPEAT_TOOL_CONTENTS = BASE_REPEAT_TOOL_CONTENTS % ("""""",)
REPEAT_COLLECTION_PARAM_CONTENTS = BASE_REPEAT_TOOL_CONTENTS % (
"""""",
)
class TestToolExecution(TestCase, tools_support.UsesTools):
tool_action: "MockAction"
def setUp(self):
self.setup_app()
self.history = galaxy.model.History()
self.app.model.session.add(self.history)
self.trans = MockTrans(self.app, self.history)
self.app.dataset_collection_manager = cast(DatasetCollectionManager, MockCollectionService())
self.tool_action = MockAction(self.trans)
def tearDown(self):
self.tear_down_app()
def test_state_new(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
vars = self.__handle_with_incoming(param1="moo")
state = self.__assert_rerenders_tool_without_errors(vars)
assert state["param1"] == "moo"
def test_execute(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
vars = self.__handle_with_incoming(param1="moo")
self.__assert_executed(vars)
# Didn't specify a rerun_remap_id so this should be None
assert self.tool_action.execution_call_args[0]["rerun_remap_job_id"] is None
def test_execute_exception(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
self.tool_action.raise_exception()
try:
self.__handle_with_incoming(param1="moo")
except Exception as e:
assert "Error executing tool" in str(e)
def test_execute_errors(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
self.tool_action.return_error()
try:
self.__handle_with_incoming(param1="moo")
except Exception as e:
assert "Test Error Message" in str(e)
def test_redirect(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
self.tool_action.expect_redirect = True
redirect_raised = False
try:
self.__handle_with_incoming(param1="moo")
except webob.exc.HTTPFound:
redirect_raised = True
assert redirect_raised
def test_remap_job(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
vars = self.__handle_with_incoming(param1="moo", rerun_remap_job_id=self.app.security.encode_id(123))
self.__assert_executed(vars)
assert self.tool_action.execution_call_args[0]["rerun_remap_job_id"] == 123
def test_invalid_remap_job(self):
self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
try:
self.__handle_with_incoming(param1="moo", rerun_remap_job_id="123")
except Exception as e:
assert "invalid job" in str(e)
def test_data_param_execute(self):
self._init_tool(tools_support.SIMPLE_CAT_TOOL_CONTENTS)
hda = self.__add_dataset(1)
# Execute tool action
vars = self.__handle_with_incoming(param1=1)
self.__assert_executed(vars)
# Tool 'executed' once, with hda as param1
assert len(self.tool_action.execution_call_args) == 1
assert self.tool_action.execution_call_args[0]["incoming"]["param1"] == hda
def test_data_param_state_update(self):
self._init_tool(tools_support.SIMPLE_CAT_TOOL_CONTENTS)
hda = self.__add_dataset(1)
# Update state
vars = self.__handle_with_incoming(param1=1)
state = self.__assert_rerenders_tool_without_errors(vars)
assert hda == state["param1"]
def __handle_with_incoming(self, **kwds):
"""Execute tool.handle_input with incoming specified by kwds
(optionally extending a previous state).
"""
return self.tool.handle_input(trans=self.trans, incoming=kwds)
def __add_dataset(self, id, state="ok"):
hda = galaxy.model.HistoryDatasetAssociation()
hda.id = id
hda.dataset = galaxy.model.Dataset()
hda.dataset.state = "ok"
session = self.trans.sa_session
session.add(hda)
add_object_to_object_session(self.history, hda)
self.history.datasets.append(hda)
session.commit()
return hda
def __add_collection_dataset(self, id, collection_type="paired", *hdas):
hdca = galaxy.model.HistoryDatasetCollectionAssociation()
hdca.id = id
collection = galaxy.model.DatasetCollection()
hdca.collection = collection
galaxy.model.DatasetCollectionElement(collection=collection, element=self.__add_dataset(1))
collection.collection_type = collection_type
self.trans.sa_session.model_objects[galaxy.model.HistoryDatasetCollectionAssociation][id] = hdca
self.history.dataset_collections.append(hdca)
return hdca
def __assert_rerenders_tool_without_errors(self, vars):
self.__assert_no_errors(vars)
return self.tool_action.execution_call_args[0]["incoming"]
def __assert_executed(self, vars):
self.__assert_no_errors(vars)
assert len(vars["jobs"]) > 0
def __assert_no_errors(self, vars):
assert "job_errors" in vars
assert not vars["job_errors"]
class MockAction(tools_support.MockActionI):
def __init__(self, expected_trans):
self.expected_trans = expected_trans
self.execution_call_args = []
self.expect_redirect = False
self.exception_after_exection = None
self.error_message_after_excution = None
def execute(self, tool, trans, **kwds):
assert self.expected_trans == trans
self.execution_call_args.append(kwds)
num_calls = len(self.execution_call_args)
if self.expect_redirect:
raise webob.exc.HTTPFound(location="http://google.com")
if self.exception_after_exection is not None:
if num_calls > self.exception_after_exection:
raise Exception("Test Exception")
if self.error_message_after_excution is not None:
if num_calls > self.error_message_after_excution:
return None, "Test Error Message"
return galaxy.model.Job(), OrderedDict(out1="1")
def raise_exception(self, after_execution=0):
self.exception_after_exection = after_execution
def return_error(self, after_execution=0):
self.error_message_after_excution = after_execution
class MockTrans:
def __init__(self, app, history):
self.app = app
self.history = history
self.user = None
self.history._active_datasets_and_roles = [
hda
for hda in self.app.model.session.scalars(select(galaxy.model.HistoryDatasetAssociation)).all()
if hda.active and hda.history == history
]
self.workflow_building_mode = False
self.webapp = Bunch(name="galaxy")
self.sa_session = self.app.model.context
self.url_builder = None
self.galaxy_session = None
def get_history(self, **kwargs):
return self.history
def get_current_user_roles(self):
return []
def log_event(self, *args, **kwds):
pass
class MockCollectionService:
def __init__(self):
self.collection_info = object()
def match_collections(self, collections_to_match):
return self.collection_info