Files
orrisroot 8ce9e09e69 refactor: use services layer and modularize transfer operations
Decouple CLI commands from internal helper logic and consolidate the
core file transfer operations in the service layer to improve library
portability.

- Make MdrsClient subclass MdrsService to inherit resource resolution.
- Remove all deprecated helper methods from BaseCommand.
- Move core upload and download logic to a new transfer module.
- Refactor all CLI commands to route actions through MdrsClient.
- Eliminate circular imports between client and CLI command modules.
2026-07-02 23:16:53 +09:00

224 lines
9.3 KiB
Python

import json
from argparse import Namespace
from typing import Any
from pydantic.dataclasses import dataclass
from mdrsclient.api import FilesApi, FoldersApi
from mdrsclient.client import MdrsClient
from mdrsclient.commands.base import BaseCommand
from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.models import File, Folder, FolderSimple, Laboratory
class Config:
    """Pydantic configuration passed to the ``LsCommandContext`` dataclass."""

    # Instances cannot be modified after construction.
    frozen = True
    # Accept field types that pydantic has no built-in validation for.
    arbitrary_types_allowed = True
@dataclass(config=Config)
class LsCommandContext:
    """State shared by the ``ls`` output helpers for one invocation.

    Field order matters: the class may be constructed positionally.
    """

    # Text printed in front of each folder path in recursive plain listings.
    prefix: str
    # Client used to reach the remote (file lookups and its connection).
    client: MdrsClient
    # Laboratory that owns the listed folder.
    laboratory: Laboratory
    # Password offered when a locked sub-folder is opened; "" when none was given.
    password: str
    # Emit a JSON document instead of the plain table.
    is_json: bool
    # Suppress the header row of the plain table.
    is_quiet: bool
    # Descend into sub-folders.
    is_recursive: bool
