# mdrs-client-python/mdrsclient/commands/ls.py
#
# 200 lines
# 8.5 KiB
# Python

import json
from argparse import Namespace
from typing import Any
from pydantic.dataclasses import dataclass
from mdrsclient.api import FolderApi
from mdrsclient.commands.base import BaseCommand
from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.models import File, Folder, FolderSimple, Laboratory
class Config:
    """Pydantic configuration applied to the ``LsCommandContext`` dataclass."""

    # Fields such as the connection and laboratory are project classes rather
    # than pydantic models, so pydantic must accept arbitrary types.
    arbitrary_types_allowed = True
    # Instances are not meant to be modified after construction.
    frozen = True
@dataclass(config=Config)
class LsCommandContext:
    """Options and shared objects passed through one ``ls`` invocation."""

    # "<remote>:/<laboratory name>"; printed in front of the folder path in recursive plain output.
    prefix: str
    # Connection used for API calls and laboratory lookups.
    connection: MDRSConnection
    # Laboratory named in the requested remote path.
    laboratory: Laboratory
    # Password used to open locked sub-folders; "" when none was supplied.
    password: str
    # True to emit JSON instead of the plain-text table.
    is_json: bool
    # True to suppress the header row of the plain-text table.
    is_quick: bool
    # True to descend into sub-folders.
    is_recursive: bool
