Files
mdrs-client-python/mdrsclient/services.py
T
orrisroot 36cad6db52 refactor: extract MdrsClient service layer for library portability
To improve the tool's portability as a Python library, the core logic
has been decoupled from the CLI interface. This allows developers to
programmatically interact with MDRS without relying on CLI-specific
argument parsing or local file-based caches.

- Introduce `MdrsClient` service layer to handle core operations.
- Abstract authentication state using `CacheInterface` and `InMemoryCache`.
- Migrate all CLI commands to utilize `MdrsClient` for execution.
- Separate `Doi` data model from API responses and move to `models/doi.py`.
- Update `README.md` to include Python API usage examples.
- Bump package version to 1.3.17.
2026-07-02 13:07:18 +09:00

224 lines
9.2 KiB
Python

from typing import Any
import os
import re
from unicodedata import normalize
from mdrsclient.api import DoiApi, FilesApi, FoldersApi, LaboratoriesApi, UsersApi
from mdrsclient.config import ConfigFile
from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import (
IllegalArgumentException,
MissingConfigurationException,
UnauthorizedException,
UnexpectedException,
)
from mdrsclient.models import File, Folder, Laboratory, Token, User
from mdrsclient.utils import page_num_from_url
class MdrsService:
def __init__(self, connection: MDRSConnection):
self.connection = connection
@classmethod
def create_connection(cls, remote: str) -> MDRSConnection:
config = ConfigFile(remote)
if config.url is None:
raise MissingConfigurationException(f"Remote host `{remote}` is not found.")
return MDRSConnection(config.remote, config.url)
def login(self, username: str, password: str) -> tuple[Token, User]:
user_api = UsersApi(self.connection)
token = user_api.token(username, password)
self.connection.token = token
user = user_api.current()
self.connection.user = user
return token, user
def logout(self) -> None:
self.connection.logout()
def whoami(self) -> User:
user_api = UsersApi(self.connection)
return user_api.current()
def get_laboratories(self) -> list[Laboratory]:
laboratory_api = LaboratoriesApi(self.connection)
labs = laboratory_api.list()
self.connection.laboratories = labs
return list(labs)
def find_laboratory(self, name: str) -> Laboratory:
if self.connection.laboratories.empty() or (self.connection.token and self.connection.token.is_expired):
self.get_laboratories()
laboratory = self.connection.laboratories.find_by_name(name)
if laboratory is None:
raise IllegalArgumentException(f"Laboratory `{name}` not found.")
return laboratory
def find_folder(self, laboratory: Laboratory, path: str, password: str | None = None) -> Folder:
folder_api = FoldersApi(self.connection)
folders = folder_api.list(laboratory.id, normalize("NFC", path))
if len(folders) != 1:
raise UnexpectedException(f"Folder `{path}` not found.")
if folders[0].lock:
if password is None:
raise UnauthorizedException(f"Folder `{path}` is locked.")
folder_api.auth(folders[0].id, password)
return folder_api.retrieve(folders[0].id)
def find_files(self, folder_id: str) -> list[File]:
files_api = FilesApi(self.connection)
page = 1
results_file = []
while page:
result = files_api.list(folder_id, page)
results_file.extend(result.results)
page = 0
if result.next:
page = page_num_from_url(result.next)
return results_file
@staticmethod
def is_doi(path_component: str) -> bool:
return path_component.startswith("10.") and "/" in path_component
@staticmethod
def doi_suffix_id(doi: str) -> str:
doi = doi.rstrip("/")
slash_pos = doi.find("/")
if slash_pos == -1:
return doi
suffix = doi[slash_pos + 1 :]
dot_pos = suffix.rfind(".")
return suffix[dot_pos + 1 :] if dot_pos != -1 else suffix
@staticmethod
def split_doi_and_subpath(doi_with_path: str) -> tuple[str, str]:
first_slash = doi_with_path.find("/")
if first_slash != -1:
after_suffix_start = first_slash + 1
after_first = doi_with_path[after_suffix_start:]
second_slash = after_first.find("/")
if second_slash != -1:
doi_end = after_suffix_start + second_slash
doi = doi_with_path[:doi_end]
subpath = doi_with_path[doi_end:]
if subpath == "/":
return (doi, "")
else:
return (doi, subpath)
else:
return (doi_with_path, "")
else:
return (doi_with_path, "")
@classmethod
def parse_remote_host(cls, path: str) -> str:
path_array = path.split(":")
remote_host = path_array[0]
if len(path_array) == 2 and path_array[1] != "" or len(path_array) > 2:
raise IllegalArgumentException("Invalid remote host")
return remote_host
@classmethod
def parse_remote_host_with_path(cls, path: str) -> tuple[str, str, str]:
path = re.sub(r"//+|/\./+|/\.$", "/", path)
if re.search(r"/\.\./|/\.\.$", path) is not None:
raise IllegalArgumentException("Path traversal found.")
path_array = path.split(":")
if len(path_array) != 2:
raise IllegalArgumentException("Invalid remote host.")
remote_host = path_array[0]
folder_array = path_array[1].split("/")
is_absolute_path = folder_array[0] == ""
if not is_absolute_path:
raise IllegalArgumentException("Must be absolute paths.")
del folder_array[0]
if len(folder_array) == 0:
laboratory = ""
folder = ""
else:
laboratory = folder_array.pop(0)
folder = "/" + "/".join(folder_array)
return (remote_host, laboratory, folder)
@classmethod
def parse_doi_remote_host(cls, path: str) -> tuple[str, str, str]:
parts = path.split(":", 1)
if len(parts) != 2:
raise IllegalArgumentException("remote_path must be in the form 'remote:10.xxxx/prefix.ID'")
remote, doi_with_path = parts
if not cls.is_doi(doi_with_path):
raise IllegalArgumentException(f"Path `{doi_with_path}` does not look like a DOI.")
doi, subpath = cls.split_doi_and_subpath(doi_with_path)
return (remote, doi, subpath)
def find_folder_by_doi(self, doi: str, password: str | None = None) -> tuple[Folder, Laboratory]:
doi_clean = doi.rstrip("/")
doi_id = self.doi_suffix_id(doi_clean)
doi_api = DoiApi(self.connection)
doi_resp = doi_api.retrieve(doi_id)
returned_doi = doi_resp.doi.rstrip("/")
if returned_doi.lower() != doi_clean.lower():
raise IllegalArgumentException(
f"DOI mismatch: requested `{doi_clean}` but server returned `{returned_doi}`."
)
folder_api = FoldersApi(self.connection)
folder = folder_api.retrieve(doi_resp.folder_id)
if folder.lock:
if password is None:
raise UnauthorizedException(f"Folder for DOI `{doi_clean}` is locked.")
folder_api.auth(doi_resp.folder.id, password)
lab_api = LaboratoriesApi(self.connection)
labs = lab_api.list()
lab = labs.find_by_id(folder.laboratory_id)
if lab is None:
raise UnexpectedException(f"Laboratory with id {folder.laboratory_id} not found.")
self.connection.laboratories = labs
return (folder, lab)
def resolve_folder(self, remote_path: str, password: str | None = None) -> tuple[Folder, Laboratory]:
path_component = remote_path.split(":", 1)[1] if ":" in remote_path else ""
if self.is_doi(path_component):
remote, doi, subpath = self.parse_doi_remote_host(remote_path)
doi_folder, laboratory = self.find_folder_by_doi(doi, password)
if not subpath:
return (doi_folder, laboratory)
else:
abs_path = doi_folder.path.rstrip("/") + subpath
folder = self.find_folder(laboratory, abs_path, password)
return (folder, laboratory)
else:
remote, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
laboratory = self.find_laboratory(laboratory_name)
folder = self.find_folder(laboratory, r_path, password)
return (folder, laboratory)
def resolve_file(self, remote_path: str, password: str | None = None) -> tuple[Folder, Laboratory, str]:
path_component = remote_path.split(":", 1)[1] if ":" in remote_path else ""
if self.is_doi(path_component):
remote, doi, subpath = self.parse_doi_remote_host(remote_path)
doi_folder, laboratory = self.find_folder_by_doi(doi, password)
subpath_clean = subpath.rstrip("/")
if not subpath_clean:
raise IllegalArgumentException("DOI path must point to a file, not a folder.")
r_dirname = os.path.dirname(subpath_clean)
r_basename = os.path.basename(subpath_clean)
abs_path = doi_folder.path.rstrip("/") + r_dirname
parent_folder = self.find_folder(laboratory, abs_path, password)
return (parent_folder, laboratory, r_basename)
else:
remote, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
r_path = r_path.rstrip("/")
r_dirname = os.path.dirname(r_path)
r_basename = os.path.basename(r_path)
laboratory = self.find_laboratory(laboratory_name)
parent_folder = self.find_folder(laboratory, r_dirname, password)
return (parent_folder, laboratory, r_basename)