import json
import os
from typing import cast
from galaxy import (
model,
util,
)
from galaxy.app_unittest_utils import tools_support
from galaxy.objectstore import BaseObjectStore
from galaxy.tool_util.parser import output_collection_def
from galaxy.tool_util.provided_metadata import (
BaseToolProvidedMetadata,
LegacyToolProvidedMetadata,
NullToolProvidedMetadata,
)
from galaxy.util.unittest import TestCase
# Name of the tool output that discovered ("primary") datasets are collected under.
DEFAULT_TOOL_OUTPUT = "out1"
# Default designation encoded into extra filenames by _setup_extra_file.
DEFAULT_EXTRA_NAME = "test1"
class TestCollectPrimaryDatasets(TestCase, tools_support.UsesTools):
    """Tests for discovery of extra ("primary") datasets produced by a tool.

    Each test stages files in a fake job working directory (either via the
    legacy ``primary_<id>_<name>_<visible>_<ext>[_<dbkey>]`` filename pattern
    or via configured ``discover_datasets`` collectors) and then runs the
    tool's output discovery, asserting on the datasets produced.

    NOTE(review): several ``_replace_output_collectors`` calls below pass an
    empty string where a ``discover_datasets`` XML element is clearly meant to
    appear — the element content looks stripped from this copy of the file.
    Those strings are left untouched; restore them from upstream.
    """

    def setUp(self):
        """Wire up a minimal app, a simple tool, a mock object store and one output."""
        self.setup_app()
        object_store = cast(BaseObjectStore, MockObjectStore())
        self.app.object_store = object_store
        self._init_tool(tools_support.SIMPLE_TOOL_CONTENTS)
        self._setup_test_output()
        # Dataset resolves file paths through this class-level store reference.
        model.Dataset.object_store = object_store

    def tearDown(self):
        # Only clear the class-level store if this test was the one to install it.
        if model.Dataset.object_store is self.app.object_store:
            model.Dataset.object_store = None

    def test_empty_collect(self):
        """No staged files means nothing is discovered."""
        assert len(self._collect()) == 0

    def test_collect_multiple(self):
        path1 = self._setup_extra_file(name="test1")
        path2 = self._setup_extra_file(name="test2")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        assert len(datasets[DEFAULT_TOOL_OUTPUT]) == 2
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["test1", "test2"]
        created_hda_1 = datasets[DEFAULT_TOOL_OUTPUT]["test1"]
        assert_created_with_path(self.app.object_store, created_hda_1.dataset, path1)
        created_hda_2 = datasets[DEFAULT_TOOL_OUTPUT]["test2"]
        assert_created_with_path(self.app.object_store, created_hda_2.dataset, path2)
        # Test default metadata stuff
        assert created_hda_1.visible
        # Since discover_datasets not specified, older name based pattern
        # didn't result in a dbkey being set.
        assert created_hda_1.dbkey == "?"

    def test_collect_multiple_recurse(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        path1 = self._setup_extra_file(filename="test1", subdir="subdir1")
        path2 = self._setup_extra_file(filename="test2", subdir="subdir2/nested1/")
        path3 = self._setup_extra_file(filename="test3", subdir="subdir2")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        assert len(datasets[DEFAULT_TOOL_OUTPUT]) == 3
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["test1", "test2", "test3"]
        created_hda_1 = datasets[DEFAULT_TOOL_OUTPUT]["test1"]
        assert_created_with_path(self.app.object_store, created_hda_1.dataset, path1)
        created_hda_2 = datasets[DEFAULT_TOOL_OUTPUT]["test2"]
        assert_created_with_path(self.app.object_store, created_hda_2.dataset, path2)
        created_hda_3 = datasets[DEFAULT_TOOL_OUTPUT]["test3"]
        assert_created_with_path(self.app.object_store, created_hda_3.dataset, path3)

    def test_collect_multiple_recurse_dict(self):
        """Same as test_collect_multiple_recurse but configured via a dict."""
        self._replace_output_collectors_from_dict(
            {
                "discover_datasets": [
                    {
                        "pattern": "__name__",
                        "directory": "subdir1",
                        "recurse": True,
                        "format": "txt",
                    },
                    {
                        "pattern": "__name__",
                        "directory": "subdir2",
                        "recurse": True,
                        "format": "txt",
                    },
                ]
            }
        )
        path1 = self._setup_extra_file(filename="test1", subdir="subdir1")
        path2 = self._setup_extra_file(filename="test2", subdir="subdir2/nested1/")
        path3 = self._setup_extra_file(filename="test3", subdir="subdir2")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        assert len(datasets[DEFAULT_TOOL_OUTPUT]) == 3
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["test1", "test2", "test3"]
        created_hda_1 = datasets[DEFAULT_TOOL_OUTPUT]["test1"]
        assert_created_with_path(self.app.object_store, created_hda_1.dataset, path1)
        created_hda_2 = datasets[DEFAULT_TOOL_OUTPUT]["test2"]
        assert_created_with_path(self.app.object_store, created_hda_2.dataset, path2)
        created_hda_3 = datasets[DEFAULT_TOOL_OUTPUT]["test3"]
        assert_created_with_path(self.app.object_store, created_hda_3.dataset, path3)

    def test_collect_collection_default_format(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""
""")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="test1")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="test2")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        for dataset in datasets[DEFAULT_TOOL_OUTPUT].values():
            assert dataset.ext == "abcdef"

    def test_collect_sorted_reverse(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="test1")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="test2")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["test2", "test1"]

    def test_collect_sorted_name(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        # Setup filenames in reverse order and ensure name is used as key.
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="ctest1")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="btest2")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="atest3")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["test1", "test2", "test3"]

    def test_collect_sorted_numeric(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        # Setup filenames in reverse order and ensure name is used as key.
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="c1")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="b10")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="a100")
        datasets = self._collect()
        assert DEFAULT_TOOL_OUTPUT in datasets
        # Test default order of collection.
        assert list(datasets[DEFAULT_TOOL_OUTPUT].keys()) == ["1", "10", "100"]

    def test_collect_hidden(self):
        self._setup_extra_file(visible="hidden")
        created_hda = self._collect_default_extra()
        assert not created_hda.visible

    def test_collect_ext(self):
        self._setup_extra_file(ext="txt")
        created_hda = self._collect_default_extra()
        assert created_hda.ext == "txt"

    def test_copied_to_imported_histories(self):
        self._setup_extra_file()
        cloned_hda = self.hda.copy()
        history_2 = self._new_history(hdas=[cloned_hda])
        assert len(history_2.datasets) == 1
        self._collect()
        # Make sure extra primary was copied to cloned history with
        # cloned output.
        assert len(history_2.datasets) == 2

    def test_dbkey_from_filename(self):
        self._setup_extra_file(dbkey="hg19")
        created_hda = self._collect_default_extra()
        assert created_hda.dbkey == "hg19"

    def test_dbkey_from_galaxy_json(self):
        path = self._setup_extra_file()
        self._append_job_json(dict(dbkey="hg19"), output_path=path)
        created_hda = self._collect_default_extra()
        assert created_hda.dbkey == "hg19"

    def test_name_from_galaxy_json(self):
        path = self._setup_extra_file()
        self._append_job_json(dict(name="test_from_json"), output_path=path)
        created_hda = self._collect_default_extra()
        assert "test_from_json" in created_hda.name

    def test_info_from_galaxy_json(self):
        path = self._setup_extra_file()
        self._append_job_json(dict(info="extra output info"), output_path=path)
        created_hda = self._collect_default_extra()
        assert created_hda.info == "extra output info"

    def test_extension_from_galaxy_json(self):
        path = self._setup_extra_file()
        self._append_job_json(dict(ext="txt"), output_path=path)
        created_hda = self._collect_default_extra()
        assert created_hda.ext == "txt"

    def test_job_param(self):
        """Discovery records the new primary dataset as a job output association."""
        self._setup_extra_file()
        assert len(self.job.output_datasets) == 1
        self._collect_default_extra()
        assert len(self.job.output_datasets) == 2
        extra_job_assoc = [job_assoc for job_assoc in self.job.output_datasets if job_assoc.name.startswith("__")][0]
        assert extra_job_assoc.name == "__new_primary_file_out1|test1__"

    def test_pattern_override_designation(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors(
            """"""
        )
        self._setup_extra_file(subdir="subdir", filename="foo.txt")
        primary_outputs = self._collect()[DEFAULT_TOOL_OUTPUT]
        assert len(primary_outputs) == 1
        created_hda = next(iter(primary_outputs.values()))
        assert "foo.txt" in created_hda.name
        assert created_hda.ext == "txt"
        # dbkey falls back to the input dbkey passed through _collect.
        # (A duplicate of this assertion was removed.)
        assert created_hda.dbkey == "btau"

    def test_name_and_ext_pattern(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors(
            """"""
        )
        self._setup_extra_file(subdir="subdir", filename="foo1.txt")
        self._setup_extra_file(subdir="subdir", filename="foo2.tabular")
        primary_outputs = self._collect()[DEFAULT_TOOL_OUTPUT]
        assert len(primary_outputs) == 2
        assert primary_outputs["foo1"].ext == "txt"
        assert primary_outputs["foo2"].ext == "tabular"
        assert primary_outputs["foo1"].dbkey == "btau"
        assert primary_outputs["foo2"].dbkey == "btau"

    def test_custom_pattern(self):
        # Hypothetical oral metagenomic classifier that populates a directory
        # of files based on name and genome. Use custom regex pattern to grab
        # and classify these files.
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors(
            """"""
        )
        self._setup_extra_file(subdir="genome_breakdown", filename="samp1__hg19.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp2__lactLact.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp3__hg19.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp4__lactPlan.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp5__fusoNucl.fasta")
        # Put a file in directory we don't care about, just to make sure
        # it doesn't get picked up by pattern.
        self._setup_extra_file(subdir="genome_breakdown", filename="overview.txt")
        primary_outputs = self._collect()[DEFAULT_TOOL_OUTPUT]
        assert len(primary_outputs) == 5
        genomes = dict(samp1="hg19", samp2="lactLact", samp3="hg19", samp4="lactPlan", samp5="fusoNucl")
        for key, hda in primary_outputs.items():
            assert hda.dbkey == genomes[key]

    def test_custom_pattern_dict(self):
        """Same as test_custom_pattern but configured via a dict."""
        self._replace_output_collectors_from_dict(
            {
                "discover_datasets": {
                    # The original pattern "(?P.*)__(?P.*).fasta" was not a
                    # valid regex (group names lost to markup stripping); the
                    # assertions below require a "dbkey" group and key results
                    # by sample name, i.e. the designation.
                    "pattern": r"(?P<designation>.*)__(?P<dbkey>.*).fasta",
                    "directory": "genome_breakdown",
                    "format": "fasta",
                }
            }
        )
        self._setup_extra_file(subdir="genome_breakdown", filename="samp1__hg19.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp2__lactLact.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp3__hg19.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp4__lactPlan.fasta")
        self._setup_extra_file(subdir="genome_breakdown", filename="samp5__fusoNucl.fasta")
        # Put a file in directory we don't care about, just to make sure
        # it doesn't get picked up by pattern.
        self._setup_extra_file(subdir="genome_breakdown", filename="overview.txt")
        primary_outputs = self._collect()[DEFAULT_TOOL_OUTPUT]
        assert len(primary_outputs) == 5
        genomes = dict(samp1="hg19", samp2="lactLact", samp3="hg19", samp4="lactPlan", samp5="fusoNucl")
        for key, hda in primary_outputs.items():
            assert hda.dbkey == genomes[key]

    def test_name_versus_designation(self):
        """This test demonstrates the difference between name and desgination
        in grouping patterns and named patterns such as __designation__,
        __name__, __designation_and_ext__, and __name_and_ext__.
        """
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        self._setup_extra_file(subdir="subdir_for_name_discovery", filename="example1.txt")
        self._setup_extra_file(subdir="subdir_for_designation_discovery", filename="example2.txt")
        primary_outputs = self._collect()[DEFAULT_TOOL_OUTPUT]
        name_output = primary_outputs["example1"]
        designation_output = primary_outputs["example2"]
        # While name is also used for designation, designation is not the name -
        # it is used in the calculation of the name however...
        assert name_output.name == "example1"
        assert designation_output.name == "{} ({})".format(self.hda.name, "example2")

    def test_cannot_read_files_outside_job_directory(self):
        # NOTE(review): collector XML appears stripped here — see class docstring.
        self._replace_output_collectors("""""")
        exception_thrown = False
        try:
            self._collect()
        except Exception:
            exception_thrown = True
        assert exception_thrown

    def _collect_default_extra(self, **kwargs):
        """Run discovery and return the dataset for the default extra designation."""
        collected = self._collect(**kwargs)
        assert DEFAULT_TOOL_OUTPUT in collected, f"No such key [{DEFAULT_TOOL_OUTPUT}], in {collected}"
        output_files = collected[DEFAULT_TOOL_OUTPUT]
        assert DEFAULT_EXTRA_NAME in output_files, f"No such key [{DEFAULT_EXTRA_NAME}]"
        return output_files[DEFAULT_EXTRA_NAME]

    def _collect(self, job_working_directory=None):
        """Run the tool's output discovery over the test working directory.

        Uses a galaxy.json metadata file if one has been written (see
        _append_job_json), otherwise null metadata.
        """
        if not job_working_directory:
            job_working_directory = self.test_directory
        meta_file = os.path.join(self.test_directory, "galaxy.json")
        tool_provided_metadata: BaseToolProvidedMetadata
        if not os.path.exists(meta_file):
            tool_provided_metadata = NullToolProvidedMetadata()
        else:
            tool_provided_metadata = LegacyToolProvidedMetadata(meta_file)
        return self.tool.discover_outputs(
            self.outputs,
            {},
            tool_provided_metadata,
            job_working_directory,
            job=self.job,
            input_ext="txt",
            input_dbkey="btau",
        )

    def _replace_output_collectors(self, xml_str):
        # Rewrite tool as if it had been created with output containing
        # supplied dataset_collector elem.
        elem = util.parse_xml_string(xml_str)
        self.tool.outputs[DEFAULT_TOOL_OUTPUT].dataset_collector_descriptions = (
            output_collection_def.dataset_collector_descriptions_from_elem(elem)
        )

    def _replace_output_collectors_from_dict(self, output_dict):
        """Like _replace_output_collectors, but from a dict description."""
        self.tool.outputs[DEFAULT_TOOL_OUTPUT].dataset_collector_descriptions = (
            output_collection_def.dataset_collector_descriptions_from_output_dict(output_dict)
        )

    def _append_job_json(self, object, output_path=None, line_type="new_primary_dataset"):
        """Append one JSON metadata line to the working directory's galaxy.json.

        ``object`` (name kept for compatibility despite shadowing the builtin)
        is mutated with the line type and, when output_path is given, the
        basename of the file the metadata applies to.
        """
        object["type"] = line_type
        if output_path:
            name = os.path.basename(output_path)
            object["filename"] = name
        line = json.dumps(object)
        with open(os.path.join(self.test_directory, "galaxy.json"), "a") as f:
            f.write(f"{line}\n")

    def _setup_extra_file(self, **kwargs):
        """Stage a file for discovery and return its path.

        Either pass ``filename`` (placed under ``subdir``) for collector-based
        discovery, or rely on the legacy
        ``primary_<hda id>_<name>_<visible>_<ext>[_<dbkey>]`` naming scheme.
        """
        path = kwargs.get("path", None)
        filename = kwargs.get("filename", None)
        if not path and not filename:
            name = kwargs.get("name", DEFAULT_EXTRA_NAME)
            visible = kwargs.get("visible", "visible")
            ext = kwargs.get("ext", "data")
            directory = kwargs.get("directory", self.test_directory)
            path = os.path.join(directory, f"primary_{self.hda.id}_{name}_{visible}_{ext}")
            if "dbkey" in kwargs:
                path = "{}_{}".format(path, kwargs["dbkey"])
        if not path:
            assert filename
            subdir = kwargs.get("subdir", ".")
            path = os.path.join(self.test_directory, subdir, filename)
        directory = os.path.dirname(path)
        if not os.path.exists(directory):
            os.makedirs(directory)
        contents = kwargs.get("contents", "test contents")
        # Close the handle deterministically (the original relied on the
        # garbage collector to close the leaked file object).
        with open(path, "w") as f:
            f.write(contents)
        return path

    def _setup_test_output(self):
        """Create the HDA/job/history scaffolding the tests discover against."""
        dataset = model.Dataset()
        dataset.external_filename = "example_output"  # This way object store isn't asked about size...
        self.hda = model.HistoryDatasetAssociation(name="test", dataset=dataset)
        job = model.Job()
        job.add_output_dataset(DEFAULT_TOOL_OUTPUT, self.hda)
        self.app.model.context.add(job)
        self.job = job
        self.history = self._new_history(hdas=[self.hda])
        self.job.history = self.history
        self.outputs = {DEFAULT_TOOL_OUTPUT: self.hda}

    def _new_history(self, hdas=None, flush=True):
        """Create and persist a new history containing the given HDAs."""
        hdas = hdas or []
        history = model.History()
        session = self.app.model.context
        session.add(history)
        for hda in hdas:
            history.add_dataset(hda, set_hid=False)
        session.commit()
        return history
class MockObjectStore:
    """Minimal stand-in for a Galaxy object store.

    Records which file path each dataset was "created" from so tests can
    assert on the provenance of discovered datasets.
    """

    def __init__(self):
        # Maps dataset -> the file path it was created from.
        self.created_datasets = {}

    def get_store_by(self, obj, **kwargs):
        """Datasets in this mock store are always addressed by uuid."""
        return "uuid"

    def update_from_file(self, dataset, file_name, create):
        """Remember the source path whenever a dataset is created."""
        if create:
            self.created_datasets[dataset] = file_name

    def size(self, dataset):
        """Report the on-disk size of the file the dataset was created from."""
        return os.stat(self.created_datasets[dataset]).st_size

    def exists(self, *args, **kwargs):
        """Every dataset is considered present."""
        return True

    def get_filename(self, dataset, **kwargs):
        """Return the path the dataset was created from."""
        return self.created_datasets[dataset]
def assert_created_with_path(object_store, dataset, file_name):
    """Assert *dataset* was created in *object_store* from exactly *file_name*.

    Raises KeyError if the dataset was never created at all; the assertion
    message now reports the actual path to ease debugging (the original bare
    assert gave no context on failure).
    """
    actual = object_store.created_datasets[dataset]
    assert actual == file_name, f"dataset created from {actual!r}, expected {file_name!r}"