class LsCommand(BaseCommand):
    """CLI ``ls`` command: list the contents of a remote folder.

    The listing is printed either as a tab-separated table (default) or as a
    single JSON document (``-J``).  With ``-r`` the sub-folders are listed too.
    """

    @classmethod
    def register(cls, parsers: Any) -> None:
        """Add the ``ls`` sub-command and its options to *parsers*.

        Args:
            parsers: The sub-parsers object returned by
                ``ArgumentParser.add_subparsers()``.
        """
        ls_parser = parsers.add_parser("ls", help="list the folder contents")
        ls_parser.add_argument("-p", "--password", help="password to use when open locked folder")
        ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true")
        ls_parser.add_argument(
            "-q",
            "--quiet",
            help="don't output header row. this option is forced if the -r option is specified",
            action="store_true",
        )
        ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true")
        ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
        ls_parser.set_defaults(func=cls.func)

    @classmethod
    def func(cls, args: Namespace) -> None:
        """Normalize the parsed command-line *args* and run :meth:`ls`."""
        remote_path = str(args.remote_path)
        # An empty or missing password is treated as "no password".
        password = str(args.password) if args.password else None
        is_json = bool(args.json)
        is_recursive = bool(args.recursive)
        # A recursive listing never prints the header row, so -r implies -q.
        is_quiet = is_recursive or bool(args.quiet)
        cls.ls(remote_path, password, is_json, is_recursive, is_quiet)

    @classmethod
    def ls(cls, remote_path: str, password: str | None, is_json: bool, is_recursive: bool, is_quiet: bool) -> None:
        """List *remote_path* using a client built from its remote label.

        Args:
            remote_path: Remote folder path, e.g. ``remote:/lab/path/``.
            password: Password for locked folders, or ``None``.
            is_json: Print a JSON document instead of the plain table.
            is_recursive: Also list the sub-folders.
            is_quiet: Do not print the header row of the plain table.
        """
        # NOTE(review): MdrsClient is also imported at module level, so this
        # call-time import looks redundant - confirm nothing relies on the late
        # binding before removing it.
        from mdrsclient.client import MdrsClient

        client = MdrsClient.from_remote(cls._remote_name(remote_path))
        cls._ls_logic(client, remote_path, password, is_json, is_recursive, is_quiet)

    @classmethod
    def _ls_logic(
        cls,
        client: MdrsClient,
        remote_path: str,
        password: str | None,
        is_json: bool,
        is_recursive: bool,
        is_quiet: bool,
    ) -> None:
        """Resolve *remote_path* with *client* and print its listing.

        The folder and its laboratory come from ``client.resolve_folder`` and
        the files from ``client.find_files``; the result is rendered as JSON or
        as a plain table depending on *is_json*.
        """
        folder, laboratory = client.resolve_folder(remote_path, password)
        files = client.find_files(folder.id)
        context = LsCommandContext(
            prefix=f"{cls._remote_name(remote_path)}:/{laboratory.name}",
            client=client,
            laboratory=laboratory,
            # The context stores a plain string, so "no password" becomes "".
            password=password if password is not None else "",
            is_json=is_json,
            is_quiet=is_quiet,
            is_recursive=is_recursive,
        )
        if context.is_json:
            cls._ls_json(context, folder, files)
        else:
            cls._ls_plain(context, folder, files)

    @classmethod
    def _ls_json(cls, context: LsCommandContext, folder: Folder, files: list[File]) -> None:
        """Print *folder* and *files* as one JSON document (non-ASCII kept as is)."""
        print(json.dumps(cls._folder2dict(context, folder, files), ensure_ascii=False))

    @classmethod
    def _ls_plain(cls, context: LsCommandContext, folder: Folder, files: list[File]) -> None:
        """Print *folder* and *files* as a tab-separated, width-aligned table.

        Sub-folders are printed first, then files, both ordered by name.  In
        recursive mode each sub-folder is then listed the same way; sub-folders
        that raise ``UnauthorizedException`` are skipped.
        """
        label = {
            "type": "Type",
            "acl": "Access",
            "laboratory": "Laboratory",
            "size": "Size",
            "date": "Date",
            "name": "Name",
        }
        # Columns start as wide as their header label, or at 0 when the header is suppressed.
        length: dict[str, int] = {key: 0 if context.is_quiet else len(text) for key, text in label.items()}

        # Resolve every sub-folder's laboratory name once; it is needed for sizing and printing.
        sub_folder_rows = [
            (sub_folder, cls._laboratory_name(context, sub_folder.laboratory_id))
            for sub_folder in cls._sorted_by_name(folder.sub_folders)
        ]
        sorted_files = cls._sorted_by_name(files)

        for sub_folder, sub_laboratory_name in sub_folder_rows:
            length["acl"] = max(length["acl"], len(sub_folder.access_level_name))
            length["laboratory"] = max(length["laboratory"], len(sub_laboratory_name))
            # Size the column from the value that is actually printed for this row.
            length["size"] = max(length["size"], len(str(sub_folder.size)))
            length["date"] = max(length["date"], len(sub_folder.updated_at_name))
            length["name"] = max(length["name"], len(sub_folder.name))
        for file in sorted_files:
            length["size"] = max(length["size"], len(str(file.size)))
            length["date"] = max(length["date"], len(file.updated_at_name))
            length["name"] = max(length["name"], len(file.name))
        if sorted_files:
            # File rows show the parent folder's access level and laboratory.
            length["acl"] = max(length["acl"], len(folder.access_level_name))
            length["laboratory"] = max(length["laboratory"], len(context.laboratory.name))

        header = (
            f"{label['type']:{length['type']}}\t{label['acl']:{length['acl']}}\t"
            f"{label['laboratory']:{length['laboratory']}}\t{label['size']:{length['size']}}\t"
            f"{label['date']:{length['date']}}\t{label['name']:{length['name']}}"
        )
        if context.is_recursive:
            print(f"{context.prefix}{folder.path}:")
        print(f"total {sum(f.size for f in files)}")
        if not context.is_quiet:
            print(header)
            print("-" * len(header.expandtabs()))

        for sub_folder, sub_laboratory_name in sub_folder_rows:
            sub_folder_type = "[d]" if sub_folder.lock is False else "[l]"
            print(
                f"{sub_folder_type:{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t"
                f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.size:{length['size']}}\t"
                f"{sub_folder.updated_at_name:{length['date']}}\t{sub_folder.name:{length['name']}}"
            )
        for file in sorted_files:
            print(
                f"{'[f]':{length['type']}}\t{folder.access_level_name:{length['acl']}}\t"
                f"{context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t"
                f"{file.updated_at_name:{length['date']}}\t{file.name:{length['name']}}"
            )

        if context.is_recursive:
            print("")
            folder_api = FoldersApi(context.client.connection)
            for sub_folder, _ in sub_folder_rows:
                # The try deliberately spans the nested listing as well, so the
                # set of swallowed exceptions stays the same at every depth.
                try:
                    if sub_folder.lock:
                        folder_api.auth(sub_folder.id, context.password)
                    child_folder = folder_api.retrieve(sub_folder.id)
                    child_files = context.client.find_files(sub_folder.id)
                    cls._ls_plain(context, child_folder, child_files)
                except UnauthorizedException:
                    # Best effort: a sub-folder that cannot be opened is left out of the listing.
                    continue

    @classmethod
    def _folder2dict(
        cls, context: LsCommandContext, folder: Folder | FolderSimple, files: list[File]
    ) -> dict[str, Any]:
        """Convert *folder* (and its *files*) into a JSON-serializable dict.

        Every folder yields its basic attributes.  A full ``Folder`` also gets
        ``metadata``, ``sub_folders`` and ``files``: in recursive mode each
        sub-folder is retrieved and expanded in turn (those that raise
        ``UnauthorizedException`` are skipped), otherwise sub-folders are
        emitted with their basic attributes only.
        """
        data: dict[str, Any] = {
            "id": folder.id,
            "pid": folder.pid,
            "name": folder.name,
            "size": folder.size,
            "access_level": folder.access_level_name,
            "lock": folder.lock,
            "laboratory": cls._laboratory_name(context, folder.laboratory_id),
            "description": folder.description,
            "created_at": folder.created_at,
            "updated_at": folder.updated_at,
        }
        if not isinstance(folder, Folder):
            return data

        folder_api = FoldersApi(context.client.connection)
        data["metadata"] = folder_api.metadata(folder.id)
        ordered_sub_folders = cls._sorted_by_name(folder.sub_folders)
        if context.is_recursive:
            sub_folders: list[dict[str, Any]] = []
            for sub_folder in ordered_sub_folders:
                # The try spans the nested conversion too, matching the plain listing.
                try:
                    if sub_folder.lock:
                        folder_api.auth(sub_folder.id, context.password)
                    child_folder = folder_api.retrieve(sub_folder.id)
                    child_files = context.client.find_files(sub_folder.id)
                    sub_folders.append(cls._folder2dict(context, child_folder, child_files))
                except UnauthorizedException:
                    # Best effort: a sub-folder that cannot be opened is left out of the output.
                    continue
            data["sub_folders"] = sub_folders
        else:
            data["sub_folders"] = [cls._folder2dict(context, sub_folder, []) for sub_folder in ordered_sub_folders]
        data["files"] = [cls._file2dict(context, file) for file in cls._sorted_by_name(files)]
        return data

    @classmethod
    def _file2dict(cls, context: LsCommandContext, file: File) -> dict[str, Any]:
        """Convert *file* into a dict; ``download_url`` is prefixed with the connection URL."""
        data: dict[str, Any] = {
            "id": file.id,
            "name": file.name,
            "type": file.type,
            "size": file.size,
            # "thumbnail": file.thumbnail,
            "description": file.description,
            "metadata": file.metadata,
            "download_url": f"{context.client.connection.url}/{file.download_url}",
            "created_at": file.created_at,
            "updated_at": file.updated_at,
        }
        return data

    @classmethod
    def _laboratory_name(cls, context: LsCommandContext, laboratory_id: int) -> str:
        """Return the name of the laboratory with *laboratory_id*, or ``"(invalid)"`` if it is not found."""
        laboratory = context.client.connection.laboratories.find_by_id(laboratory_id)
        return laboratory.name if laboratory is not None else "(invalid)"

    @staticmethod
    def _remote_name(remote_path: str) -> str:
        """Return the text before the first ``:`` of *remote_path*, or ``""`` when it has no ``:``."""
        remote, separator, _ = remote_path.partition(":")
        return remote if separator else ""

    @staticmethod
    def _sorted_by_name(items: Any) -> list[Any]:
        """Return *items* as a new list ordered by each element's ``name`` attribute."""
        return sorted(items, key=lambda item: item.name)