import os import tempfile from contextlib import contextmanager from unittest import mock from galaxy import model from galaxy.managers.jobs import JobManager from galaxy.managers.markdown_util import ( ready_galaxy_markdown_for_export, to_basic_markdown, ) from galaxy.model.orm.now import now from .base import BaseTestCase class BaseExportTestCase(BaseTestCase): def setUp(self): super().setUp() self.app.hda_manager = mock.MagicMock() self.app.workflow_manager = mock.MagicMock() self.app.history_manager = mock.MagicMock() self.app.dataset_collection_manager = mock.MagicMock() def _new_history(self): history = model.History() history.id = 1 history.name = "New History" return history def _new_hda(self, contents=None): hda = model.HistoryDatasetAssociation() hda.id = 1 if contents is not None: hda.dataset = mock.MagicMock() hda.dataset.purged = False t = tempfile.NamedTemporaryFile(mode="w", delete=False) t.write(contents) hda.dataset.get_file_name.return_value = t.name return hda def _new_invocation(self): invocation = model.WorkflowInvocation() invocation.id = 1 invocation.create_time = now() return invocation @contextmanager def _expect_get_history(self, history): self.app.history_manager.get_accessible.return_value = history yield self.app.history_manager.get_accessible.assert_called_once_with(history.id, self.trans.user) @contextmanager def _expect_get_hda(self, hda, hda_id=1): self.app.hda_manager.get_accessible.return_value = hda yield self.app.hda_manager.get_accessible.assert_called_once_with(hda.id, self.trans.user) def _new_pair_collection(self): hda_forward = self._new_hda(contents="Forward dataset.") hda_forward.id = 1 hda_forward.extension = "txt" hda_reverse = self._new_hda(contents="Reverse dataset.") hda_reverse.id = 2 hda_reverse.extension = "txt" collection = model.DatasetCollection() collection.id = 1 element_forward = model.DatasetCollectionElement( collection=collection, element=hda_forward, element_index=0, element_identifier="forward", ) element_forward.id = 1 element_reverse = model.DatasetCollectionElement( collection=collection, element=hda_reverse, element_index=0, element_identifier="reverse", ) element_reverse.id = 2 collection.collection_type = "paired" return collection class TestToBasicMarkdown(BaseExportTestCase): def setUp(self): super().setUp() self.test_dataset_path = None def tearDown(self): super().tearDown() if self.test_dataset_path is not None: os.remove(self.test_dataset_path) def test_noop_on_non_galaxy_blocks(self): example = """# Example ## Some Syntax *Foo* **bar** [Google](http://google.com/). ## Code Blocks ``` history_dataset_display(history_dataset_id=4) ``` Another kind of code block: job_metrics(job_id=4) """ result = self._to_basic(example) assert result == example def test_history_dataset_peek(self): hda = self._new_hda() hda.peek = "My Cool Peek" example = """# Example ```galaxy history_dataset_peek(history_dataset_id=1) ``` """ with self._expect_get_hda(hda): result = self._to_basic(example) assert "\n My Cool Peek\n\n" in result def test_history_dataset_peek_empty(self): hda = self._new_hda() example = """# Example ```galaxy history_dataset_peek(history_dataset_id=1) ``` """ with self._expect_get_hda(hda): result = self._to_basic(example) assert "\n*No Dataset Peek Available*\n" in result def test_history_link(self): history = self._new_history() example = """# Example ```galaxy history_link(history_id=1) ``` """ with self._expect_get_history(history): result = self._to_basic(example) assert "\n New History\n\n" in result def test_history_display_binary(self): hda = self._new_hda() hda.extension = "ab1" example = """# Example ```galaxy history_dataset_display(history_dataset_id=1) ``` """ with self._expect_get_hda(hda): result = self._to_basic(example) assert "**Contents:**\n*cannot display binary content*\n" in result def test_history_display_text(self): hda = self._new_hda(contents="MooCow") hda.extension = "txt" example = """# Example ```galaxy history_dataset_display(history_dataset_id=1) ``` """ with self._expect_get_hda(hda): result = self._to_basic(example) assert "**Contents:**\n\n MooCow\n\n" in result def test_history_display_gtf(self): gtf = """chr13 Cufflinks transcript 3405463 3405542 1000 . . gene_id "CUFF.50189"; transcript_id "CUFF.50189.1"; FPKM "6.3668918357"; frac "1.000000"; conf_lo "0.000000"; conf_hi "17.963819"; cov "0.406914"; chr13 Cufflinks exon 3405463 3405542 1000 . . gene_id "CUFF.50189"; transcript_id "CUFF.50189.1"; exon_number "1"; FPKM "6.3668918357"; frac "1.000000"; conf_lo "0.000000"; conf_hi "17.963819"; cov "0.406914"; chr13 Cufflinks transcript 3473337 3473372 1000 . . gene_id "CUFF.50191"; transcript_id "CUFF.50191.1"; FPKM "11.7350749444"; frac "1.000000"; conf_lo "0.000000"; conf_hi "35.205225"; cov "0.750000"; """ example = """# Example ```galaxy history_dataset_display(history_dataset_id=1) ``` """ hda = self._new_hda(contents=gtf) hda.extension = "gtf" from galaxy.datatypes.tabular import Tabular assert isinstance(hda.datatype, Tabular) with self._expect_get_hda(hda): result = self._to_basic(example) assert " Plot |" in result assert "| Input Dataset | " in result assert "| 5: Cool Data |\n" in result def test_job_metrics(self): job = model.Job() job.id = 1 example = """# Example ```galaxy job_metrics(job_id=1) ``` """ metrics = [ {"plugin": "core", "title": "Cores Allocated", "value": 1}, {"plugin": "core", "title": "Job Start Time", "value": "2019-12-17 11:53:13"}, {"plugin": "env", "title": "GALAXY_HOME", "value": "/path/to/home"}, ] with mock.patch.object(JobManager, "get_accessible_job", return_value=job): with mock.patch("galaxy.managers.markdown_util.summarize_job_metrics", return_value=metrics): result = self._to_basic(example) assert "**core**\n" in result assert "**env**\n" in result assert "| Cores Allocated | 1 |\n" in result assert "| GALAXY_HOME | /path/to/home |\n" in result def _to_basic(self, example): return to_basic_markdown(self.trans, example) class TestReadyExport(BaseExportTestCase): def test_ready_dataset_display(self): hda = self._new_hda() example = """ ```galaxy history_dataset_display(history_dataset_id=1) ``` """ with self._expect_get_hda(hda): _, export_markdown, extra_data = self._ready_export(example) assert "history_datasets" in extra_data assert len(extra_data["history_datasets"]) == 1 def test_ready_export_two_datasets(self): hda = self._new_hda() hda2 = self._new_hda() hda2.id = 2 example = """ ```galaxy history_dataset_display(history_dataset_id=1) ``` ```galaxy history_dataset_display(history_dataset_id=2) ``` """ self.app.hda_manager.get_accessible.side_effect = [hda, hda2] _, export_markdown, extra_data = self._ready_export(example) assert "history_datasets" in extra_data assert len(extra_data["history_datasets"]) == 2 def test_export_dataset_collection_paired(self): hdca = model.HistoryDatasetCollectionAssociation() hdca.name = "cool name" hdca.collection = self._new_pair_collection() hdca.id = 1 hdca.history_id = 1 hdca.collection_id = hdca.collection.id self.trans.app.dataset_collection_manager.get_dataset_collection_instance.return_value = hdca example = """# Example ```galaxy history_dataset_collection_display(history_dataset_collection_id=1) ``` """ # Mock the HDCASerializer to return a dummy serialized view from galaxy.managers.hdcas import HDCASerializer mock_hdca_view = {"id": "encoded_id_1", "name": "cool name", "collection_type": "paired"} with ( mock.patch.object(HDCASerializer, "url_for", return_value="http://google.com"), mock.patch.object(HDCASerializer, "serialize_to_view", return_value=mock_hdca_view), mock.patch.object(HDCASerializer, "serialize_store_times_summary", return_value=[]), ): _, export, extra_data = self._ready_export(example) assert "history_dataset_collections" in extra_data assert len(extra_data.get("history_dataset_collections")) == 1 def test_galaxy_version(self): example = """# Example ```galaxy generate_galaxy_version() ``` """ _, result, extra_data = self._ready_export(example) assert "generate_version" in extra_data assert extra_data["generate_version"] == "19.09" def test_generate_time(self): example = """# Example ```galaxy generate_time() ``` """ _, result, extra_data = self._ready_export(example) assert "generate_time" in extra_data def test_get_invocation_time(self): invocation = self._new_invocation() self.app.workflow_manager.get_invocation.side_effect = [invocation] example = """# Example ```galaxy invocation_time(invocation_id=1) ``` """ _, result, extra_data = self._ready_export(example) assert "invocations" in extra_data assert "create_time" in extra_data["invocations"]["be8be0fd2ce547f6"] assert extra_data["invocations"]["be8be0fd2ce547f6"]["create_time"] == invocation.create_time.isoformat() def test_export_replaces_embedded_history_dataset_type(self): hda = self._new_hda() hda.extension = "fasta" hda2 = self._new_hda() hda2.extension = "fastqsanger" hda2.id = 2 example = """ I ran a cool analysis with two inputs of types ${galaxy history_dataset_type(history_dataset_id=1)} and ${galaxy history_dataset_type(history_dataset_id=2)}. """ self.app.hda_manager.get_accessible.side_effect = [hda, hda2] _, export_markdown, _ = self._ready_export(example) assert export_markdown == """ I ran a cool analysis with two inputs of types fasta and fastqsanger. """ def test_export_replaces_embedded_history_dataset_name(self): hda = self._new_hda() hda.name = "foo bar" hda2 = self._new_hda() hda2.name = "cow dog" hda2.id = 2 example = """ I ran a cool analysis with two inputs of types ${galaxy history_dataset_name(history_dataset_id=1)} and ${galaxy history_dataset_name(history_dataset_id=2)}. """ self.app.hda_manager.get_accessible.side_effect = [hda, hda2] _, export_markdown, _ = self._ready_export(example) assert export_markdown == """ I ran a cool analysis with two inputs of types foo bar and cow dog. """ def test_export_replaces_embedded_generate_time(self): example = """ I ran a cool analysis at ${galaxy generate_time()}. """ _, export_markdown, _ = self._ready_export(example) assert export_markdown.startswith(""" I ran a cool analysis at 2""") def test_export_replaces_embedded_invocation_time(self): invocation = self._new_invocation() self.app.workflow_manager.get_invocation.side_effect = [invocation] example = """ I ran a cool analysis at ${galaxy invocation_time(invocation_id=1)}. """ _, export_markdown, _ = self._ready_export(example) assert export_markdown.startswith(""" I ran a cool analysis at 2""") def test_export_replaces_embedded_galaxy_version(self): example = """ I ran a cool analysis with Galaxy ${galaxy generate_galaxy_version()}. """ _, export_markdown, _ = self._ready_export(example) assert export_markdown == """ I ran a cool analysis with Galaxy 19.09. """ def test_export_replaces_embedded_access_link(self): self.trans.app.config.instance_access_url = "http://mycoolgalaxy.org" example = """ I ran a cool analysis at ${galaxy instance_access_link()}. """ _, export_markdown, _ = self._ready_export(example) assert export_markdown == """ I ran a cool analysis at [http://mycoolgalaxy.org](http://mycoolgalaxy.org). """ def _ready_export(self, example: str): return ready_galaxy_markdown_for_export(self.trans, example)