"""Fixture helpers for creating tool shed categories, repositories and revisions in tests."""

import os
import tarfile
import tempfile
from collections.abc import Iterator
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
    Optional,
    Union,
)

import requests
from typing_extensions import Protocol

from galaxy.util.resources import (
    as_file,
    resource_path,
    Traversable,
)
from galaxy_test.base import api_asserts
from galaxy_test.base.api_util import random_name
from tool_shed_client.schema import (
    BuildSearchIndexResponse,
    Category,
    CreateCategoryRequest,
    CreateRepositoryRequest,
    DetailedRepository,
    from_legacy_install_info,
    GetInstallInfoRequest,
    GetOrderedInstallableRevisionsRequest,
    InstallInfo,
    OrderedInstallableRevisions,
    PaginatedRepositoryIndexResults,
    RepositoriesByCategory,
    Repository,
    RepositoryIndexRequest,
    RepositoryIndexResponse,
    RepositoryMetadata,
    RepositoryPaginatedIndexRequest,
    RepositorySearchRequest,
    RepositorySearchResults,
    RepositoryUpdate,
    RepositoryUpdateRequest,
    ResetMetadataOnRepositoryRequest,
    ResetMetadataOnRepositoryResponse,
    ToolSearchRequest,
    ToolSearchResults,
    UpdateRepositoryRequest,
    Version,
)
from .api_util import (
    ensure_user_with_email,
    ShedApiInteractor,
)

# Anything accepted where a repository is expected: a raw id string or a Repository model.
HasRepositoryId = Union[str, Repository]

DEFAULT_PREFIX = "repofortest"
TEST_DATA_REPO_FILES = resource_path(__name__, "../test_data")
COLUMN_MAKER_PATH = TEST_DATA_REPO_FILES.joinpath("column_maker/column_maker.tar")
COLUMN_MAKER_1_1_1_PATH = TEST_DATA_REPO_FILES.joinpath("column_maker/column_maker_1.1.1.tar")
DEFAULT_COMMIT_MESSAGE = "a test commit message"


def repo_files(test_data_path: str) -> Iterator[Path]:
    """Yield each child of ``repos/<test_data_path>`` as a filesystem path, in sorted name order.

    Each path is only guaranteed to exist while the generator is suspended on
    that item, because it is produced inside an ``as_file`` context.
    """
    repos = TEST_DATA_REPO_FILES.joinpath(f"repos/{test_data_path}")
    for child in sorted(_.name for _ in repos.iterdir()):
        with as_file(repos.joinpath(child)) as path:
            yield path


def repo_tars(test_data_path: str) -> Iterator[Path]:
    """Yield a gzipped tarball for each directory produced by ``repo_files``.

    Every tarball lives in a ``NamedTemporaryFile`` that is removed as soon as
    the consumer advances the generator, so callers must finish using a
    yielded path before requesting the next one.
    """
    for path in repo_files(test_data_path):
        assert path.is_dir()
        prefix = f"shedtest_{test_data_path}_{path.name}_"
        with NamedTemporaryFile(prefix=prefix) as tf:
            # Close the tar writer before yielding so the archive is fully flushed.
            with tarfile.open(tf.name, "w:gz") as tar:
                tar.add(str(path.absolute()), arcname=test_data_path or path.name)
            yield Path(tf.name)


class HostsTestToolShed(Protocol):
    """Structural type for objects exposing the host and port of a test tool shed."""

    host: str
    port: Optional[str]


