"""Tests for Galaxy's deferred dataset materialization.

Deferred datasets record a source URI (plus optional hashes and transform
actions) instead of actual file content.  "Materializing" such a dataset
fetches the content, applies any requested transforms, validates hashes,
and moves the dataset state from ``deferred`` to ``ok`` (or ``error`` when
hash validation fails).  These tests exercise that flow for attached
(object-store backed) and detached (transient-directory backed)
materializers, for HDAs, LDDAs, and dataset collections.
"""

from sqlalchemy import select

from galaxy.files.unittest_utils import TestPosixConfiguredFileSources
from galaxy.model import (
    DatasetCollection,
    DatasetCollectionElement,
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    LibraryDatasetDatasetAssociation,
    store,
)
from galaxy.model.deferred import (
    materialize_collection_instance,
    materializer_factory,
)
from galaxy.model.unittest_utils.store_fixtures import (
    deferred_hda_model_store_dict,
    deferred_hda_model_store_dict_space_to_tab,
    one_ld_library_deferred_model_store_dict,
)
from galaxy.util.resources import resource_string

from .model.test_model_store import (
    perform_import_from_store_dict,
    setup_fixture_context_with_history,
    StoreFixtureContextWithHistory,
)
from .test_model_copy import _create_hda

# Reference content used to verify what materialization produced.
CONTENTS_2_BED = resource_string(__name__, "model/2.bed")


def test_undeferred_hdas_untouched(tmpdir):
    """An HDA that is not deferred passes through ensure_materialized unchanged."""
    app, sa_session, user, history = setup_fixture_context_with_history()
    hda_fh = tmpdir.join("file.txt")
    hda_fh.write("Moo Cow")
    hda = _create_hda(sa_session, app.object_store, history, hda_fh, include_metadata_file=False)
    sa_session.commit()
    materializer = materializer_factory(True, object_store=app.object_store)
    # Already-materialized datasets should be returned as-is.
    assert materializer.ensure_materialized(hda) == hda


def test_deferred_hdas_basic_attached():
    """Materializing a deferred HDA with an attached materializer yields an "ok" dataset in the object store."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)
    _assert_2_bed_metadata(materialized_hda)


def test_hash_validate():
    """Materialization succeeds when the dataset-level hash matches the downloaded content."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"


def test_hash_invalid():
    """Materialization ends in the "error" state when the dataset-level hash does not match."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    # Corrupt the stored hash so validation must fail.
    store_dict["datasets"][0]["file_metadata"]["hashes"][0]["hash_value"] = "invalidhash"
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "error"


def test_legacy_transform_actions_on_deferred_hdas_become_requested_actions():
    """Legacy (pre-25.1) ``transform`` entries import as ``requested_transform`` on deferred datasets."""
    # pre 25.1 we didn't have the distinction between requested and applied transforms,
    # so we need to ensure that legacy transforms are converted to requested transforms for
    # deferred datasets. In 25.1 - deferred datasets should always have empty transforms as
    # no actions have been applied yet.
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    store_dict["datasets"][0]["file_metadata"]["sources"][0]["transform"] = [{"action": "spaces_to_tabs"}]
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    deferred_dataset = deferred_hda.dataset
    assert deferred_dataset.sources[0].transform is None
    assert deferred_dataset.sources[0].requested_transform == [{"action": "spaces_to_tabs"}]


def test_requested_transform_actions_on_deferred_hdas_preserved():
    """Modern ``requested_transform`` entries survive store import unchanged."""
    # Continued from previous comment, in 25.1 - deferred datasets should have transforms saved
    # as requested transforms, so we need to ensure that these are preserved during store import.
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    store_dict["datasets"][0]["file_metadata"]["sources"][0]["requested_transform"] = [{"action": "spaces_to_tabs"}]
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    deferred_dataset = deferred_hda.dataset
    assert deferred_dataset.sources[0].transform is None
    assert deferred_dataset.sources[0].requested_transform == [{"action": "spaces_to_tabs"}]


def test_hash_validate_source_of_download():
    """Hashes attached to the dataset *source* (the download) are also validated on materialization."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    store_dict["datasets"][0]["file_metadata"]["sources"][0]["hashes"] = [
        {"model_class": "DatasetSourceHash", "hash_function": "MD5", "hash_value": "f568c29421792b1b1df4474dafae01f1"}
    ]
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok", materialized_hda.info


