import os
import tempfile
from typing import cast
from unittest.mock import Mock
import pytest
from galaxy.datatypes.metadata import MetadataSpecCollection
from galaxy.job_execution.compute_environment import ComputeEnvironment
from galaxy.job_execution.datasets import DatasetPath
from galaxy.model import DatasetInstance
from galaxy.tools.parameters.basic import (
BooleanToolParameter,
DrillDownSelectToolParameter,
FloatToolParameter,
IntegerToolParameter,
SelectToolParameter,
TextToolParameter,
ToolParameter,
)
from galaxy.tools.wrappers import (
DatasetFilenameWrapper,
InputValueWrapper,
RawObjectWrapper,
SelectToolParameterWrapper,
)
from galaxy.util import XML
def with_mock_tool(func):
def call():
test_directory = tempfile.mkdtemp()
app = MockApp(test_directory)
tool = MockTool(app)
return func(tool)
call.__name__ = func.__name__
return call
def selectwrapper(tool, value, multiple=False, optional=False):
optional = 'optional="true"' if optional else ""
multiple = 'multiple="true"' if multiple else ""
xml = XML(f"""
""")
parameter = SelectToolParameter(tool, xml)
return SelectToolParameterWrapper(parameter, value)
@with_mock_tool
def test_select_wrapper_simple_options(tool):
wrapper = selectwrapper(tool, "x")
assert wrapper.name == "blah"
assert str(wrapper) == "x"
assert wrapper.value_label == "I am X"
wrapper = selectwrapper(tool, None, optional=True)
assert str(wrapper) == "None"
assert wrapper == ""
assert wrapper == "None"
@with_mock_tool
def test_select_wrapper_multiple_options(tool):
wrapper = selectwrapper(tool, ["x"], multiple=True)
assert wrapper.name == "blah"
assert str(wrapper) == "x"
assert "x" in wrapper
wrapper = selectwrapper(tool, ["x", "z"], multiple=True)
assert str(wrapper) == "x,z"
assert "x" in wrapper
wrapper = selectwrapper(tool, [], multiple=True)
assert str(wrapper) == "None"
assert wrapper == ""
assert wrapper == "None"
assert "x" not in wrapper
@with_mock_tool
def test_select_wrapper_with_drilldown(tool):
parameter = _drilldown_parameter(tool)
wrapper = SelectToolParameterWrapper(parameter, ["option3"])
assert str(wrapper) == "option3", str(wrapper)
@with_mock_tool
def test_select_wrapper_option_file(tool):
parameter = _setup_blast_tool(tool)
wrapper = SelectToolParameterWrapper(parameter, "val2")
assert str(wrapper) == "val2"
assert wrapper.fields.name == "name2"
assert wrapper.fields.path == "path2"
@with_mock_tool
def test_select_wrapper_multiple(tool):
parameter = _setup_blast_tool(tool, multiple=True)
wrapper = SelectToolParameterWrapper(parameter, ["val1", "val2"])
assert str(wrapper) == "val1,val2"
assert wrapper.fields.name == "name1,name2"
@with_mock_tool
def test_select_wrapper_with_path_rewritting(tool):
parameter = _setup_blast_tool(tool, multiple=True)
compute_environment = cast(ComputeEnvironment, MockComputeEnvironment(None))
wrapper = SelectToolParameterWrapper(
parameter, ["val1", "val2"], other_values={}, compute_environment=compute_environment
)
assert wrapper.fields.path == "Rewrite,Rewrite"
def test_raw_object_wrapper():
obj = Mock(x=4)
wrapper = RawObjectWrapper(obj)
assert wrapper.x == 4
assert wrapper
false_wrapper = RawObjectWrapper(False)
assert not false_wrapper
def valuewrapper(tool, value, paramtype, optional=False):
parameter: ToolParameter
if paramtype == "integer":
optional = 'optional="true"' if optional else 'value="10"'
parameter = IntegerToolParameter(tool, XML(f''))
elif paramtype == "text":
optional = 'optional="true"' if optional else 'value="foo"'
parameter = TextToolParameter(tool, XML(f''))
elif paramtype == "float":
optional = 'optional="true"' if optional else 'value="10.0"'
parameter = FloatToolParameter(tool, XML(f''))
elif paramtype == "boolean":
optional = 'optional="true"' if optional else 'value=""'
parameter = BooleanToolParameter(
tool, XML(f'')
)
return InputValueWrapper(parameter, value)
@with_mock_tool
def test_input_value_wrapper_comparison(tool):
wrapper = valuewrapper(tool, 5, "integer")
assert str(wrapper) == "5"
assert int(wrapper) == 5
assert wrapper != "5"
assert wrapper == 5
assert wrapper == 5.0
assert wrapper > 2
assert wrapper < 10
assert wrapper < 5.1
wrapper = valuewrapper(tool, True, "boolean")
assert bool(wrapper) is True, wrapper
assert str(wrapper) == "truevalue"
assert wrapper == "truevalue"
assert wrapper == "true"
wrapper = valuewrapper(tool, False, "boolean")
assert bool(wrapper) is False, wrapper
assert str(wrapper) == "falsevalue"
assert wrapper == "falsevalue"
assert wrapper == "false"
@with_mock_tool
def test_input_value_wrapper_comparison_optional(tool):
wrapper = valuewrapper(tool, None, "integer", optional=True)
assert not wrapper
with pytest.raises(ValueError):
int(wrapper)
assert str(wrapper) == ""
assert wrapper == "" # for backward-compatibility
wrapper = valuewrapper(tool, 0, "integer", optional=True)
assert wrapper == 0
assert int(wrapper) == 0
assert str(wrapper)
assert (
wrapper != ""
) # for backward-compatibility, the correct way to check if an optional integer param is not empty is to use str(wrapper)
wrapper = valuewrapper(tool, None, "integer", optional=True)
assert wrapper != 1
assert str(wrapper) == ""
assert wrapper == None # noqa: E711
wrapper = valuewrapper(tool, None, "boolean", optional=True)
assert bool(wrapper) is False, wrapper
assert str(wrapper) == "falsevalue"
@with_mock_tool
def test_input_value_wrapper_input_value_wrapper_comparison(tool):
wrapper = valuewrapper(tool, 5, "integer")
assert str(wrapper) == valuewrapper(tool, "5", "text")
assert int(wrapper) == valuewrapper(tool, "5", "integer")
assert wrapper != valuewrapper(tool, "5", "text")
assert wrapper == valuewrapper(tool, "5", "integer")
assert wrapper == valuewrapper(tool, "5", "float")
assert wrapper > valuewrapper(tool, "2", "integer")
assert wrapper < valuewrapper(tool, "10", "integer")
assert wrapper < valuewrapper(tool, "5.1", "float")
def test_dataset_wrapper():
dataset = cast(DatasetInstance, MockDataset())
wrapper = DatasetFilenameWrapper(dataset)
assert str(wrapper) == MOCK_DATASET_PATH
assert wrapper.file_name == MOCK_DATASET_PATH
assert wrapper.ext == MOCK_DATASET_EXT
def test_dataset_wrapper_false_path():
dataset = cast(DatasetInstance, MockDataset())
new_path = "/new/path/dataset_123.dat"
wrapper = DatasetFilenameWrapper(
dataset, compute_environment=cast(ComputeEnvironment, MockComputeEnvironment(false_path=new_path))
)
assert str(wrapper) == new_path
assert wrapper.file_name == new_path
class MockComputeEnvironment:
def __init__(self, false_path, false_extra_files_path=None):
self.false_path = false_path
self.false_extra_files_path = false_extra_files_path
def input_path_rewrite(self, dataset):
return self.false_path
def input_extra_files_rewrite(self, dataset):
return self.false_extra_files_path
def unstructured_path_rewrite(self, path):
return f"Rewrite<{path}>"
def test_dataset_false_extra_files_path():
dataset = cast(DatasetInstance, MockDataset())
wrapper = DatasetFilenameWrapper(dataset)
assert wrapper.extra_files_path == MOCK_DATASET_EXTRA_FILES_PATH
new_path = "/new/path/dataset_123.dat"
dataset_path = DatasetPath(123, MOCK_DATASET_PATH, false_path=new_path)
wrapper = DatasetFilenameWrapper(
dataset,
compute_environment=cast(
ComputeEnvironment, MockComputeEnvironment(dataset_path, MOCK_DATASET_EXTRA_FILES_PATH)
),
)
# Setting false_path is not enough to override
assert wrapper.extra_files_path == MOCK_DATASET_EXTRA_FILES_PATH
new_files_path = "/new/path/dataset_123_files"
wrapper = DatasetFilenameWrapper(
dataset,
compute_environment=cast(
ComputeEnvironment, MockComputeEnvironment(false_path=new_path, false_extra_files_path=new_files_path)
),
)
assert wrapper.extra_files_path == new_files_path
def _drilldown_parameter(tool):
xml = XML("""
""")
parameter = DrillDownSelectToolParameter(tool, xml)
return parameter
def _setup_blast_tool(tool, multiple=False):
tool.app.write_test_tool_data("blastdb.loc", "val1\tname1\tpath1\nval2\tname2\tpath2\n")
xml = XML(f"""
""")
parameter = SelectToolParameter(tool, xml)
return parameter
MOCK_DATASET_PATH = "/galaxy/database/files/001/dataset_123.dat"
MOCK_DATASET_EXTRA_FILES_PATH = "/galaxy/database/files/001/dataset_123.dat"
MOCK_DATASET_EXT = "bam"
class MockDataset:
def __init__(self):
self.metadata = MetadataSpecCollection({})
self.extra_files_path = MOCK_DATASET_EXTRA_FILES_PATH
self.ext = MOCK_DATASET_EXT
self.tags = []
self.has_deferred_data = False
def get_file_name(self, sync_cache=True):
return MOCK_DATASET_PATH
class MockTool:
def __init__(self, app):
self.app = app
self.options = Mock(sanitize=False)
self.profile = 23.0
class MockApp:
def __init__(self, test_directory):
self.config = Mock(tool_data_path=test_directory)
def write_test_tool_data(self, name, contents):
path = os.path.join(self.config.tool_data_path, name)
open(path, "w").write(contents)