import json from argparse import Namespace from typing import Any from pydantic.dataclasses import dataclass from mdrsclient.api import FolderApi from mdrsclient.commands.base import BaseCommand from mdrsclient.connection import MDRSConnection from mdrsclient.exceptions import UnauthorizedException from mdrsclient.models import File, Folder, FolderSimple, Laboratory class Config: arbitrary_types_allowed = True frozen = True @dataclass(config=Config) class LsCommandContext: prefix: str connection: MDRSConnection laboratory: Laboratory password: str is_json: bool is_quick: bool is_recursive: bool class LsCommand(BaseCommand): @classmethod def register(cls, parsers: Any) -> None: ls_parser = parsers.add_parser("ls", help="list the folder contents") ls_parser.add_argument("-p", "--password", help="password to use when open locked folder") ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true") ls_parser.add_argument( "-q", "--quick", help="don't output header row. this option is forced if the -r option is specified", action="store_true", ) ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true") ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") ls_parser.set_defaults(func=cls.func) @classmethod def func(cls, args: Namespace) -> None: remote_path = str(args.remote_path) password = str(args.password) if args.password else None is_json = bool(args.json) is_recursive = bool(args.recursive) is_quick = bool(args.quick) if not is_recursive else True cls.ls(remote_path, password, is_json, is_recursive, is_quick) @classmethod def ls(cls, remote_path: str, password: str | None, is_json: bool, is_recursive: bool, is_quick: bool) -> None: (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) connection = cls._create_connection(remote) laboratory = cls._find_laboratory(connection, laboratory_name) context = LsCommandContext( f"{remote}:/{laboratory_name}", connection, laboratory, password if password is not None else "", is_json, is_quick, is_recursive, ) folder = cls._find_folder(connection, laboratory, r_path, password) if context.is_json: cls._ls_json(context, folder) else: cls._ls_plain(context, folder) @classmethod def _ls_json(cls, context: LsCommandContext, folder: Folder) -> None: print(json.dumps(cls._folder2dict(context, folder), ensure_ascii=False)) @classmethod def _ls_plain(cls, context: LsCommandContext, folder: Folder) -> None: label = { "type": "Type", "acl": "Access", "laboratory": "Laboratory", "size": "Lock/Size", "date": "Date", "name": "Name", } length: dict[str, int] = {} for key in label.keys(): length[key] = len(label[key]) if not context.is_quick else 0 for sub_folder in folder.sub_folders: sub_laboratory = context.connection.laboratories.find_by_id(sub_folder.lab_id) sub_laboratory_name = sub_laboratory.name if sub_laboratory is not None else "(invalid)" length["acl"] = max(length["acl"], len(sub_folder.access_level_name)) length["laboratory"] = max(length["laboratory"], len(sub_laboratory_name)) length["size"] = max(length["size"], len(sub_folder.lock_name)) length["date"] = max(length["date"], len(sub_folder.updated_at_name)) length["name"] = max(length["name"], len(sub_folder.name)) for file in folder.files: length["size"] = max(length["size"], len(str(file.size))) length["date"] = max(length["date"], len(file.updated_at_name)) length["name"] = max(length["name"], len(file.name)) length["acl"] = max(length["acl"], len(folder.access_level_name)) length["laboratory"] = max(length["laboratory"], len(context.laboratory.name)) header = ( f"{label['type']:{length['type']}}\t{label['acl']:{length['acl']}}\t" f"{label['laboratory']:{length['laboratory']}}\t{label['size']:{length['size']}}\t" f"{label['date']:{length['date']}}\t{label['name']:{length['name']}}" ) if context.is_recursive: print(f"{context.prefix}{folder.path}:") print(f"total {sum(f.size for f in folder.files)}") if not context.is_quick: print(header) print("-" * len(header.expandtabs())) for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): sub_laboratory_name = cls._laboratory_name(context, sub_folder.lab_id) print( f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t" f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t" f"{sub_folder.updated_at_name:{length['date']}}\t{sub_folder.name:{length['name']}}" ) for file in sorted(folder.files, key=lambda x: x.name): print( f"{'[f]':{length['type']}}\t{folder.access_level_name:{length['acl']}}\t" f"{context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t" f"{file.updated_at_name:{length['date']}}\t{file.name:{length['name']}}" ) if context.is_recursive: print("") for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): folder_api = FolderApi(context.connection) try: if sub_folder.lock: folder_api.auth(sub_folder.id, context.password) folder = folder_api.retrieve(sub_folder.id) cls._ls_plain(context, folder) except UnauthorizedException: pass @classmethod def _folder2dict(cls, context: LsCommandContext, folder: Folder | FolderSimple) -> dict[str, Any]: data: dict[str, Any] = { "id": folder.id, "pid": folder.pid, "name": folder.name, "access_level": folder.access_level_name, "lock": folder.lock, "laboratory": cls._laboratory_name(context, folder.lab_id), "description": folder.description, "created_at": folder.created_at, "updated_at": folder.updated_at, } if isinstance(folder, Folder): folder_api = FolderApi(context.connection) data["metadata"] = folder_api.metadata(folder.id) if context.is_recursive: sub_folders: list[dict[str, Any]] = [] for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): try: if sub_folder.lock: folder_api.auth(sub_folder.id, context.password) folder2 = folder_api.retrieve(sub_folder.id) sub_folders.append(cls._folder2dict(context, folder2)) except UnauthorizedException: pass data["sub_folders"] = sub_folders else: data["sub_folders"] = list( map(lambda x: cls._folder2dict(context, x), sorted(folder.sub_folders, key=lambda x: x.name)) ) data["files"] = list(map(lambda x: cls._file2dict(context, x), sorted(folder.files, key=lambda x: x.name))) return data @classmethod def _file2dict(cls, context: LsCommandContext, file: File) -> dict[str, Any]: data: dict[str, Any] = { "id": file.id, "name": file.name, "type": file.type, "size": file.size, # "thumbnail": file.thumbnail, "description": file.description, "metadata": file.metadata, "download_url": f"{context.connection.url}/v2/{file.download_url}", "created_at": file.created_at, "updated_at": file.updated_at, } return data @classmethod def _laboratory_name(cls, context: LsCommandContext, laboratory_id: int) -> str: laboratory = context.connection.laboratories.find_by_id(laboratory_id) return laboratory.name if laboratory is not None else "(invalid)"