def test_hash_invalid_source_of_download():
    """An invalid source-level hash drives the materialized dataset into the "error" state."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    store_dict["datasets"][0]["file_metadata"]["sources"][0]["hashes"] = [
        {"model_class": "DatasetSourceHash", "hash_function": "MD5", "hash_value": "invalidhash"}
    ]
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "error", materialized_hda.info


def test_deferred_hdas_basic_attached_store_by_uuid():
    """Attached materialization also works when the object store addresses datasets by UUID."""
    # skip a flush here so this is a different path...
    fixture_context = setup_fixture_context_with_history(store_by="uuid")
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)


def test_deferred_hdas_basic_detached(tmpdir):
    """Detached materialization writes the content to an external file under the transient directory."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _assert_2_bed_metadata(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    _assert_path_contains_2_bed(external_filename)
    _assert_2_bed_metadata(materialized_hda)


def test_deferred_datasets_with_legacy_transforms_respect_transform(tmpdir):
    """A legacy-format spaces_to_tabs transform is applied during materialization."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict_space_to_tab("legacy")
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    # Before materialization nothing has been applied yet.
    assert deferred_hda.dataset.sources[0].transform is None
    assert deferred_hda.dataset.sources[0].requested_transform == [{"action": "spaces_to_tabs"}]
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    # After materialization the requested action shows up as applied.
    assert materialized_dataset.sources[0].transform == [{"action": "spaces_to_tabs"}]
    assert materialized_dataset.sources[0].requested_transform == [{"action": "spaces_to_tabs"}]
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    _assert_path_contains_simple_lines_as_tsv(external_filename)


def test_deferred_datasets_with_requested_transforms_respect_transform(tmpdir):
    """25.1-format requested transforms are applied; only the applicable action lands in ``transform``."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict_space_to_tab("25.1")
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    assert deferred_hda.dataset.sources[0].transform is None
    assert deferred_hda.dataset.sources[0].requested_transform == [
        {"action": "datatype_groom"},
        {"action": "spaces_to_tabs"},
    ]
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.sources[0].transform == [{"action": "spaces_to_tabs"}]
    assert materialized_dataset.sources[0].requested_transform == [
        {"action": "datatype_groom"},
        {"action": "spaces_to_tabs"},
    ]
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    _assert_path_contains_simple_lines_as_tsv(external_filename)


def test_deferred_datasets_do_not_apply_unspecified_transforms_legacy(tmpdir):
    """Legacy-format datasets without a requested transform are materialized untransformed."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict_space_to_tab("legacy", apply_transform=False)
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    assert deferred_hda.dataset.sources[0].transform is None
    assert deferred_hda.dataset.sources[0].requested_transform == []
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.sources[0].transform == []
    assert materialized_dataset.sources[0].requested_transform == []
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    # Spaces survive: no transform was requested, so none is applied.
    _assert_path_contains_simple_lines_as_text(external_filename)


def test_deferred_datasets_do_not_apply_unspecified_transforms(tmpdir):
    """25.1-format datasets only apply actions that were actually requested."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict_space_to_tab("25.1", apply_transform=False)
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    assert deferred_hda.dataset.sources[0].transform is None
    assert deferred_hda.dataset.sources[0].requested_transform == [{"action": "datatype_groom"}]
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.sources[0].transform == []
    assert materialized_dataset.sources[0].requested_transform == [{"action": "datatype_groom"}]
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    _assert_path_contains_simple_lines_as_text(external_filename)


def test_deferred_hdas_basic_detached_from_detached_hda(tmpdir):
    """Detached materialization works on an HDA expunged from its SQLAlchemy session."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _ensure_relations_attached_and_expunge(deferred_hda, fixture_context)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(False, transient_directory=tmpdir)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    external_filename = materialized_dataset.external_filename
    assert external_filename
    assert external_filename.startswith(str(tmpdir))
    _assert_path_contains_2_bed(external_filename)
    _assert_2_bed_metadata(materialized_hda)


def test_deferred_hdas_basic_attached_from_detached_hda():
    """Attached materialization works on a detached HDA when the materializer is given a session."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    _ensure_relations_attached_and_expunge(deferred_hda, fixture_context)
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(
        True, object_store=fixture_context.app.object_store, sa_session=fixture_context.sa_session()
    )
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)
    _assert_2_bed_metadata(materialized_hda)


