5bdf837941
Support accessing repositories using DOI strings with optional subpaths
across ls, download, metadata, and file-metadata commands.
- Implement GET v3/doi/{id}/ API model and client calls
- Parse and resolve DOI paths into respective folder and files
- Extract common folder and file resolution logic to shared helpers
- Update README with example DOI-based shell commands
268 lines
11 KiB
Python
268 lines
11 KiB
Python
import os
|
|
import re
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any
|
|
from unicodedata import normalize
|
|
|
|
from mdrsclient.api import DoiApi, FilesApi, FoldersApi, LaboratoriesApi
|
|
from mdrsclient.config import ConfigFile
|
|
from mdrsclient.connection import MDRSConnection
|
|
from mdrsclient.exceptions import (
|
|
IllegalArgumentException,
|
|
MissingConfigurationException,
|
|
UnauthorizedException,
|
|
UnexpectedException,
|
|
)
|
|
from mdrsclient.models import File, Folder, Laboratory
|
|
from mdrsclient.utils import page_num_from_url
|
|
|
|
|
|
class BaseCommand(ABC):
    """Abstract base for commands, bundling shared remote-path helpers.

    Besides the abstract ``register`` hook, the class offers class/static
    helpers that parse ``remote:/laboratory/folder`` and
    ``remote:10.xxxx/prefix.ID[/sub/path]`` strings and resolve them to
    ``Folder`` / ``Laboratory`` objects through the API wrappers.
    """

    @classmethod
    @abstractmethod
    def register(cls, parsers: Any) -> None:
        """Register the command on ``parsers``; subclasses must override.

        NOTE(review): ``parsers`` is presumably an argparse sub-parsers
        object -- confirm against the callers.

        Raises:
            UnexpectedException: always, in this base implementation.
        """
        raise UnexpectedException("Not implemented.")

    @classmethod
    def _create_connection(cls, remote: str) -> MDRSConnection:
        """Build an ``MDRSConnection`` from the configuration of ``remote``.

        Raises:
            MissingConfigurationException: if the configuration for
                ``remote`` has no URL.
        """
        config = ConfigFile(remote)
        if config.url is None:
            raise MissingConfigurationException(f"Remote host `{remote}` is not found.")
        return MDRSConnection(config.remote, config.url)

    @classmethod
    def _find_laboratory(cls, connection: MDRSConnection, name: str) -> Laboratory:
        """Return the laboratory called ``name``.

        The laboratory list stored on ``connection`` is re-fetched first when
        it is empty or when the connection holds an expired token.

        Raises:
            IllegalArgumentException: if no laboratory has that name.
        """
        # Parentheses only spell out the precedence Python already applies.
        if connection.laboratories.empty() or (
            connection.token is not None and connection.token.is_expired
        ):
            laboratory_api = LaboratoriesApi(connection)
            connection.laboratories = laboratory_api.list()
        laboratory = connection.laboratories.find_by_name(name)
        if laboratory is None:
            raise IllegalArgumentException(f"Laboratory `{name}` not found.")
        return laboratory

    @classmethod
    def _find_folder(
        cls, connection: MDRSConnection, laboratory: Laboratory, path: str, password: str | None = None
    ) -> Folder:
        """Look up the folder at ``path`` inside ``laboratory``.

        ``path`` is NFC-normalised before the lookup. When the matched folder
        is locked, ``password`` is used to authenticate before the folder
        detail is retrieved.

        Raises:
            UnexpectedException: if the lookup does not yield exactly one folder.
            UnauthorizedException: if the folder is locked and no password
                was supplied.
        """
        folder_api = FoldersApi(connection)
        folders = folder_api.list(laboratory.id, normalize("NFC", path))
        if len(folders) != 1:
            raise UnexpectedException(f"Folder `{path}` not found.")
        if folders[0].lock:
            if password is None:
                raise UnauthorizedException(f"Folder `{path}` is locked.")
            folder_api.auth(folders[0].id, password)
        return folder_api.retrieve(folders[0].id)

    @classmethod
    def _find_files(cls, connection: MDRSConnection, folder_id: str) -> list[File]:
        """Collect the files of ``folder_id`` across all result pages."""
        files_api = FilesApi(connection)
        files: list[File] = []
        page = 1
        while page:
            result = files_api.list(folder_id, page)
            files.extend(result.results)
            # A falsy page number (no ``next`` link) ends the loop.
            page = page_num_from_url(result.next) if result.next else 0
        return files

    @classmethod
    def _parse_remote_host(cls, path: str) -> str:
        """Extract the remote name from ``remote`` or ``remote:``.

        Raises:
            IllegalArgumentException: if anything follows the colon or more
                than one colon is present.
        """
        path_array = path.split(":")
        remote_host = path_array[0]
        if (len(path_array) == 2 and path_array[1] != "") or len(path_array) > 2:
            raise IllegalArgumentException("Invalid remote host")
        return remote_host

    @classmethod
    def _parse_remote_host_with_path(cls, path: str) -> tuple[str, str, str]:
        """Split ``remote:/laboratory/folder...`` into ``(remote, laboratory, folder)``.

        Runs of slashes and ``/./`` segments are collapsed to a single ``/``
        in one substitution pass; ``..`` segments are rejected. The returned
        folder always starts with ``/`` unless the path part is empty, in
        which case both laboratory and folder are empty strings.

        Raises:
            IllegalArgumentException: on a ``..`` segment, when the string
                does not contain exactly one ``:``, or when the path part is
                not absolute.
        """
        path = re.sub(r"//+|/\./+|/\.$", "/", path)
        if re.search(r"/\.\./|/\.\.$", path) is not None:
            raise IllegalArgumentException("Path traversal found.")
        path_array = path.split(":")
        if len(path_array) != 2:
            raise IllegalArgumentException("Invalid remote host.")
        remote_host = path_array[0]
        folder_array = path_array[1].split("/")
        # An absolute path yields an empty first element after the split.
        is_absolute_path = folder_array[0] == ""
        if not is_absolute_path:
            raise IllegalArgumentException("Must be absolute paths.")
        del folder_array[0]
        if len(folder_array) == 0:
            laboratory = ""
            folder = ""
        else:
            laboratory = folder_array.pop(0)
            folder = "/" + "/".join(folder_array)
        return (remote_host, laboratory, folder)

    # ------------------------------------------------------------------
    # DOI helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_doi(path_component: str) -> bool:
        """Return True if path_component looks like a DOI string.

        A DOI is recognised as a string that starts with ``10.`` and
        contains a ``/``.
        """
        return path_component.startswith("10.") and "/" in path_component

    @staticmethod
    def _doi_suffix_id(doi: str) -> str:
        """Extract the internal system ID from a full DOI string.

        MDRS uses the segment after the last ``.`` in the suffix (the part
        after the ``/``) as its identifier.
        Example: ``10.xxxx/prefix.20230511-001`` → ``20230511-001``.
        If there is no ``.`` in the suffix, the whole suffix is returned.
        If there is no ``/`` at all, the input is returned unchanged.
        Trailing slashes are stripped before processing.
        """
        doi = doi.rstrip("/")
        slash_pos = doi.find("/")
        if slash_pos == -1:
            return doi
        suffix = doi[slash_pos + 1 :]
        dot_pos = suffix.rfind(".")
        return suffix[dot_pos + 1 :] if dot_pos != -1 else suffix

    @staticmethod
    def _split_doi_and_subpath(doi_with_path: str) -> tuple[str, str]:
        """Split a DOI-with-optional-path string into (doi, subpath).

        The DOI is everything up to (not including) the second ``/``; the
        subpath is the remainder and starts with ``/``. A missing second
        slash, or a bare trailing slash, yields an empty subpath.
        """
        # registrant / suffix / rest-of-path; fewer than three pieces means
        # there is no second slash and therefore no subpath.
        pieces = doi_with_path.split("/", 2)
        if len(pieces) < 3:
            return (doi_with_path, "")
        doi = f"{pieces[0]}/{pieces[1]}"
        subpath = "/" + pieces[2]
        # Treat a bare trailing slash as no subpath (root of DOI folder).
        if subpath == "/":
            return (doi, "")
        return (doi, subpath)

    @classmethod
    def _parse_doi_remote_host(cls, path: str) -> tuple[str, str, str]:
        """Parse ``remote:10.xxxx/prefix.ID[/optional/sub/path]`` into ``(remote, doi, subpath)``.

        Raises:
            IllegalArgumentException: if there is no ``:`` separator, if the
                part after it does not look like a DOI, or if the subpath
                contains a ``..`` segment.
        """
        parts = path.split(":", 1)
        if len(parts) != 2:
            raise IllegalArgumentException("remote_path must be in the form 'remote:10.xxxx/prefix.ID'")
        remote, doi_with_path = parts
        if not cls._is_doi(doi_with_path):
            raise IllegalArgumentException(
                f"Path `{doi_with_path}` does not look like a DOI (must start with '10.' and contain '/')."
            )
        doi, subpath = cls._split_doi_and_subpath(doi_with_path)
        # Same rule as _parse_remote_host_with_path: the subpath is appended
        # to the DOI folder's path by the resolvers, so ``..`` must not be
        # allowed to climb out of it.
        if re.search(r"/\.\./|/\.\.$", subpath) is not None:
            raise IllegalArgumentException("Path traversal found.")
        return (remote, doi, subpath)

    @classmethod
    def _find_folder_by_doi(
        cls,
        connection: MDRSConnection,
        doi: str,
        password: str | None = None,
    ) -> tuple[Folder, Laboratory]:
        """Resolve a DOI to a (Folder, Laboratory) pair.

        Calls GET v3/doi/{id}/ to look up the folder ID, retrieves the full
        folder detail (which carries ``laboratory_id``), and resolves the
        laboratory from that field. The freshly fetched laboratory list is
        stored on ``connection`` before returning.

        Raises:
            IllegalArgumentException: if the server's DOI differs from the
                requested one (compared case-insensitively).
            UnauthorizedException: if the folder is locked and no password
                was supplied.
            UnexpectedException: if the folder's laboratory is not in the
                laboratory list.
        """
        doi_clean = doi.rstrip("/")
        doi_id = cls._doi_suffix_id(doi_clean)
        doi_api = DoiApi(connection)
        doi_resp = doi_api.retrieve(doi_id)

        # Verify the returned DOI matches the one supplied (case-insensitive).
        returned_doi = doi_resp.doi.rstrip("/")
        if returned_doi.lower() != doi_clean.lower():
            raise IllegalArgumentException(
                f"DOI mismatch: requested `{doi_clean}` but server returned `{returned_doi}`."
            )

        folder_api = FoldersApi(connection)

        # Retrieve full folder detail directly by ID; laboratory_id is here.
        folder = folder_api.retrieve(doi_resp.folder.id)

        if folder.lock:
            if password is None:
                raise UnauthorizedException(f"Folder for DOI `{doi_clean}` is locked.")
            folder_api.auth(doi_resp.folder.id, password)
            # Re-fetch after authenticating so the returned detail is the
            # post-auth view, matching _find_folder (auth, then retrieve).
            # NOTE(review): presumably the pre-auth detail of a locked folder
            # is restricted -- confirm against the server API.
            folder = folder_api.retrieve(doi_resp.folder.id)

        # Resolve laboratory using laboratory_id from the full folder detail.
        lab_api = LaboratoriesApi(connection)
        labs = lab_api.list()
        lab = labs.find_by_id(folder.laboratory_id)
        if lab is None:
            raise UnexpectedException(f"Laboratory with id {folder.laboratory_id} not found.")

        connection.laboratories = labs
        return (folder, lab)

    @classmethod
    def _resolve_folder(
        cls,
        connection: MDRSConnection,
        remote_path: str,
        password: str | None = None,
    ) -> tuple[Folder, Laboratory]:
        """Resolve any remote path (normal or DOI) into a (Folder, Laboratory) pair.

        A DOI path without subpath resolves to the DOI's own folder; with a
        subpath, the subpath is appended to that folder's path and looked up
        in the same laboratory.
        """
        path_component = remote_path.split(":", 1)[1] if ":" in remote_path else ""
        if not cls._is_doi(path_component):
            _remote, laboratory_name, r_path = cls._parse_remote_host_with_path(remote_path)
            laboratory = cls._find_laboratory(connection, laboratory_name)
            folder = cls._find_folder(connection, laboratory, r_path, password)
            return (folder, laboratory)

        _remote, doi, subpath = cls._parse_doi_remote_host(remote_path)
        doi_folder, laboratory = cls._find_folder_by_doi(connection, doi, password)
        if not subpath:
            return (doi_folder, laboratory)
        abs_path = doi_folder.path.rstrip("/") + subpath
        folder = cls._find_folder(connection, laboratory, abs_path, password)
        return (folder, laboratory)

    @classmethod
    def _resolve_file(
        cls,
        connection: MDRSConnection,
        remote_path: str,
        password: str | None = None,
    ) -> tuple[Folder, Laboratory, str]:
        """Resolve a remote path pointing to a file into the parent Folder, its Laboratory, and the file's basename.

        Raises:
            IllegalArgumentException: if a DOI path carries no subpath, i.e.
                it names the DOI folder itself rather than a file.
        """
        path_component = remote_path.split(":", 1)[1] if ":" in remote_path else ""
        if not cls._is_doi(path_component):
            _remote, laboratory_name, r_path = cls._parse_remote_host_with_path(remote_path)
            r_path = r_path.rstrip("/")
            r_dirname = os.path.dirname(r_path)
            r_basename = os.path.basename(r_path)
            laboratory = cls._find_laboratory(connection, laboratory_name)
            parent_folder = cls._find_folder(connection, laboratory, r_dirname, password)
            return (parent_folder, laboratory, r_basename)

        _remote, doi, subpath = cls._parse_doi_remote_host(remote_path)
        doi_folder, laboratory = cls._find_folder_by_doi(connection, doi, password)
        subpath_clean = subpath.rstrip("/")
        if not subpath_clean:
            raise IllegalArgumentException("DOI path must point to a file, not a folder.")
        r_dirname = os.path.dirname(subpath_clean)
        r_basename = os.path.basename(subpath_clean)
        abs_path = doi_folder.path.rstrip("/") + r_dirname
        parent_folder = cls._find_folder(connection, laboratory, abs_path, password)
        return (parent_folder, laboratory, r_basename)