import os
import re
from typing import Any
from unicodedata import normalize

from mdrsclient.api import DoiApi, FilesApi, FoldersApi, LaboratoriesApi, UsersApi
from mdrsclient.cache import CacheInterface
from mdrsclient.config import ConfigFile
from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import (
    IllegalArgumentException,
    MissingConfigurationException,
    UnauthorizedException,
    UnexpectedException,
)
from mdrsclient.models import File, Folder, Laboratory, Token, User
from mdrsclient.utils import page_num_from_url


class MdrsService:
    """Service layer combining the mdrsclient API wrappers with remote-path parsing.

    Remote paths come in two textual forms, both handled here:

    * ``remote:/laboratory/folder/...`` (see :meth:`parse_remote_host_with_path`)
    * ``remote:10.xxxx/prefix.ID[/sub/path]`` (see :meth:`parse_doi_remote_host`)
    """

    # Redundant path pieces that are collapsed to a single "/": runs of slashes,
    # "/./" segments, and a trailing "/.".
    _REDUNDANT_SEGMENTS = re.compile(r"//+|/\./+|/\.$")
    # A ".." path segment, either in the middle or at the end of the path.
    _PARENT_SEGMENT = re.compile(r"/\.\./|/\.\.$")

    def __init__(self, connection: MDRSConnection):
        """Bind the service to *connection*, which every API call goes through."""
        self.connection = connection

    @classmethod
    def create_connection(cls, remote: str, cache: CacheInterface | None = None) -> MDRSConnection:
        """Build an ``MDRSConnection`` for the configured remote named *remote*.

        Raises:
            MissingConfigurationException: the configuration has no URL for *remote*.
        """
        config = ConfigFile(remote)
        if config.url is None:
            raise MissingConfigurationException(f"Remote host `{remote}` is not found.")
        return MDRSConnection(config.remote, config.url, cache=cache)

    def login(self, username: str, password: str) -> tuple[Token, User]:
        """Request a token, then fetch the current user; store both on the connection.

        The token is assigned to the connection before the current user is requested.

        Returns:
            The ``(token, user)`` pair that was stored on the connection.
        """
        user_api = UsersApi(self.connection)
        token = user_api.token(username, password)
        self.connection.token = token
        user = user_api.current()
        self.connection.user = user
        return token, user

    def logout(self) -> None:
        """Delegate to the connection's ``logout()``."""
        self.connection.logout()

    def whoami(self) -> User:
        """Return the current user as reported by the users API."""
        return UsersApi(self.connection).current()

    def get_laboratories(self) -> list[Laboratory]:
        """Fetch the laboratories, store them on the connection, and return them as a list."""
        labs = LaboratoriesApi(self.connection).list()
        self.connection.laboratories = labs
        return list(labs)

    def find_laboratory(self, name: str) -> Laboratory:
        """Look up a laboratory by *name* in the connection's laboratory collection.

        The collection is re-fetched first when it is empty or when the
        connection holds a token that reports itself as expired.

        Raises:
            IllegalArgumentException: no laboratory has that name.
        """
        token = self.connection.token
        if self.connection.laboratories.empty() or (token and token.is_expired):
            self.get_laboratories()
        laboratory = self.connection.laboratories.find_by_name(name)
        if laboratory is None:
            raise IllegalArgumentException(f"Laboratory `{name}` not found.")
        return laboratory

    def find_folder(self, laboratory: Laboratory, path: str, password: str | None = None) -> Folder:
        """Retrieve the single folder at *path* inside *laboratory*.

        *path* is NFC-normalized before it is sent to the folder listing API.
        If the listed folder is locked, *password* is used to authenticate
        before the folder is retrieved.

        Raises:
            UnexpectedException: the listing did not return exactly one folder.
            UnauthorizedException: the folder is locked and no password was given.
        """
        folder_api = FoldersApi(self.connection)
        folders = folder_api.list(laboratory.id, normalize("NFC", path))
        if len(folders) != 1:
            raise UnexpectedException(f"Folder `{path}` not found.")
        candidate = folders[0]
        if candidate.lock:
            if password is None:
                raise UnauthorizedException(f"Folder `{path}` is locked.")
            folder_api.auth(candidate.id, password)
        return folder_api.retrieve(candidate.id)

    def find_files(self, folder_id: str) -> list[File]:
        """Collect the files of *folder_id* across all result pages.

        Starts at page 1 and follows each result's ``next`` link (converted to a
        page number with ``page_num_from_url``) until there is no next page.
        """
        files_api = FilesApi(self.connection)
        collected: list[File] = []
        page = 1
        while page:
            result = files_api.list(folder_id, page)
            collected.extend(result.results)
            # A falsy page number (no ``next`` link) ends the loop.
            page = page_num_from_url(result.next) if result.next else 0
        return collected

    @staticmethod
    def is_doi(path_component: str) -> bool:
        """Return True if *path_component* starts with ``10.`` and contains a ``/``."""
        return path_component.startswith("10.") and "/" in path_component

    @staticmethod
    def doi_suffix_id(doi: str) -> str:
        """Extract the identifier part of *doi*.

        Trailing slashes are ignored. The suffix is everything after the first
        ``/``; the result is the text after the last ``.`` of that suffix, or
        the whole suffix when it has no ``.``. A DOI without any ``/`` is
        returned unchanged (minus trailing slashes).

        Example: ``"10.1234/prefix.42"`` -> ``"42"``.
        """
        doi = doi.rstrip("/")
        slash_pos = doi.find("/")
        if slash_pos == -1:
            return doi
        suffix = doi[slash_pos + 1 :]
        dot_pos = suffix.rfind(".")
        return suffix if dot_pos == -1 else suffix[dot_pos + 1 :]

    @staticmethod
    def split_doi_and_subpath(doi_with_path: str) -> tuple[str, str]:
        """Split ``10.xxxx/suffix/sub/path`` into ``("10.xxxx/suffix", "/sub/path")``.

        The DOI is everything before the second ``/``. The sub-path keeps its
        leading ``/``; it is ``""`` when there is no second ``/`` or when
        nothing but a single ``/`` follows the DOI.
        """
        first_slash = doi_with_path.find("/")
        if first_slash == -1:
            return (doi_with_path, "")
        doi_end = doi_with_path.find("/", first_slash + 1)
        if doi_end == -1:
            return (doi_with_path, "")
        doi = doi_with_path[:doi_end]
        subpath = doi_with_path[doi_end:]
        return (doi, "" if subpath == "/" else subpath)

    @classmethod
    def _normalize_path(cls, path: str) -> str:
        """Collapse redundant path segments in *path* and reject ``..`` segments.

        Raises:
            IllegalArgumentException: a ``..`` segment remains after collapsing.
        """
        # re.sub only replaces non-overlapping matches, so a single pass turns
        # "/a/././b" into "/a/./b". Repeat until the path stops changing; every
        # effective pass shortens the string, so the loop terminates.
        while True:
            collapsed = cls._REDUNDANT_SEGMENTS.sub("/", path)
            if collapsed == path:
                break
            path = collapsed
        if cls._PARENT_SEGMENT.search(path) is not None:
            raise IllegalArgumentException("Path traversal found.")
        return path

    @staticmethod
    def _path_component(remote_path: str) -> str:
        """Return the text after the first ``:`` of *remote_path*, or ``""`` if it has none."""
        _, _, component = remote_path.partition(":")
        return component

    @classmethod
    def parse_remote_host(cls, path: str) -> str:
        """Return the remote name from ``remote`` or ``remote:``.

        Raises:
            IllegalArgumentException: anything non-empty follows the colon, or
                *path* contains more than one colon.
        """
        path_array = path.split(":")
        has_trailing_path = len(path_array) == 2 and path_array[1] != ""
        if has_trailing_path or len(path_array) > 2:
            raise IllegalArgumentException("Invalid remote host")
        return path_array[0]

    @classmethod
    def parse_remote_host_with_path(cls, path: str) -> tuple[str, str, str]:
        """Parse ``remote:/laboratory/folder/...`` into ``(remote, laboratory, folder)``.

        The whole string is normalized first (redundant slashes and ``.``
        segments collapsed). The returned folder starts with ``/``. For
        ``remote:`` (nothing after the colon) both laboratory and folder are ``""``.

        Raises:
            IllegalArgumentException: the path contains a ``..`` segment, does
                not contain exactly one ``:``, or is not absolute.
        """
        path = cls._normalize_path(path)
        path_array = path.split(":")
        if len(path_array) != 2:
            raise IllegalArgumentException("Invalid remote host.")
        remote_host, remote_path = path_array
        folder_array = remote_path.split("/")
        # An absolute path starts with "/", so its first split element is empty.
        if folder_array[0] != "":
            raise IllegalArgumentException("Must be absolute paths.")
        del folder_array[0]
        if not folder_array:
            laboratory = ""
            folder = ""
        else:
            laboratory = folder_array.pop(0)
            folder = "/" + "/".join(folder_array)
        return (remote_host, laboratory, folder)

    @classmethod
    def parse_doi_remote_host(cls, path: str) -> tuple[str, str, str]:
        """Parse ``remote:10.xxxx/prefix.ID[/sub/path]`` into ``(remote, doi, subpath)``.

        The sub-path gets the same normalization and ``..`` rejection as
        ordinary remote paths; it is ``""`` when it refers to the DOI folder itself.

        Raises:
            IllegalArgumentException: there is no ``:``, the part after it does
                not look like a DOI, or the sub-path contains a ``..`` segment.
        """
        parts = path.split(":", 1)
        if len(parts) != 2:
            raise IllegalArgumentException("remote_path must be in the form 'remote:10.xxxx/prefix.ID'")
        remote, doi_with_path = parts
        if not cls.is_doi(doi_with_path):
            raise IllegalArgumentException(f"Path `{doi_with_path}` does not look like a DOI.")
        doi, subpath = cls.split_doi_and_subpath(doi_with_path)
        # Only the sub-path is normalized; the DOI itself is passed on verbatim.
        subpath = cls._normalize_path(subpath)
        if subpath == "/":
            subpath = ""
        return (remote, doi, subpath)

    def find_folder_by_doi(self, doi: str, password: str | None = None) -> tuple[Folder, Laboratory]:
        """Resolve *doi* to its folder and the laboratory owning that folder.

        The DOI API is queried with the identifier from :meth:`doi_suffix_id`,
        and the DOI it returns must equal the requested one (case-insensitive,
        trailing slashes ignored). The freshly fetched laboratory collection is
        stored on the connection once the laboratory has been found.

        Raises:
            IllegalArgumentException: the server returned a different DOI.
            UnauthorizedException: the folder is locked and no password was given.
            UnexpectedException: the folder's laboratory is not in the laboratory list.
        """
        doi_clean = doi.rstrip("/")
        doi_resp = DoiApi(self.connection).retrieve(self.doi_suffix_id(doi_clean))
        returned_doi = doi_resp.doi.rstrip("/")
        if returned_doi.lower() != doi_clean.lower():
            raise IllegalArgumentException(
                f"DOI mismatch: requested `{doi_clean}` but server returned `{returned_doi}`."
            )

        folder_api = FoldersApi(self.connection)
        folder = folder_api.retrieve(doi_resp.folder_id)
        if folder.lock:
            if password is None:
                raise UnauthorizedException(f"Folder for DOI `{doi_clean}` is locked.")
            # Authenticate against the folder that was just retrieved instead of
            # reaching through a nested attribute of the DOI response.
            # NOTE(review): `folder` was fetched before authentication; confirm
            # whether it needs to be retrieved again after a successful auth.
            folder_api.auth(folder.id, password)

        labs = LaboratoriesApi(self.connection).list()
        lab = labs.find_by_id(folder.laboratory_id)
        if lab is None:
            raise UnexpectedException(f"Laboratory with id {folder.laboratory_id} not found.")
        self.connection.laboratories = labs
        return (folder, lab)

    def resolve_folder(self, remote_path: str, password: str | None = None) -> tuple[Folder, Laboratory]:
        """Resolve *remote_path* (DOI form or laboratory form) to ``(folder, laboratory)``.

        In DOI form, a sub-path is appended to the DOI folder's own path and
        looked up inside the DOI folder's laboratory.
        """
        if self.is_doi(self._path_component(remote_path)):
            _, doi, subpath = self.parse_doi_remote_host(remote_path)
            doi_folder, laboratory = self.find_folder_by_doi(doi, password)
            if not subpath:
                return (doi_folder, laboratory)
            abs_path = doi_folder.path.rstrip("/") + subpath
            return (self.find_folder(laboratory, abs_path, password), laboratory)

        _, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
        laboratory = self.find_laboratory(laboratory_name)
        return (self.find_folder(laboratory, r_path, password), laboratory)

    def resolve_file(self, remote_path: str, password: str | None = None) -> tuple[Folder, Laboratory, str]:
        """Resolve *remote_path* to ``(parent_folder, laboratory, file_name)``.

        The last path component is returned as the file name; the folder
        containing it is looked up. Trailing slashes are ignored.

        Raises:
            IllegalArgumentException: a DOI path has no sub-path, i.e. it names
                the DOI folder rather than a file.
        """
        if self.is_doi(self._path_component(remote_path)):
            _, doi, subpath = self.parse_doi_remote_host(remote_path)
            doi_folder, laboratory = self.find_folder_by_doi(doi, password)
            subpath_clean = subpath.rstrip("/")
            if not subpath_clean:
                raise IllegalArgumentException("DOI path must point to a file, not a folder.")
            r_dirname = os.path.dirname(subpath_clean)
            r_basename = os.path.basename(subpath_clean)
            abs_path = doi_folder.path.rstrip("/") + r_dirname
            parent_folder = self.find_folder(laboratory, abs_path, password)
            return (parent_folder, laboratory, r_basename)

        _, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
        r_path = r_path.rstrip("/")
        r_dirname = os.path.dirname(r_path)
        r_basename = os.path.basename(r_path)
        laboratory = self.find_laboratory(laboratory_name)
        parent_folder = self.find_folder(laboratory, r_dirname, password)
        return (parent_folder, laboratory, r_basename)