implemented -J json output option for ls command.

This commit is contained in:
Yoshihiro OKUMURA 2023-06-06 22:12:44 +09:00
parent 3982521530
commit a1ef26897f
Signed by: orrisroot
GPG Key ID: 470AA444C92904B2

View File

@ -1,3 +1,4 @@
import json
from argparse import Namespace, _SubParsersAction from argparse import Namespace, _SubParsersAction
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
@ -6,7 +7,7 @@ from mdrsclient.api import FolderApi
from mdrsclient.commands.base import BaseCommand from mdrsclient.commands.base import BaseCommand
from mdrsclient.connection import MDRSConnection from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import UnauthorizedException from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.models import Folder, Laboratory from mdrsclient.models import File, Folder, FolderSimple, Laboratory
class Config: class Config:
@ -20,6 +21,7 @@ class LsCommandContext:
connection: MDRSConnection connection: MDRSConnection
laboratory: Laboratory laboratory: Laboratory
password: str password: str
is_json: bool
is_quick: bool is_quick: bool
is_recursive: bool is_recursive: bool
@ -30,13 +32,14 @@ class LsCommand(BaseCommand):
command = cls() command = cls()
ls_parser = parsers.add_parser("ls", help="list the folder contents") ls_parser = parsers.add_parser("ls", help="list the folder contents")
ls_parser.add_argument("-p", "--password", help="password to use when open locked folder") ls_parser.add_argument("-p", "--password", help="password to use when open locked folder")
ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true")
ls_parser.add_argument( ls_parser.add_argument(
"-q", "-q",
"--quick", "--quick",
help="don't output header row. this option is forced if the -r option is specified", help="don't output header row. this option is forced if the -r option is specified",
action="store_true", action="store_true",
) )
ls_parser.add_argument("-r", "--recursive", help="list the folders contents recursive", action="store_true") ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true")
ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
ls_parser.set_defaults(func=command.ls) ls_parser.set_defaults(func=command.ls)
@ -45,6 +48,7 @@ class LsCommand(BaseCommand):
connection = self._create_connection(remote) connection = self._create_connection(remote)
laboratory = self._find_laboratory(connection, laboratory_name) laboratory = self._find_laboratory(connection, laboratory_name)
password = str(args.password) password = str(args.password)
is_json = bool(args.json)
is_recursive = bool(args.recursive) is_recursive = bool(args.recursive)
is_quick = bool(args.quick) if not is_recursive else True is_quick = bool(args.quick) if not is_recursive else True
self.context = LsCommandContext( self.context = LsCommandContext(
@ -52,13 +56,20 @@ class LsCommand(BaseCommand):
connection, connection,
laboratory, laboratory,
password, password,
is_json,
is_quick, is_quick,
is_recursive, is_recursive,
) )
folder = self._find_folder(connection, laboratory, r_path, password) folder = self._find_folder(connection, laboratory, r_path, password)
self._ls_body(folder) if self.context.is_json:
self._ls_json(folder)
else:
self._ls_plain(folder)
def _ls_body(self, folder: Folder) -> None: def _ls_json(self, folder: Folder) -> None:
print(json.dumps(self._folder2dict(folder), ensure_ascii=False))
def _ls_plain(self, folder: Folder) -> None:
label = { label = {
"type": "Type", "type": "Type",
"acl": "Access", "acl": "Access",
@ -99,8 +110,7 @@ class LsCommand(BaseCommand):
print("-" * len(header.expandtabs())) print("-" * len(header.expandtabs()))
for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name):
sub_laboratory = self.context.connection.laboratories.find_by_id(sub_folder.lab_id) sub_laboratory_name = self._laboratory_name(sub_folder.lab_id)
sub_laboratory_name = sub_laboratory.name if sub_laboratory is not None else "(invalid)"
print( print(
f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t" f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t"
f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t" f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t"
@ -121,6 +131,57 @@ class LsCommand(BaseCommand):
if sub_folder.lock: if sub_folder.lock:
folder_api.auth(sub_folder.id, self.context.password) folder_api.auth(sub_folder.id, self.context.password)
folder = folder_api.retrieve(sub_folder.id) folder = folder_api.retrieve(sub_folder.id)
self._ls_body(folder) self._ls_plain(folder)
except UnauthorizedException: except UnauthorizedException:
pass pass
def _folder2dict(self, folder: Folder | FolderSimple) -> dict:
data = {
"id": folder.id,
"pid": folder.pid,
"name": folder.name,
"access_level": folder.access_level_name,
"lock": folder.lock,
"laboratory": self._laboratory_name(folder.lab_id),
"description": folder.description,
"created_at": folder.created_at,
"updated_at": folder.updated_at,
}
if isinstance(folder, Folder):
folder_api = FolderApi(self.context.connection)
data["metadata"] = folder_api.metadata(folder.id)
if self.context.is_recursive:
data["sub_folders"] = []
for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name):
try:
if sub_folder.lock:
folder_api.auth(sub_folder.id, self.context.password)
folder2 = folder_api.retrieve(sub_folder.id)
data["sub_folders"].append(self._folder2dict(folder2))
except UnauthorizedException:
pass
else:
data["sub_folders"] = (
list(map(lambda x: self._folder2dict(x), sorted(folder.sub_folders, key=lambda x: x.name))),
)
data["files"] = (list(map(lambda x: self._file2dict(x), sorted(folder.files, key=lambda x: x.name))),)
return data
def _file2dict(self, file: File) -> dict:
data = {
"id": file.id,
"name": file.name,
"type": file.type,
"size": file.size,
# "thumbnail": file.thumbnail,
"description": file.description,
"metadata": file.metadata,
"download_url": f"{self.context.connection.url}/v2/{file.download_url}",
"created_at": file.created_at,
"updated_at": file.updated_at,
}
return data
def _laboratory_name(self, laboratory_id: int) -> str:
laboratory = self.context.connection.laboratories.find_by_id(laboratory_id)
return laboratory.name if laboratory is not None else "(invalid)"