import string from typing import ( cast, Optional, ) from galaxy import model from galaxy.app_unittest_utils import tools_support from galaxy.exceptions import UserActivationRequiredException from galaxy.objectstore import BaseObjectStore from galaxy.tool_util.parser.output_objects import ToolOutput from galaxy.tool_util.parser.xml import parse_change_format from galaxy.tools.actions import ( DefaultToolAction, determine_output_format, ) from galaxy.tools.execution_helpers import ( on_text_for_dataset_and_collections, on_text_for_numeric_ids, ) from galaxy.util import XML from galaxy.util.unittest import TestCase # I cannot think of a saner way to test if data is being wrapped than use a # data param in the output label - though you would probably never want to do # this. DATA_IN_LABEL_TOOL_CONTENTS = """ echo "$param1" < $out1 """ # Tool with two outputs - used to verify all datasets within same job get same # object store id. TWO_OUTPUTS = """ echo "$param1" < $out1 """ def test_on_text_for_numeric_ids(): def assert_on_text_is(expected, hids): on_text = on_text_for_numeric_ids(hids, "dataset") assert on_text == expected, f"Wrong on text value {on_text}, expected {expected}" assert_on_text_is("dataset 1", [1]) assert_on_text_is("dataset 1 and 2", [1, 2]) assert_on_text_is("dataset 1-3", [1, 2, 3]) assert_on_text_is("dataset 1-4", [1, 2, 3, 4]) assert_on_text_is("dataset 1, 3, and 4", [1, 3, 4]) assert_on_text_is("dataset 1-3 and 5-7", [1, 2, 3, 5, 6, 7]) assert_on_text_is("dataset 1-3, 5-7, and 9-11", [1, 2, 3, 5, 6, 7, 9, 10, 11]) assert_on_text_is("dataset 1-3, 5, and 9", [1, 2, 3, 5, 9]) assert_on_text_is("dataset 1, 2, and others", [1, 2, 4, 5]) assert_on_text_is("dataset 1, 2, and 4-6", [1, 2, 4, 5, 6]) assert_on_text_is("dataset 1 and 2", [1, 1, 2]) def test_on_text_for_dataset_and_collections(): def assert_on_text_is(expected, dataset_hids, collection_hids, element_ids): on_text = on_text_for_dataset_and_collections(dataset_hids, collection_hids, element_ids) assert on_text == expected, f"Wrong on text value {on_text}, expected {expected}" assert_on_text_is("dataset 1 and collection 4 and 5", [1], [4, 5], None) assert_on_text_is("dataset 1 and SampleA and SampleB", [1], None, ["SampleA", "SampleB"]) class TestDefaultToolAction(TestCase, tools_support.UsesTools): def setUp(self): self.setup_app() history = model.History() self.history = history self.trans = MockTrans(self.app, self.history) self.app.model.context.add(history) session = self.app.model.context session.commit() self.action = DefaultToolAction() self.app.config.len_file_path = "moocow" self.app.object_store = cast(BaseObjectStore, MockObjectStore()) def test_output_created(self): _, output = self._simple_execute() assert len(output) == 1 assert "out1" in output def test_output_label(self): _, output = self._simple_execute() assert output["out1"].name == "Output (moo)" def test_output_label_data(self): hda1 = self.__add_dataset() hda2 = self.__add_dataset() incoming = { "param1": hda1, "repeat1": [ {"param2": hda2}, ], } job, output = self._simple_execute( tools_support.SIMPLE_CAT_TOOL_CONTENTS, incoming, ) assert output["out1"].name == "Test Tool on dataset 1 and 2" def test_object_store_ids(self): _, output = self._simple_execute(contents=TWO_OUTPUTS) assert output["out1"].name == "Output (moo)" assert output["out2"].name == "Output 2 (moo)" def test_params_wrapped(self): hda1 = self.__add_dataset() _, output = self._simple_execute( contents=DATA_IN_LABEL_TOOL_CONTENTS, incoming=dict(repeat1=[dict(param1=hda1)]), ) # Again this is a stupid way to ensure data parameters are wrapped. assert output["out1"].name == f"Output ({hda1.dataset.get_file_name()})" def test_inactive_user_job_create_failure(self): self.trans.user_is_active = False try: self._simple_execute() except UserActivationRequiredException: return raise AssertionError("Tool execution succeeded for inactive user!") def __add_dataset(self, state="ok"): hda = model.HistoryDatasetAssociation() hda.dataset = model.Dataset() hda.dataset.state = "ok" hda.dataset.external_filename = "/tmp/datasets/dataset_001.dat" self.history.add_dataset(hda) session = self.app.model.context session.commit() return hda def _simple_execute(self, contents=None, incoming=None): if contents is None: contents = tools_support.SIMPLE_TOOL_CONTENTS if incoming is None: incoming = dict(param1="moo") self._init_tool(contents) job, out_data, *_ = self.action.execute( tool=self.tool, trans=self.trans, history=self.history, incoming=incoming, ) return job, out_data def test_determine_output_format(): # Test simple case of explicitly defined output with no changes. direct_output = quick_output("txt") __assert_output_format_is("txt", direct_output) # Test if format is "input" (which just uses the last input on the form.) input_based_output = quick_output("input") __assert_output_format_is("fastq", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) # Test using format_source (testing a couple different positions) input_based_output = quick_output("txt", format_source="i1") __assert_output_format_is("fasta", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) input_based_output = quick_output("fastq", format_source="hdcai[0]") __assert_output_format_is("txt", input_based_output, [("i1", "fasta"), ("i2", "fastq")], add_collection=True) input_based_output = quick_output("fastq", format_source="""hdcai["forward"]""") __assert_output_format_is("txt", input_based_output, [("i1", "fasta"), ("i2", "fastq")], add_collection=True) input_based_output = quick_output("fastq", format_source="""hdcai['forward']""") __assert_output_format_is("txt", input_based_output, [("i1", "fasta"), ("i2", "fastq")], add_collection=True) input_based_output = quick_output("txt", format_source="i2") __assert_output_format_is("fastq", input_based_output, [("i1", "fasta"), ("i2", "fastq")]) change_format_xml = """ """ change_format_output = quick_output("fastq", change_format_xml=change_format_xml) # Test maching a change_format when. __assert_output_format_is( "fastqillumina", change_format_output, param_context={"options_type": {"output_type": "illumina"}} ) # Test change_format but no match __assert_output_format_is("fastq", change_format_output, param_context={"options_type": {"output_type": "sanger"}}) change_on_metadata_xml_template = string.Template(""" """) change_on_metadata_illumina = change_on_metadata_xml_template.safe_substitute({"input": "i2"}) change_on_metadata_output = quick_output("fastq", change_format_xml=change_on_metadata_illumina) __assert_output_format_is("fastqillumina", change_on_metadata_output, [("i1", "txt"), ("i2", "txt")]) change_on_metadata_solexa = change_on_metadata_xml_template.safe_substitute({"input": "i1"}) change_on_metadata_output = quick_output("fastq", change_format_xml=change_on_metadata_solexa) __assert_output_format_is("fastqsolexa", change_on_metadata_output, [("i1", "txt"), ("i2", "txt")]) def __assert_output_format_is(expected, output, input_extensions=None, param_context=None, add_collection=False): input_extensions = input_extensions or {} param_context = param_context or {} inputs = {} last_ext = "data" i = 1 for name, ext in input_extensions: hda = model.HistoryDatasetAssociation(extension=ext) hda.metadata.random_field = str(i) # Populate a random metadata field for testing inputs[name] = hda last_ext = ext i += 1 input_collections = {} if add_collection: hda_forward = model.HistoryDatasetAssociation(extension="txt") hda_reverse = model.HistoryDatasetAssociation(extension="txt") c1 = model.DatasetCollection(collection_type="pair") hc1 = model.HistoryDatasetCollectionAssociation(collection=c1, name="HistoryCollectionTest1") dce1 = model.DatasetCollectionElement( collection=c1, element=hda_forward, element_identifier="forward", element_index=0 ) dce2 = model.DatasetCollectionElement( collection=c1, element=hda_reverse, element_identifier="reverse", element_index=1 ) c1.elements = [dce1, dce2] input_collections["hdcai"] = hc1 actual_format = determine_output_format(output, param_context, inputs, input_collections, last_ext) assert actual_format == expected, f"Actual format {actual_format}, does not match expected {expected}" def quick_output( format: str, format_source: Optional[str] = None, change_format_xml: Optional[str] = None ) -> ToolOutput: test_output = ToolOutput("test_output") test_output.format = format test_output.format_source = format_source if change_format_xml: test_output.change_format = parse_change_format(XML(change_format_xml).findall("change_format")) else: test_output.change_format = [] return test_output class MockTrans: def __init__(self, app, history, user=None): self.app = app self.history = history self.user = user self.sa_session = self.app.model.context self.model = app.model self._user_is_active = True self.user_is_admin = False @property def tag_handler(self): return self.app.tag_handler def get_user_is_active(self): # NOTE: the real user_is_active also checks whether activation is enabled in the config return self._user_is_active def set_user_is_active(self, active): self._user_is_active = active user_is_active = property(get_user_is_active, set_user_is_active) def check_user_activation(self): if not self.user_is_active: raise UserActivationRequiredException() def db_dataset_for(self, input_db_key): return None def get_galaxy_session(self): return model.GalaxySession() def get_current_user_roles(self): return [] def log_event(self, *args, **kwargs): pass class MockObjectStore: def __init__(self): self.created_datasets = [] self.first_create = True self.object_store_id = "mycoolid" def exists(self, *args, **kwargs): return True def create(self, dataset): self.created_datasets.append(dataset) if self.first_create: self.first_create = False assert dataset.object_store_id is None dataset.object_store_id = self.object_store_id else: assert dataset.object_store_id == self.object_store_id