b95fc0cd7d
Abstract the configuration storage mechanism to allow using custom configurations, such as in-memory setups, when using the tool as a library. This aligns the configuration architecture with the session cache abstraction. - Define ConfigInterface protocol and InMemoryConfig class - Make CacheFile, InMemoryCache, ConfigFile, and InMemoryConfig explicitly inherit their interfaces - Update MdrsService and MdrsClient to accept customizable config_class and config instances - Add validation to check remote parameter consistency in create_connection - Remove unused imports across command files
240 lines
12 KiB
Python
240 lines
12 KiB
Python
import os
|
|
from typing import Any
|
|
from unicodedata import normalize
|
|
|
|
from mdrsclient.api import DoiApi, FilesApi, FoldersApi, LaboratoriesApi, UsersApi
|
|
from mdrsclient.cache import CacheInterface
|
|
from mdrsclient.config import ConfigInterface
|
|
from mdrsclient.connection import MDRSConnection
|
|
from mdrsclient.exceptions import IllegalArgumentException, MDRSException, UnauthorizedException, UnexpectedException
|
|
from mdrsclient.models import File, Folder, Laboratory, Token, User
|
|
from mdrsclient.models.file import find_file
|
|
from mdrsclient.services import MdrsService
|
|
|
|
|
|
class MdrsClient(MdrsService):
|
|
"""Service layer client for MDRS."""
|
|
|
|
def __init__(self, connection: MDRSConnection, config_class: type[ConfigInterface] | None = None):
|
|
super().__init__(connection, config_class)
|
|
|
|
@classmethod
|
|
def from_remote(
|
|
cls, remote: str, cache: CacheInterface | None = None, config: ConfigInterface | None = None
|
|
) -> "MdrsClient":
|
|
return cls(cls.create_connection(remote, cache, config))
|
|
|
|
def mkdir(self, remote_path: str) -> None:
|
|
remote, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
|
|
r_path = r_path.rstrip("/")
|
|
r_dirname = os.path.dirname(r_path)
|
|
r_basename = os.path.basename(r_path)
|
|
laboratory = self.find_laboratory(laboratory_name)
|
|
parent_folder = self.find_folder(laboratory, r_dirname)
|
|
files = self.find_files(parent_folder.id)
|
|
if parent_folder.find_sub_folder(r_basename) is not None or find_file(files, r_basename) is not None:
|
|
raise IllegalArgumentException(f"Cannot create folder `{r_path}`: File exists.")
|
|
folder_api = FoldersApi(self.connection)
|
|
folder_api.create(normalize("NFC", r_basename), parent_folder.id)
|
|
|
|
def rm(self, remote_path: str, is_recursive: bool = False) -> None:
|
|
remote, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
|
|
r_path = r_path.rstrip("/")
|
|
r_dirname = os.path.dirname(r_path)
|
|
r_basename = os.path.basename(r_path)
|
|
laboratory = self.find_laboratory(laboratory_name)
|
|
parent_folder = self.find_folder(laboratory, r_dirname)
|
|
parent_files = self.find_files(parent_folder.id)
|
|
file = find_file(parent_files, r_basename)
|
|
if file is not None:
|
|
file_api = FilesApi(self.connection)
|
|
file_api.destroy(file)
|
|
else:
|
|
folder = parent_folder.find_sub_folder(r_basename)
|
|
if folder is None:
|
|
raise IllegalArgumentException(f"Cannot remove `{r_path}`: No such file or folder.")
|
|
if not is_recursive:
|
|
raise IllegalArgumentException(f"Cannot remove `{r_path}`: Is a folder.")
|
|
folder_api = FoldersApi(self.connection)
|
|
folder_api.destroy(folder.id, True)
|
|
|
|
def ls(self, remote_path: str, password: str | None = None) -> tuple[Folder, list[File]]:
|
|
folder, laboratory = self.resolve_folder(remote_path, password)
|
|
files = self.find_files(folder.id)
|
|
return folder, files
|
|
|
|
def cp(self, src_path: str, dest_path: str, is_recursive: bool = False) -> None:
|
|
s_remote, s_laboratory_name, s_path = self.parse_remote_host_with_path(src_path)
|
|
d_remote, d_laboratory_name, d_path = self.parse_remote_host_with_path(dest_path)
|
|
if s_remote != d_remote:
|
|
raise IllegalArgumentException("Remote host mismatched.")
|
|
if s_laboratory_name != d_laboratory_name:
|
|
raise IllegalArgumentException("Laboratory mismatched.")
|
|
s_path = s_path.rstrip("/")
|
|
s_dirname = os.path.dirname(s_path)
|
|
s_basename = os.path.basename(s_path)
|
|
if d_path.endswith("/"):
|
|
d_dirname = d_path
|
|
d_basename = s_basename
|
|
else:
|
|
d_dirname = os.path.dirname(d_path)
|
|
d_basename = os.path.basename(d_path)
|
|
laboratory = self.find_laboratory(s_laboratory_name)
|
|
s_parent_folder = self.find_folder(laboratory, s_dirname)
|
|
s_parent_files = self.find_files(s_parent_folder.id)
|
|
d_parent_folder = self.find_folder(laboratory, d_dirname)
|
|
d_parent_files = self.find_files(d_parent_folder.id)
|
|
s_file = find_file(s_parent_files, s_basename)
|
|
if s_file is not None:
|
|
d_file = find_file(d_parent_files, d_basename)
|
|
if d_file is not None:
|
|
raise IllegalArgumentException(f"File `{d_basename}` already exists.")
|
|
d_sub_folder = d_parent_folder.find_sub_folder(d_basename)
|
|
if d_sub_folder is not None:
|
|
raise IllegalArgumentException(f"Cannot overwrite non-folder `{d_basename}` with folder `{d_path}`.")
|
|
file_api = FilesApi(self.connection)
|
|
if s_parent_folder.id != d_parent_folder.id or d_basename != s_basename:
|
|
file_api.copy(s_file, d_parent_folder.id, normalize("NFC", d_basename))
|
|
else:
|
|
s_folder = s_parent_folder.find_sub_folder(s_basename)
|
|
if s_folder is None:
|
|
raise IllegalArgumentException(f"File or folder `{s_basename}` not found.")
|
|
if not is_recursive:
|
|
raise IllegalArgumentException(f"Cannot copy `{s_path}`: Is a folder.")
|
|
if find_file(d_parent_files, d_basename) is not None:
|
|
raise IllegalArgumentException(f"Cannot overwrite non-folder `{d_basename}` with folder `{s_path}`.")
|
|
d_folder = d_parent_folder.find_sub_folder(d_basename)
|
|
if d_folder is not None:
|
|
if d_folder.id == s_folder.id:
|
|
raise IllegalArgumentException(f"`{s_path}` and `{s_path}` are the same folder.")
|
|
raise IllegalArgumentException(f"Cannot move `{s_path}` to `{d_path}`: Folder not empty.")
|
|
folder_api = FoldersApi(self.connection)
|
|
if s_parent_folder.id != d_parent_folder.id or s_basename != d_basename:
|
|
folder_api.copy(s_folder, d_parent_folder.id, normalize("NFC", d_basename))
|
|
|
|
def mv(self, src_path: str, dest_path: str) -> None:
|
|
s_remote, s_laboratory_name, s_path = self.parse_remote_host_with_path(src_path)
|
|
d_remote, d_laboratory_name, d_path = self.parse_remote_host_with_path(dest_path)
|
|
if s_remote != d_remote:
|
|
raise IllegalArgumentException("Remote host mismatched.")
|
|
if s_laboratory_name != d_laboratory_name:
|
|
raise IllegalArgumentException("Laboratory mismatched.")
|
|
s_path = s_path.rstrip("/")
|
|
s_dirname = os.path.dirname(s_path)
|
|
s_basename = os.path.basename(s_path)
|
|
if d_path.endswith("/"):
|
|
d_dirname = d_path
|
|
d_basename = s_basename
|
|
else:
|
|
d_dirname = os.path.dirname(d_path)
|
|
d_basename = os.path.basename(d_path)
|
|
laboratory = self.find_laboratory(s_laboratory_name)
|
|
s_parent_folder = self.find_folder(laboratory, s_dirname)
|
|
s_parent_files = self.find_files(s_parent_folder.id)
|
|
d_parent_folder = self.find_folder(laboratory, d_dirname)
|
|
d_parent_files = self.find_files(d_parent_folder.id)
|
|
s_file = find_file(s_parent_files, s_basename)
|
|
if s_file is not None:
|
|
d_file = find_file(d_parent_files, d_basename)
|
|
if d_file is not None:
|
|
raise IllegalArgumentException(f"File `{d_basename}` already exists.")
|
|
d_sub_folder = d_parent_folder.find_sub_folder(d_basename)
|
|
if d_sub_folder is not None:
|
|
raise IllegalArgumentException(f"Cannot overwrite non-folder `{d_basename}` with folder `{d_path}`.")
|
|
file_api = FilesApi(self.connection)
|
|
if s_parent_folder.id != d_parent_folder.id or d_basename != s_basename:
|
|
file_api.move(s_file, d_parent_folder.id, normalize("NFC", d_basename))
|
|
else:
|
|
s_folder = s_parent_folder.find_sub_folder(s_basename)
|
|
if s_folder is None:
|
|
raise IllegalArgumentException(f"File or folder `{s_basename}` not found.")
|
|
if find_file(d_parent_files, d_basename) is not None:
|
|
raise IllegalArgumentException(f"Cannot overwrite non-folder `{d_basename}` with folder `{s_path}`.")
|
|
d_folder = d_parent_folder.find_sub_folder(d_basename)
|
|
if d_folder is not None:
|
|
if d_folder.id == s_folder.id:
|
|
raise IllegalArgumentException(f"`{s_path}` and `{s_path}` are the same folder.")
|
|
raise IllegalArgumentException(f"Cannot move `{s_path}` to `{d_path}`: Folder not empty.")
|
|
folder_api = FoldersApi(self.connection)
|
|
if s_parent_folder.id != d_parent_folder.id or d_basename != s_basename:
|
|
folder_api.move(s_folder, d_parent_folder.id, normalize("NFC", d_basename))
|
|
|
|
def chacl(
|
|
self, remote_path: str, access_level: int, is_recursive: bool = False, password: str | None = None
|
|
) -> None:
|
|
remote, laboratory_name, r_path = self.parse_remote_host_with_path(remote_path)
|
|
r_path = r_path.rstrip("/")
|
|
laboratory = self.find_laboratory(laboratory_name)
|
|
folder = self.find_folder(laboratory, r_path)
|
|
folder_api = FoldersApi(self.connection)
|
|
folder_api.acl(folder.id, access_level, is_recursive, password)
|
|
|
|
def metadata(self, remote_path: str, password: str | None = None) -> dict:
|
|
folder, laboratory = self.resolve_folder(remote_path, password)
|
|
folder_api = FoldersApi(self.connection)
|
|
return folder_api.metadata(folder.id)
|
|
|
|
def file_metadata(self, remote_path: str, password: str | None = None) -> dict:
|
|
folder, laboratory, r_basename = self.resolve_file(remote_path, password)
|
|
files = self.find_files(folder.id)
|
|
file = find_file(files, r_basename)
|
|
if file is None:
|
|
raise IllegalArgumentException(f"File `{r_basename}` not found.")
|
|
file_api = FilesApi(self.connection)
|
|
return file_api.metadata(file)
|
|
|
|
def upload(
|
|
self, local_path: str, remote_path: str, is_recursive: bool = False, is_skip_if_exists: bool = False
|
|
) -> None:
|
|
from mdrsclient.transfer import Uploader
|
|
|
|
uploader = Uploader(self)
|
|
uploader.upload(local_path, remote_path, is_recursive, is_skip_if_exists)
|
|
|
|
def download(
|
|
self,
|
|
remote_path: str,
|
|
local_path: str,
|
|
is_recursive: bool = False,
|
|
is_skip_if_exists: bool = False,
|
|
password: str | None = None,
|
|
excludes: list[str] | None = None,
|
|
) -> None:
|
|
from mdrsclient.transfer import Downloader
|
|
|
|
downloader = Downloader(self)
|
|
downloader.download(remote_path, local_path, is_recursive, is_skip_if_exists, password, excludes)
|
|
|
|
def version(self) -> str:
|
|
from mdrsclient.__version__ import __version__
|
|
|
|
return f"mdrs {__version__}"
|
|
|
|
def config_create(self, remote: str, url: str) -> None:
|
|
remote = self.parse_remote_host(remote)
|
|
config = self.config_class(remote)
|
|
if config.url is not None:
|
|
raise IllegalArgumentException(f"Remote host `{remote}` is already exists.")
|
|
else:
|
|
config.url = url
|
|
|
|
def config_update(self, remote: str, url: str) -> None:
|
|
remote = self.parse_remote_host(remote)
|
|
config = self.config_class(remote)
|
|
if config.url is None:
|
|
raise IllegalArgumentException(f"Remote host `{remote}` is not exists.")
|
|
else:
|
|
config.url = url
|
|
|
|
def config_list(self) -> list:
|
|
config = self.config_class("")
|
|
return config.list()
|
|
|
|
def config_delete(self, remote: str) -> None:
|
|
remote = self.parse_remote_host(remote)
|
|
config = self.config_class(remote)
|
|
if config.url is None:
|
|
raise IllegalArgumentException(f"Remote host `{remote}` is not exists.")
|
|
else:
|
|
del config.url
|