def test_deferred_ldda_basic_attached():
    """Library datasets (LDDAs) can be materialized; the result has no history association."""
    import_options = store.ImportOptions(
        allow_library_creation=True,
    )
    fixture_context = setup_fixture_context_with_history()
    store_dict = one_ld_library_deferred_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict, import_options=import_options)
    deferred_ldda = fixture_context.sa_session.scalars(select(LibraryDatasetDatasetAssociation)).all()[0]
    assert deferred_ldda
    assert deferred_ldda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_ldda)
    assert materialized_hda.history is None
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)


def test_deferred_hdas_basic_attached_file_sources(tmpdir):
    """Deferred HDAs with a ``gxfiles://`` source URI materialize through configured file sources."""
    root = tmpdir / "root"
    root.mkdir()
    content_path = root / "2.bed"
    content_path.write_text(CONTENTS_2_BED, encoding="utf-8")
    file_sources = TestPosixConfiguredFileSources(str(root))
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict(
        source_uri="gxfiles://test1/2.bed",
    )
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store, file_sources=file_sources)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)
    _assert_2_bed_metadata(materialized_hda)


def test_deferred_hdas_with_deferred_metadata():
    """Materialization also resolves deferred metadata (``metadata_deferred`` becomes falsy)."""
    fixture_context = setup_fixture_context_with_history()
    store_dict = deferred_hda_model_store_dict(metadata_deferred=True)
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    assert deferred_hda
    assert deferred_hda.dataset.state == "deferred"
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    materialized_hda = materializer.ensure_materialized(deferred_hda)
    materialized_dataset = materialized_hda.dataset
    assert not materialized_hda.metadata_deferred
    assert materialized_dataset.state == "ok"
    # only detached datasets would be created with an external_filename
    assert not materialized_dataset.external_filename
    object_store = fixture_context.app.object_store
    path = object_store.get_filename(materialized_dataset)
    assert path
    _assert_path_contains_2_bed(path)
    _assert_2_bed_metadata(materialized_hda)


def test_materialize_attached_hdcas_unimplemented(tmpdir):
    """Materializing a session-attached HDCA is not implemented and must raise NotImplementedError."""
    fixture_context = setup_fixture_context_with_history()
    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
    hdca = _test_hdca(tmpdir, fixture_context)
    exception_found = False
    try:
        materialize_collection_instance(hdca, materializer)
    except NotImplementedError:
        exception_found = True
    assert exception_found


def test_materialize_unattached_undeferred_hdcas_noop(tmpdir):
    """An HDCA with no deferred elements is returned unchanged by materialize_collection_instance."""
    fixture_context = setup_fixture_context_with_history()
    materializer = materializer_factory(False, transient_directory=tmpdir)
    input_hdca = _test_hdca(tmpdir, fixture_context, include_element_deferred=False)
    materialized_hdca = materialize_collection_instance(input_hdca, materializer)
    assert input_hdca == materialized_hdca  # doesn't have deferred data so just assert it is input.
def test_materialize_unattached_deferred_hdcas(tmpdir):
    """Materializing a detached HDCA with a deferred element yields a fully non-deferred copy."""
    fixture_context = setup_fixture_context_with_history()
    materializer = materializer_factory(False, transient_directory=tmpdir)
    deferred_hdca = _test_hdca(tmpdir, fixture_context)
    assert deferred_hdca.has_deferred_data
    assert len(deferred_hdca.collection.elements) == 2
    assert _deferred_element_count(deferred_hdca.collection) == 1
    materialized_hdca = materialize_collection_instance(deferred_hdca, materializer)
    assert not materialized_hdca.has_deferred_data
    assert materialized_hdca.name == deferred_hdca.name
    materialized_collection = materialized_hdca.collection
    assert materialized_collection
    materialized_elements = materialized_collection.elements
    assert len(materialized_elements) == 2
    assert _deferred_element_count(materialized_collection) == 0


