import json
from argparse import Namespace, _SubParsersAction

from pydantic.dataclasses import dataclass

from mdrsclient.api import FolderApi
from mdrsclient.commands.base import BaseCommand
from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.models import File, Folder, FolderSimple, Laboratory


class Config:
    """Pydantic configuration handed to the ``LsCommandContext`` dataclass."""

    # The context holds project classes such as MDRSConnection, so arbitrary
    # field types are enabled.
    arbitrary_types_allowed = True
    frozen = True


@dataclass(config=Config)
class LsCommandContext:
    """Settings of a single ``ls`` invocation, shared by the output helpers."""

    # "<remote>:/<laboratory name>"; printed in front of folder paths in
    # recursive plain output.
    prefix: str
    connection: MDRSConnection
    laboratory: Laboratory
    # Password sent when a locked sub folder has to be opened.
    password: str
    is_json: bool
    is_quick: bool
    is_recursive: bool


class LsCommand(BaseCommand):
    """Implementation of the ``ls`` sub-command (list the folder contents)."""

    @classmethod
    def register(cls, parsers: _SubParsersAction) -> None:
        """Add the ``ls`` sub-parser and bind it to a new command instance.

        Args:
            parsers: The sub-parsers action the ``ls`` parser is added to.
        """
        command = cls()
        ls_parser = parsers.add_parser("ls", help="list the folder contents")
        ls_parser.add_argument("-p", "--password", help="password to use when open locked folder")
        ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true")
        ls_parser.add_argument(
            "-q",
            "--quick",
            help="don't output header row. this option is forced if the -r option is specified",
            action="store_true",
        )
        ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true")
        ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
        ls_parser.set_defaults(func=command.ls)

    def ls(self, args: Namespace) -> None:
        """Resolve the remote folder named by ``args`` and print its contents.

        The connection, laboratory and folder are resolved through helper
        methods that are not defined in this file (presumably inherited from
        ``BaseCommand``).

        Args:
            args: Parsed arguments of the ``ls`` sub-parser.
        """
        (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path)
        connection = self._create_connection(remote)
        laboratory = self._find_laboratory(connection, laboratory_name)
        # NOTE(review): when -p is omitted args.password is None, so this
        # becomes the literal string "None" - confirm the helpers that receive
        # the password expect that.
        password = str(args.password)
        is_json = bool(args.json)
        is_recursive = bool(args.recursive)
        # Recursive listings never print the header row, so -r implies -q.
        is_quick = bool(args.quick) if not is_recursive else True
        self.context = LsCommandContext(
            f"{remote}:/{laboratory_name}",
            connection,
            laboratory,
            password,
            is_json,
            is_quick,
            is_recursive,
        )
        folder = self._find_folder(connection, laboratory, r_path, password)
        if self.context.is_json:
            self._ls_json(folder)
        else:
            self._ls_plain(folder)

    def _ls_json(self, folder: Folder) -> None:
        """Print ``folder`` as one JSON document, keeping non-ASCII text as is."""
        print(json.dumps(self._folder2dict(folder), ensure_ascii=False))

    def _ls_plain(self, folder: Folder) -> None:
        """Print ``folder`` as a tab separated table with padded columns.

        In recursive mode a heading with the folder path and the total size of
        its files precedes the rows, and every sub folder that can be opened
        is listed afterwards in the same format.

        Args:
            folder: The folder whose sub folders and files are listed.
        """
        label = {
            "type": "Type",
            "acl": "Access",
            "laboratory": "Laboratory",
            "size": "Lock/Size",
            "date": "Date",
            "name": "Name",
        }
        # Column widths start at the header label width, or at 0 when the
        # header row is suppressed, and grow to fit the widest cell.
        length: dict[str, int] = {
            key: (len(text) if not self.context.is_quick else 0) for key, text in label.items()
        }
        for sub_folder in folder.sub_folders:
            sub_laboratory_name = self._laboratory_name(sub_folder.lab_id)
            length["acl"] = max(length["acl"], len(sub_folder.access_level_name))
            length["laboratory"] = max(length["laboratory"], len(sub_laboratory_name))
            length["size"] = max(length["size"], len(sub_folder.lock_name))
            length["date"] = max(length["date"], len(sub_folder.updated_at_name))
            length["name"] = max(length["name"], len(sub_folder.name))
        for file in folder.files:
            length["size"] = max(length["size"], len(str(file.size)))
            length["date"] = max(length["date"], len(file.updated_at_name))
            length["name"] = max(length["name"], len(file.name))
        # File rows show the access level of the containing folder and the
        # laboratory of the context.
        length["acl"] = max(length["acl"], len(folder.access_level_name))
        length["laboratory"] = max(length["laboratory"], len(self.context.laboratory.name))

        header = (
            f"{label['type']:{length['type']}}\t{label['acl']:{length['acl']}}\t"
            f"{label['laboratory']:{length['laboratory']}}\t{label['size']:{length['size']}}\t"
            f"{label['date']:{length['date']}}\t{label['name']:{length['name']}}"
        )

        if self.context.is_recursive:
            print(f"{self.context.prefix}{folder.path}:")
            print(f"total {sum(f.size for f in folder.files)}")

        if not self.context.is_quick:
            print(header)
            print("-" * len(header.expandtabs()))

        # Sorted once and reused for the rows and for the recursive descent.
        sorted_sub_folders = sorted(folder.sub_folders, key=lambda x: x.name)

        for sub_folder in sorted_sub_folders:
            sub_laboratory_name = self._laboratory_name(sub_folder.lab_id)
            print(
                f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t"
                f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t"
                f"{sub_folder.updated_at_name:{length['date']}}\t{sub_folder.name:{length['name']}}"
            )
        for file in sorted(folder.files, key=lambda x: x.name):
            print(
                f"{'[f]':{length['type']}}\t{folder.access_level_name:{length['acl']}}\t"
                f"{self.context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t"
                f"{file.updated_at_name:{length['date']}}\t{file.name:{length['name']}}"
            )

        if self.context.is_recursive:
            print("")
            folder_api = FolderApi(self.context.connection)
            for sub_folder in sorted_sub_folders:
                try:
                    if sub_folder.lock:
                        folder_api.auth(sub_folder.id, self.context.password)
                    child = folder_api.retrieve(sub_folder.id)
                    self._ls_plain(child)
                except UnauthorizedException:
                    # Best effort: a sub folder that cannot be opened is
                    # left out of the recursive listing.
                    pass

    def _folder2dict(self, folder: Folder | FolderSimple) -> dict:
        """Convert ``folder`` into a dictionary for the JSON output.

        For a ``Folder`` the result also carries ``metadata``, ``sub_folders``
        and ``files``. ``sub_folders`` and ``files`` are always flat lists of
        dictionaries. In recursive mode each sub folder is retrieved and
        converted in full, and sub folders that raise
        ``UnauthorizedException`` are skipped. Otherwise the sub folder
        entries of ``folder`` are converted as they are.

        Args:
            folder: The folder to convert.

        Returns:
            The dictionary describing ``folder``.
        """
        data = {
            "id": folder.id,
            "pid": folder.pid,
            "name": folder.name,
            "access_level": folder.access_level_name,
            "lock": folder.lock,
            "laboratory": self._laboratory_name(folder.lab_id),
            "description": folder.description,
            "created_at": folder.created_at,
            "updated_at": folder.updated_at,
        }
        if isinstance(folder, Folder):
            folder_api = FolderApi(self.context.connection)
            data["metadata"] = folder_api.metadata(folder.id)
            sorted_sub_folders = sorted(folder.sub_folders, key=lambda x: x.name)
            if self.context.is_recursive:
                sub_folders = []
                for sub_folder in sorted_sub_folders:
                    try:
                        if sub_folder.lock:
                            folder_api.auth(sub_folder.id, self.context.password)
                        child = folder_api.retrieve(sub_folder.id)
                        sub_folders.append(self._folder2dict(child))
                    except UnauthorizedException:
                        # Best effort: a sub folder that cannot be opened is
                        # left out of the output.
                        pass
                data["sub_folders"] = sub_folders
            else:
                # A plain list, matching the recursive branch. The value used
                # to be wrapped in a one-element tuple, which json.dumps
                # rendered as a nested array.
                data["sub_folders"] = [self._folder2dict(sub_folder) for sub_folder in sorted_sub_folders]
            data["files"] = [self._file2dict(file) for file in sorted(folder.files, key=lambda x: x.name)]
        return data

    def _file2dict(self, file: File) -> dict:
        """Convert ``file`` into a dictionary for the JSON output.

        Args:
            file: The file to convert.

        Returns:
            The dictionary describing ``file``; ``download_url`` is prefixed
            with the connection URL and ``/v2/``.
        """
        data = {
            "id": file.id,
            "name": file.name,
            "type": file.type,
            "size": file.size,
            # The thumbnail is not part of the output:
            # "thumbnail": file.thumbnail,
            "description": file.description,
            "metadata": file.metadata,
            "download_url": f"{self.context.connection.url}/v2/{file.download_url}",
            "created_at": file.created_at,
            "updated_at": file.updated_at,
        }
        return data

    def _laboratory_name(self, laboratory_id: int) -> str:
        """Return the name of the laboratory with ``laboratory_id``.

        Args:
            laboratory_id: Identifier looked up in the laboratories of the
                connection.

        Returns:
            The laboratory name, or ``"(invalid)"`` when the lookup yields
            ``None``.
        """
        laboratory = self.context.connection.laboratories.find_by_id(laboratory_id)
        return laboratory.name if laboratory is not None else "(invalid)"