class LsCommand(BaseCommand):
    """Implement the ``ls`` sub-command: list a remote folder as a plain-text table or as JSON."""

    @classmethod
    def register(cls, parsers: Any) -> None:
        """Add the ``ls`` sub-parser and its arguments to ``parsers``.

        Args:
            parsers: Object providing ``add_parser`` (used like an argparse sub-parsers action).
        """
        ls_parser = parsers.add_parser("ls", help="list the folder contents")
        ls_parser.add_argument("-p", "--password", help="password to use when open locked folder")
        ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true")
        ls_parser.add_argument(
            "-q",
            "--quick",
            help="don't output header row. this option is forced if the -r option is specified",
            action="store_true",
        )
        ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true")
        ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
        ls_parser.set_defaults(func=cls.func)

    @classmethod
    def func(cls, args: Namespace) -> None:
        """Convert parsed command-line arguments into a call to :meth:`ls`.

        Args:
            args: Parsed arguments carrying ``remote_path``, ``password``, ``json``,
                ``recursive`` and ``quick``.
        """
        remote_path = str(args.remote_path)
        # An empty or missing password is treated as "no password".
        password = str(args.password) if args.password else None
        is_json = bool(args.json)
        is_recursive = bool(args.recursive)
        # -r forces -q: recursive listings never print the header row.
        is_quick = is_recursive or bool(args.quick)
        cls.ls(remote_path, password, is_json, is_recursive, is_quick)

    @classmethod
    def ls(cls, remote_path: str, password: str | None, is_json: bool, is_recursive: bool, is_quick: bool) -> None:
        """Resolve ``remote_path`` and print the listing of the folder it names.

        Args:
            remote_path: Remote folder path (``remote:/lab/path/``).
            password: Password for locked folders, or ``None``. It is passed unchanged to the
                folder lookup and stored as ``""`` in the context when ``None``.
            is_json: Print JSON instead of the plain-text table.
            is_recursive: Also list sub-folders.
            is_quick: Omit the header row of the plain-text table.
        """
        # The parsing/lookup helpers are not defined in this class (presumably inherited from BaseCommand).
        (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path)
        connection = cls._create_connection(remote)
        laboratory = cls._find_laboratory(connection, laboratory_name)
        context = LsCommandContext(
            prefix=f"{remote}:/{laboratory_name}",
            connection=connection,
            laboratory=laboratory,
            password=password if password is not None else "",
            is_json=is_json,
            is_quick=is_quick,
            is_recursive=is_recursive,
        )
        folder = cls._find_folder(connection, laboratory, r_path, password)
        if context.is_json:
            cls._ls_json(context, folder)
        else:
            cls._ls_plain(context, folder)

    @classmethod
    def _ls_json(cls, context: LsCommandContext, folder: Folder) -> None:
        """Print ``folder`` as a single JSON document (non-ASCII characters are not escaped)."""
        print(json.dumps(cls._folder2dict(context, folder), ensure_ascii=False))

    @classmethod
    def _ls_plain(cls, context: LsCommandContext, folder: Folder) -> None:
        """Print ``folder`` as a tab-separated table, recursing into sub-folders when requested.

        Sub-folders are listed first, then files, each group sorted by name. In recursive
        mode a ``<prefix><path>:`` line and a ``total <sum of file sizes>`` line precede the
        table, and every sub-folder is listed afterwards in the same format.
        """
        label = {
            "type": "Type",
            "acl": "Access",
            "laboratory": "Laboratory",
            "size": "Lock/Size",
            "date": "Date",
            "name": "Name",
        }
        widths = cls._column_widths(context, folder, label)
        sub_folders = sorted(folder.sub_folders, key=lambda x: x.name)

        if context.is_recursive:
            print(f"{context.prefix}{folder.path}:")
            print(f"total {sum(f.size for f in folder.files)}")
        if not context.is_quick:
            header = cls._format_row(
                widths,
                label["type"],
                label["acl"],
                label["laboratory"],
                label["size"],
                label["date"],
                label["name"],
            )
            print(header)
            # Underline the header across its on-screen width (tabs expanded).
            print("-" * len(header.expandtabs()))

        for sub_folder in sub_folders:
            print(
                cls._format_row(
                    widths,
                    "[d]",
                    sub_folder.access_level_name,
                    cls._laboratory_name(context, sub_folder.lab_id),
                    sub_folder.lock_name,
                    sub_folder.updated_at_name,
                    sub_folder.name,
                )
            )
        for file in sorted(folder.files, key=lambda x: x.name):
            # Files have no access level of their own here; the containing folder's is shown.
            # NOTE(review): the laboratory column uses context.laboratory (the listing root),
            # not folder.lab_id -- confirm this is intended for sub-folders of another laboratory.
            print(
                cls._format_row(
                    widths,
                    "[f]",
                    folder.access_level_name,
                    context.laboratory.name,
                    file.size,
                    file.updated_at_name,
                    file.name,
                )
            )

        if context.is_recursive:
            print("")
            # One API wrapper serves every sub-folder, as in _folder2dict.
            folder_api = FolderApi(context.connection)
            for sub_folder in sub_folders:
                try:
                    if sub_folder.lock:
                        folder_api.auth(sub_folder.id, context.password)
                    cls._ls_plain(context, folder_api.retrieve(sub_folder.id))
                except UnauthorizedException:
                    # Best effort: a sub-folder that cannot be opened is skipped silently.
                    continue

    @classmethod
    def _column_widths(cls, context: LsCommandContext, folder: Folder, label: dict[str, str]) -> dict[str, int]:
        """Return the width of each table column for ``folder``.

        Each width is the longest value that will be printed in the column. Header labels
        are included only when the header row is printed (``context.is_quick`` is false).
        """
        widths: dict[str, int] = {key: 0 if context.is_quick else len(text) for key, text in label.items()}
        for sub_folder in folder.sub_folders:
            widths["acl"] = max(widths["acl"], len(sub_folder.access_level_name))
            widths["laboratory"] = max(
                widths["laboratory"], len(cls._laboratory_name(context, sub_folder.lab_id))
            )
            widths["size"] = max(widths["size"], len(sub_folder.lock_name))
            widths["date"] = max(widths["date"], len(sub_folder.updated_at_name))
            widths["name"] = max(widths["name"], len(sub_folder.name))
        for file in folder.files:
            widths["size"] = max(widths["size"], len(str(file.size)))
            widths["date"] = max(widths["date"], len(file.updated_at_name))
            widths["name"] = max(widths["name"], len(file.name))
        # File rows reuse the folder's access level and the context laboratory name,
        # so both are always accounted for, as in the original computation.
        widths["acl"] = max(widths["acl"], len(folder.access_level_name))
        widths["laboratory"] = max(widths["laboratory"], len(context.laboratory.name))
        return widths

    @classmethod
    def _format_row(
        cls,
        widths: dict[str, int],
        type_: str,
        acl: str,
        laboratory: str,
        size: Any,
        date: str,
        name: str,
    ) -> str:
        """Format one tab-separated table row, padding every cell to its column width.

        ``size`` is formatted as given: a file size passed as a number is right-aligned by
        the format machinery, whereas string cells are left-aligned.
        """
        return (
            f"{type_:{widths['type']}}\t{acl:{widths['acl']}}\t"
            f"{laboratory:{widths['laboratory']}}\t{size:{widths['size']}}\t"
            f"{date:{widths['date']}}\t{name:{widths['name']}}"
        )

    @classmethod
    def _folder2dict(cls, context: LsCommandContext, folder: Folder | FolderSimple) -> dict[str, Any]:
        """Convert ``folder`` into a JSON-serialisable dictionary.

        A ``Folder`` additionally gets ``metadata``, ``sub_folders`` and ``files`` entries.
        In recursive mode each sub-folder is retrieved and converted in full; otherwise the
        sub-folder entries already attached to ``folder`` are converted as they are.
        """
        data: dict[str, Any] = {
            "id": folder.id,
            "pid": folder.pid,
            "name": folder.name,
            "access_level": folder.access_level_name,
            "lock": folder.lock,
            "laboratory": cls._laboratory_name(context, folder.lab_id),
            "description": folder.description,
            "created_at": folder.created_at,
            "updated_at": folder.updated_at,
        }
        if isinstance(folder, Folder):
            folder_api = FolderApi(context.connection)
            data["metadata"] = folder_api.metadata(folder.id)
            ordered_sub_folders = sorted(folder.sub_folders, key=lambda x: x.name)
            if context.is_recursive:
                sub_folders: list[dict[str, Any]] = []
                for sub_folder in ordered_sub_folders:
                    try:
                        if sub_folder.lock:
                            folder_api.auth(sub_folder.id, context.password)
                        folder2 = folder_api.retrieve(sub_folder.id)
                        sub_folders.append(cls._folder2dict(context, folder2))
                    except UnauthorizedException:
                        # Best effort: a sub-folder that cannot be opened is left out of the output.
                        continue
                data["sub_folders"] = sub_folders
            else:
                data["sub_folders"] = [cls._folder2dict(context, x) for x in ordered_sub_folders]
            data["files"] = [cls._file2dict(context, x) for x in sorted(folder.files, key=lambda x: x.name)]
        return data

    @classmethod
    def _file2dict(cls, context: LsCommandContext, file: File) -> dict[str, Any]:
        """Convert ``file`` into a JSON-serialisable dictionary.

        ``download_url`` is built as ``<connection url>/v2/<file.download_url>``.
        """
        data: dict[str, Any] = {
            "id": file.id,
            "name": file.name,
            "type": file.type,
            "size": file.size,
            # "thumbnail": file.thumbnail,
            "description": file.description,
            "metadata": file.metadata,
            "download_url": f"{context.connection.url}/v2/{file.download_url}",
            "created_at": file.created_at,
            "updated_at": file.updated_at,
        }
        return data

    @classmethod
    def _laboratory_name(cls, context: LsCommandContext, laboratory_id: int) -> str:
        """Return the name of the laboratory with ``laboratory_id``, or ``"(invalid)"`` if it is not found."""
        laboratory = context.connection.laboratories.find_by_id(laboratory_id)
        return laboratory.name if laboratory is not None else "(invalid)"