def _test_hdca(
    tmpdir,
    fixture_context: StoreFixtureContextWithHistory,
    include_element_deferred: bool = True,
    include_element_ok: bool = True,
) -> HistoryDatasetCollectionAssociation:
    """Build and persist a list HDCA fixture.

    The collection contains up to two elements: a deferred HDA imported from
    the standard deferred model-store fixture (when ``include_element_deferred``)
    and a regular "ok" HDA backed by a small file written to ``tmpdir``
    (when ``include_element_ok``).
    """
    app, sa_session, _, history = fixture_context
    store_dict = deferred_hda_model_store_dict()
    perform_import_from_store_dict(fixture_context, store_dict)
    deferred_hda = fixture_context.history.datasets[0]
    sa_session.add(deferred_hda)
    assert deferred_hda.dataset.state == "deferred"
    hda_fh = tmpdir.join("file.txt")
    hda_fh.write("Moo Cow")
    hda = _create_hda(sa_session, app.object_store, history, hda_fh, include_metadata_file=False)
    elements = []
    element_index = 0
    if include_element_deferred:
        elements.append(
            DatasetCollectionElement(
                element=deferred_hda,
                element_identifier="deferred_hda",
                element_index=element_index,
            )
        )
        element_index += 1
    if include_element_ok:
        elements.append(
            DatasetCollectionElement(
                element=hda,
                element_identifier="ok_hda",
                element_index=element_index,
            )
        )
        element_index += 1
    collection = DatasetCollection(collection_type="list", populated=True)
    collection.elements = elements
    hdca = HistoryDatasetCollectionAssociation(
        history=history,
        hid=1,
        collection=collection,
        name="HistoryCollectionTest1",
    )
    sa_session.add(hdca)
    sa_session.add(collection)
    sa_session.commit()
    return hdca


def _deferred_element_count(dataset_collection: DatasetCollection) -> int:
    """Recursively count collection elements whose dataset state is "deferred"."""
    count = 0
    for element in dataset_collection.elements:
        if element.is_collection:
            assert element.child_collection
            count += _deferred_element_count(element.child_collection)
        else:
            # Leftover debug print removed here; the state check alone suffices.
            if element.dataset_instance.dataset.state == "deferred":
                count += 1
    return count


def _ensure_relations_attached_and_expunge(deferred_hda: HistoryDatasetAssociation, fixture_context) -> None:
    """Eagerly load relations materialization needs, then detach everything from the session.

    Touching sources, hashes, and metadata first ensures they are loaded; the
    point is to exercise the detached code path where e.g. accessing
    ``deferred_hda.history`` afterwards raises a detached-instance error.
    """
    [s.hashes for s in deferred_hda.dataset.sources]
    deferred_hda.dataset.hashes  # noqa: B018
    deferred_hda._metadata  # noqa: B018
    sa_session = fixture_context.sa_session
    sa_session.expunge_all()


def _assert_2_bed_metadata(hda: HistoryDatasetAssociation) -> None:
    """Assert the HDA carries the expected metadata for the 2.bed fixture file."""
    assert hda.metadata.columns == 6
    assert hda.metadata.data_lines == 68
    assert hda.metadata.comment_lines == 0
    assert hda.metadata.chromCol == 1
    assert hda.metadata.startCol == 2
    assert hda.metadata.endCol == 3
    assert hda.metadata.viz_filter_cols == [4]


def _assert_path_contains_2_bed(path) -> None:
    """Assert the file at ``path`` holds exactly the 2.bed fixture contents."""
    with open(path) as f:
        contents = f.read()
    assert contents == CONTENTS_2_BED


def _assert_path_contains_simple_lines_as_tsv(path) -> None:
    """Assert the file holds the simple-lines fixture with spaces converted to tabs."""
    with open(path) as f:
        contents = f.read()
    assert contents == "This\tis\ta\tline\tof\ttext.\n"  # simple lines as TSV


def _assert_path_contains_simple_lines_as_text(path) -> None:
    """Assert the file holds the simple-lines fixture untransformed (spaces intact)."""
    with open(path) as f:
        contents = f.read()
    assert contents == "This is a line of text.\n"  # simple lines as text