class ToolShedPopulator:
    """Utilities for easy fixture creation of tool shed related things."""

    _admin_api_interactor: ShedApiInteractor
    _api_interactor: ShedApiInteractor

    def __init__(self, admin_api_interactor: ShedApiInteractor, api_interactor: ShedApiInteractor):
        """Store the admin interactor and the regular-user interactor used for API calls."""
        self._admin_api_interactor = admin_api_interactor
        self._api_interactor = api_interactor

    def setup_bismark_repo(
        self,
        repository_id: Optional[HasRepositoryId] = None,
        end: Optional[int] = None,
        category_id: Optional[str] = None,
    ) -> HasRepositoryId:
        """Upload the ``bismark`` test data revisions, creating a repository if none is given.

        Uploads are performed with ``assert_ok=False``, so failed uploads do
        not raise here.
        """
        if repository_id is None:
            category_id = category_id or self.new_category(prefix="testbismark").id
            repository_id = self.new_repository(category_id, prefix="testbismark")
        return self.setup_test_data_repo_by_id("bismark", repository_id, assert_ok=False, end=end)

    def setup_test_data_repo_by_id(
        self,
        test_data_path: str,
        repository_id: Optional[HasRepositoryId] = None,
        assert_ok=True,
        start: int = 0,
        end: Optional[int] = None,
    ) -> HasRepositoryId:
        """Upload the tarballs for ``test_data_path`` as successive revisions.

        A new category and repository are created when ``repository_id`` is
        None. Only tarballs whose index is in ``[start, end)`` are uploaded; a
        falsy ``end`` (None or 0) means no upper bound. When ``assert_ok`` is
        true each upload must succeed and report ``is_ok``.

        Returns the repository id (or the object that was passed in).
        """
        if repository_id is None:
            prefix = test_data_path.replace("_", "")
            category_id = self.new_category(prefix=prefix).id
            repository = self.new_repository(category_id, prefix=prefix)
            repository_id = repository.id
        assert repository_id

        for index, repo_tar in enumerate(repo_tars(test_data_path)):
            if index < start:
                continue

            if end and index >= end:
                break

            commit_message = f"Updating {test_data_path} with index {index} with tar {repo_tar}"
            response = self.upload_revision_raw(repository_id, repo_tar, commit_message)
            if assert_ok:
                api_asserts.assert_status_code_is_ok(response)
                assert RepositoryUpdate(root=response.json()).is_ok
        return repository_id

    def setup_test_data_repo(
        self,
        test_data_path: str,
        repository: Optional[Repository] = None,
        assert_ok=True,
        start: int = 0,
        end: Optional[int] = None,
        category_id: Optional[str] = None,
    ) -> Repository:
        """Like ``setup_test_data_repo_by_id`` but works with and returns a ``Repository``.

        A repository (and, if ``category_id`` is None, a category) is created
        when ``repository`` is not supplied.
        """
        if repository is None:
            prefix = test_data_path.replace("_", "")
            if category_id is None:
                category_id = self.new_category(prefix=prefix).id
            repository = self.new_repository(category_id, prefix=prefix)
        self.setup_test_data_repo_by_id(test_data_path, repository, assert_ok=assert_ok, start=start, end=end)
        return repository

    def setup_column_maker_repo(
        self,
        prefix=DEFAULT_PREFIX,
        category_id: Optional[str] = None,
    ) -> Repository:
        """Create a repository and upload the column_maker tarball as its first revision."""
        if category_id is None:
            category_id = self.new_category(prefix=prefix).id
        assert category_id
        repository = self.new_repository(category_id, prefix=prefix)
        repository_id = repository.id
        assert repository_id
        response = self.upload_revision(
            repository_id,
            COLUMN_MAKER_PATH,
        )
        assert response.is_ok
        return repository

    def setup_column_maker_and_get_metadata(self, prefix=DEFAULT_PREFIX) -> RepositoryMetadata:
        """Create a column_maker repository and return its metadata."""
        repository = self.setup_column_maker_repo(prefix=prefix)
        return self.get_metadata(repository)

    def get_install_info_for_repository(self, has_repository_id: HasRepositoryId) -> InstallInfo:
        """Return install info for the latest revision in the repository's downloadable-only metadata."""
        repository_id = self._repository_id(has_repository_id)
        metadata = self.get_metadata(repository_id, True)
        return self.get_install_info(metadata)

    def get_install_info(self, repository_metadata: RepositoryMetadata) -> InstallInfo:
        """Fetch install info for ``repository_metadata.latest_revision`` and convert it from the legacy format."""
        revision_metadata = repository_metadata.latest_revision
        repo = revision_metadata.repository
        request = GetInstallInfoRequest(
            owner=repo.owner,
            name=repo.name,
            changeset_revision=revision_metadata.changeset_revision,
        )
        revisions_response = self._api_interactor.get(
            "repositories/get_repository_revision_install_info", params=request.model_dump()
        )
        api_asserts.assert_status_code_is_ok(revisions_response)
        return from_legacy_install_info(revisions_response.json())

    def update_column_maker_repo(self, repository: HasRepositoryId) -> RepositoryUpdate:
        """Upload the column_maker 1.1.1 tarball as a new revision of ``repository``."""
        response = self.upload_revision(
            repository,
            COLUMN_MAKER_1_1_1_PATH,
        )
        return response

    def upload_revision_raw(
        self, repository: HasRepositoryId, path: Traversable, commit_message: str = DEFAULT_COMMIT_MESSAGE
    ) -> requests.Response:
        """POST the file at ``path`` as a new changeset revision and return the raw response.

        No status checking is done here; callers decide how to treat failures.
        """
        body = RepositoryUpdateRequest(
            commit_message=commit_message,
        )
        repository_id = self._repository_id(repository)
        # Keep the upload handle open only for the duration of the request so it is always closed.
        with path.open("rb") as upload:
            files = {"file": upload}
            response = self._api_interactor.post(
                f"repositories/{repository_id}/changeset_revision", params=body.model_dump(), files=files
            )
        return response

    def update_raw(self, repository: HasRepositoryId, request: UpdateRepositoryRequest) -> requests.Response:
        """PUT only the explicitly-set fields of ``request`` to the repository and return the raw response."""
        repository_id = self._repository_id(repository)
        body_json = request.model_dump(exclude_unset=True, by_alias=True)
        put_response = self._api_interactor.put(f"repositories/{repository_id}", json=body_json)
        return put_response

    def update(self, repository: HasRepositoryId, request: UpdateRepositoryRequest) -> Repository:
        """Update the repository, assert the call succeeded and return the parsed ``Repository``."""
        response = self.update_raw(repository, request)
        api_asserts.assert_status_code_is_ok(response)
        return Repository(**response.json())

    def upload_revision(
        self, repository: HasRepositoryId, path: Traversable, commit_message: str = DEFAULT_COMMIT_MESSAGE
    ) -> RepositoryUpdate:
        """Upload a revision and return the parsed ``RepositoryUpdate``.

        Raises:
            AssertionError: if the upload does not succeed. A dedicated message
                is used when the server's ``err_msg`` contains "No changes".
        """
        response = self.upload_revision_raw(repository, path, commit_message=commit_message)
        if response.status_code != 200:
            response_json = None
            err_msg = None
            try:
                response_json = response.json()
            except ValueError:
                # Body is not JSON; fall through to the generic status assertion below.
                response_json = None
            if isinstance(response_json, dict) and "err_msg" in response_json:
                err_msg = response_json["err_msg"]
            if err_msg and "No changes" in err_msg:
                assert_msg = f"Updating repository [{repository}] with path [{path}] and commit_message {commit_message} failed to update repository contents, no changes found. Response: [{response_json}]"
                raise AssertionError(assert_msg)
        api_asserts.assert_status_code_is_ok(response)
        return RepositoryUpdate(root=response.json())

    def new_repository(self, category_ids: Union[list[str], str], prefix: str = DEFAULT_PREFIX) -> Repository:
        """Create a repository with a random name and synopsis derived from ``prefix``."""
        name = random_name(prefix=prefix)
        synopsis = random_name(prefix=prefix)
        request = CreateRepositoryRequest(
            name=name,
            synopsis=synopsis,
            category_ids=category_ids,
        )
        return self.create_repository(request)

    def create_repository(self, request: CreateRepositoryRequest) -> Repository:
        """POST ``request`` to ``repositories`` and return the created ``Repository``."""
        response = self._api_interactor.post("repositories", json=request.model_dump(by_alias=True))
        api_asserts.assert_status_code_is_ok(response)
        return Repository(**response.json())

    def reindex(self) -> BuildSearchIndexResponse:
        """Trigger ``tools/build_search_index`` through the admin interactor and return the parsed response."""
        index_response = self._admin_api_interactor.put("tools/build_search_index")
        index_response.raise_for_status()
        return BuildSearchIndexResponse(**index_response.json())

    def new_category(
        self, name: Optional[str] = None, description: Optional[str] = None, prefix=DEFAULT_PREFIX
    ) -> Category:
        """Create a category through the admin interactor.

        A random name derived from ``prefix`` and the description
        ``"testcreaterepo"`` are used when the respective argument is falsy.
        """
        category_name = name or random_name(prefix=prefix)
        category_description = description or "testcreaterepo"
        request = CreateCategoryRequest(name=category_name, description=category_description)
        response = self._admin_api_interactor.post("categories", json=request.model_dump())
        response.raise_for_status()
        return Category(**response.json())

    def get_categories(self) -> list[Category]:
        """Return every category reported by the ``categories`` endpoint."""
        response = self._api_interactor.get("categories")
        response.raise_for_status()
        return [Category(**c) for c in response.json()]

    def get_category_with_id(self, category_id: str) -> Category:
        """Return the category with the given id."""
        response = self._api_interactor.get(f"categories/{category_id}")
        response.raise_for_status()
        return Category(**response.json())

    def get_category_with_name(self, name: str) -> Category:
        """Return the first category whose name equals ``name``.

        Raises:
            ValueError: if no category has that name.
        """
        categories = [c for c in self.get_categories() if c.name == name]
        if not categories:
            raise ValueError(f"No category with name {name} found.")
        return categories[0]

    def repositories_by_category(self, category_id: str) -> RepositoriesByCategory:
        """Return the repositories listed under ``category_id``."""
        response = self._api_interactor.get(f"categories/{category_id}/repositories")
        response.raise_for_status()
        return RepositoriesByCategory(**response.json())

    def assert_category_has_n_repositories(self, category_id: str, n: int):
        """Assert both the reported count and the listed repositories of the category equal ``n``."""
        category_repos = self.repositories_by_category(category_id)
        assert category_repos.repository_count == n
        assert len(category_repos.repositories) == n

    def get_ordered_installable_revisions(self, owner: str, name: str) -> OrderedInstallableRevisions:
        """Return the ordered installable revisions for the repository ``owner``/``name``."""
        request = GetOrderedInstallableRevisionsRequest(owner=owner, name=name)
        revisions_response = self._api_interactor.get(
            "repositories/get_ordered_installable_revisions", params=request.model_dump()
        )
        api_asserts.assert_status_code_is_ok(revisions_response)
        return OrderedInstallableRevisions(root=revisions_response.json())

    def assert_has_n_installable_revisions(self, repository: Repository, n: int):
        """Assert ``repository`` has exactly ``n`` installable revisions."""
        revisions = self.get_ordered_installable_revisions(repository.owner, repository.name)
        actual_n = len(revisions.root)
        assert actual_n == n, f"Expected {n} repository revisions, found {actual_n} for {repository}"

    def get_repository_for(self, owner: str, name: str, deleted: str = "false") -> Optional[Repository]:
        """Return the first repository matching ``owner``/``name``/``deleted``, or None if there is no match."""
        request = RepositoryIndexRequest(
            owner=owner,
            name=name,
            deleted=deleted,
        )
        index = self.repository_index(request)
        return index.root[0] if index.root else None

    def repository_index(self, request: Optional[RepositoryIndexRequest]) -> RepositoryIndexResponse:
        """Query the ``repositories`` index; a None ``request`` sends no query parameters."""
        repository_response = self._api_interactor.get("repositories", params=(request.model_dump() if request else {}))
        api_asserts.assert_status_code_is_ok(repository_response)
        return RepositoryIndexResponse(root=repository_response.json())

    def repository_index_paginated(
        self, request: Optional[RepositoryPaginatedIndexRequest]
    ) -> PaginatedRepositoryIndexResults:
        """Query the ``repositories`` index with paging; a None ``request`` asks for page 1."""
        repository_response = self._api_interactor.get(
            "repositories", params=(request.model_dump() if request else {"page": 1})
        )
        api_asserts.assert_status_code_is_ok(repository_response)
        return PaginatedRepositoryIndexResults(**repository_response.json())

    def get_usernames_allowed_to_push(self, repository: HasRepositoryId) -> list[str]:
        """Return the list served by the repository's ``allow_push`` endpoint."""
        repository_id = self._repository_id(repository)
        show_response = self._api_interactor.get(f"repositories/{repository_id}/allow_push")
        show_response.raise_for_status()
        as_list = show_response.json()
        assert isinstance(as_list, list)
        return as_list

    def allow_user_to_push(self, repository: HasRepositoryId, username: str) -> None:
        """POST to the repository's ``allow_push/<username>`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        post_response = self._api_interactor.post(f"repositories/{repository_id}/allow_push/{username}")
        post_response.raise_for_status()

    def disallow_user_to_push(self, repository: HasRepositoryId, username: str) -> None:
        """DELETE the repository's ``allow_push/<username>`` entry, raising on an error status."""
        repository_id = self._repository_id(repository)
        delete_response = self._api_interactor.delete(f"repositories/{repository_id}/allow_push/{username}")
        delete_response.raise_for_status()

    def get_admin_users(self, repository: HasRepositoryId) -> list[str]:
        """Return the list served by the repository's ``admins`` endpoint."""
        repository_id = self._repository_id(repository)
        response = self._api_interactor.get(f"repositories/{repository_id}/admins")
        response.raise_for_status()
        as_list = response.json()
        assert isinstance(as_list, list)
        return as_list

    def add_admin_user(self, repository: HasRepositoryId, username: str) -> None:
        """POST to the repository's ``admins/<username>`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        response = self._api_interactor.post(f"repositories/{repository_id}/admins/{username}")
        response.raise_for_status()

    def remove_admin_user(self, repository: HasRepositoryId, username: str) -> None:
        """DELETE the repository's ``admins/<username>`` entry, raising on an error status."""
        repository_id = self._repository_id(repository)
        response = self._api_interactor.delete(f"repositories/{repository_id}/admins/{username}")
        response.raise_for_status()

    def add_admin_user_raw(self, repository: HasRepositoryId, username: str) -> requests.Response:
        """Same request as ``add_admin_user`` but return the unchecked response."""
        repository_id = self._repository_id(repository)
        return self._api_interactor.post(f"repositories/{repository_id}/admins/{username}")

    def remove_admin_user_raw(self, repository: HasRepositoryId, username: str) -> requests.Response:
        """Same request as ``remove_admin_user`` but return the unchecked response."""
        repository_id = self._repository_id(repository)
        return self._api_interactor.delete(f"repositories/{repository_id}/admins/{username}")

    def get_admin_users_raw(self, repository: HasRepositoryId) -> requests.Response:
        """Same request as ``get_admin_users`` but return the unchecked response."""
        repository_id = self._repository_id(repository)
        return self._api_interactor.get(f"repositories/{repository_id}/admins")

    def set_malicious(self, repository: HasRepositoryId, changeset_revision: str):
        """PUT to the revision's ``malicious`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        put_response = self._api_interactor.put(
            f"repositories/{repository_id}/revisions/{changeset_revision}/malicious"
        )
        put_response.raise_for_status()

    def unset_malicious(self, repository: HasRepositoryId, changeset_revision: str):
        """DELETE the revision's ``malicious`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        delete_response = self._api_interactor.delete(
            f"repositories/{repository_id}/revisions/{changeset_revision}/malicious"
        )
        delete_response.raise_for_status()

    def tip_is_malicious(self, repository: HasRepositoryId) -> bool:
        """Return the ``malicious`` flag of the latest revision in the repository metadata."""
        repository_metadata = self.get_metadata(repository)
        revision = repository_metadata.latest_revision
        return revision.malicious

    def set_deprecated(self, repository: HasRepositoryId):
        """PUT to the repository's ``deprecated`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        put_response = self._api_interactor.put(f"repositories/{repository_id}/deprecated")
        put_response.raise_for_status()

    def unset_deprecated(self, repository: HasRepositoryId):
        """DELETE the repository's ``deprecated`` endpoint, raising on an error status."""
        repository_id = self._repository_id(repository)
        delete_response = self._api_interactor.delete(f"repositories/{repository_id}/deprecated")
        delete_response.raise_for_status()

    def get_repository(self, repository: HasRepositoryId) -> DetailedRepository:
        """Fetch the repository and return it as a ``DetailedRepository``."""
        repository_id = self._repository_id(repository)
        repository_response = self._api_interactor.get(f"repositories/{repository_id}")
        repository_response.raise_for_status()
        return DetailedRepository(**repository_response.json())

    def is_deprecated(self, repository: HasRepositoryId) -> bool:
        """Return the ``deprecated`` flag of the repository."""
        return self.get_repository(repository).deprecated

    def get_metadata(self, repository: HasRepositoryId, downloadable_only=True) -> RepositoryMetadata:
        """Fetch repository metadata, passing ``downloadable_only`` through as a query parameter."""
        repository_id = self._repository_id(repository)
        metadata_response = self._api_interactor.get(
            f"repositories/{repository_id}/metadata?downloadable_only={downloadable_only}"
        )
        api_asserts.assert_status_code_is_ok(metadata_response)
        return RepositoryMetadata(root=metadata_response.json())

    def reset_metadata(
        self, repository: HasRepositoryId, dry_run: bool = False, verbose: bool = False
    ) -> ResetMetadataOnRepositoryResponse:
        """POST a reset-metadata request for the repository and return the parsed response."""
        repository_id = self._repository_id(repository)
        request = ResetMetadataOnRepositoryRequest(repository_id=repository_id, dry_run=dry_run, verbose=verbose)
        reset_response = self._api_interactor.post(
            "repositories/reset_metadata_on_repository", json=request.model_dump()
        )
        api_asserts.assert_status_code_is_ok(reset_response)
        return ResetMetadataOnRepositoryResponse(**reset_response.json())

    def version(self) -> Version:
        """Return the parsed response of the ``version`` endpoint, queried via the admin interactor."""
        version_response = self._admin_api_interactor.get("version")
        api_asserts.assert_status_code_is_ok(version_response)
        return Version(**version_response.json())

    def tool_search_query(self, query: str) -> ToolSearchResults:
        """Run a tool search with ``query`` as the ``q`` parameter."""
        return self.tool_search(ToolSearchRequest(q=query))

    def tool_search(self, search_request: ToolSearchRequest) -> ToolSearchResults:
        """GET ``tools`` with the request's fields as query parameters and parse the results."""
        search_response = self._api_interactor.get("tools", params=search_request.model_dump())
        api_asserts.assert_status_code_is_ok(search_response)
        return ToolSearchResults(**search_response.json())

    def tool_guid(
        self, shed_host: HostsTestToolShed, repository: Repository, tool_id: str, tool_version: Optional[str] = None
    ) -> str:
        """Build ``<host[:port]>/repos/<owner>/<name>/<tool_id>[/<tool_version>]``.

        The port is left out when it is None, "80" or "443"; the version
        segment is left out when ``tool_version`` is None.
        """
        owner = repository.owner
        name = repository.name
        port = shed_host.port
        if port in [None, "80", "443"]:
            host_and_port = shed_host.host
        else:
            host_and_port = f"{shed_host.host}:{shed_host.port}"
        tool_id_base = f"{host_and_port}/repos/{owner}/{name}/{tool_id}"
        if tool_version is None:
            return tool_id_base
        else:
            return f"{tool_id_base}/{tool_version}"

    def repo_search_query(self, query: str) -> RepositorySearchResults:
        """Run a repository search with ``query`` as the ``q`` parameter."""
        return self.repo_search(RepositorySearchRequest(q=query))

    def repo_search(self, repo_search_request: RepositorySearchRequest) -> RepositorySearchResults:
        """GET ``repositories`` with the request's fields as query parameters and parse the results."""
        search_response = self._api_interactor.get("repositories", params=repo_search_request.model_dump())
        api_asserts.assert_status_code_is_ok(search_response)
        return RepositorySearchResults(**search_response.json())

    def delete_api_key(self) -> None:
        """DELETE ``users/current/api_key``, raising on an error status."""
        response = self._api_interactor.delete("users/current/api_key")
        response.raise_for_status()

    def create_new_api_key(self) -> str:
        """POST ``users/current/api_key`` and return the decoded JSON body."""
        response = self._api_interactor.post("users/current/api_key")
        response.raise_for_status()
        return response.json()

    def guid(self, repository: Repository, tool_id: str, tool_version: str) -> str:
        """Build a tool GUID using the host portion of the API interactor's URL."""
        url = self._api_interactor.url
        # Everything between the scheme separator and the first following slash.
        base = url.split("://")[1].split("/")[0]
        return f"{base}/repos/{repository.owner}/{repository.name}/{tool_id}/{tool_version}"

    def new_user(self, username: str, password: str):
        """Delegate to ``ensure_user_with_email`` using the admin interactor."""
        return ensure_user_with_email(self._admin_api_interactor, username, password)

    def get_tip_changeset(self, repository: Repository) -> str:
        """Get the tip changeset revision for a repository."""
        revisions = self.get_ordered_installable_revisions(repository.owner, repository.name)
        assert revisions.root, f"No installable revisions for {repository.name}"
        return revisions.root[-1]

    def create_repository_dependency(
        self,
        repository: Repository,
        dependency_tuples: list[tuple[str, str, str, str]],
        prior_installation_required: bool = False,
    ) -> None:
        """Upload a repository_dependencies.xml to wire up a dependency.

        Each tuple in dependency_tuples is (shed_url, name, owner, changeset_revision).
        When ``prior_installation_required`` is true, every dependency element
        carries ``prior_installation_required="True"``.

        The XML is written to a temporary directory, packed into a gzipped
        tarball and uploaded as a new revision; the directory is removed
        afterwards. The upload must succeed.
        """
        prior_attr = ' prior_installation_required="True"' if prior_installation_required else ""
        # NOTE(review): element markup follows the usual repository_dependencies.xml
        # layout (<repositories> wrapping <repository .../> entries) - confirm against the shed's parser.
        dep_lines = [
            f'    <repository toolshed="{shed_url}" name="{name}" owner="{owner}" '
            f'changeset_revision="{changeset}"{prior_attr} />'
            for shed_url, name, owner, changeset in dependency_tuples
        ]
        xml = '<?xml version="1.0"?>\n<repositories>\n' + "\n".join(dep_lines) + "\n</repositories>\n"
        # The upload happens inside the context so the tarball still exists while it is being sent.
        with tempfile.TemporaryDirectory(prefix="repo_dep_") as tmpdir:
            xml_path = os.path.join(tmpdir, "repository_dependencies.xml")
            with open(xml_path, "w") as f:
                f.write(xml)
            tar_path = os.path.join(tmpdir, "repo_dep.tar.gz")
            with tarfile.open(tar_path, "w:gz") as tar:
                tar.add(xml_path, arcname="repository_dependencies.xml")
            response = self.upload_revision_raw(repository, Path(tar_path), commit_message="Add repository dependency")
        api_asserts.assert_status_code_is_ok(response)

    def _repository_id(self, has_id: HasRepositoryId) -> str:
        """Return the id of a ``Repository``, or ``str(has_id)`` for anything else."""
        if isinstance(has_id, Repository):
            return has_id.id
        else:
            return str(has_id)