fixed type errors when pylance type checking mode is strict.

This commit is contained in:
2023-07-19 21:47:47 +09:00
parent 23025bd679
commit 08d8a0626a
24 changed files with 387 additions and 236 deletions

View File

@ -1,5 +1,6 @@
import json
from argparse import Namespace, _SubParsersAction
from argparse import Namespace
from typing import Any
from pydantic.dataclasses import dataclass
@ -28,8 +29,7 @@ class LsCommandContext:
class LsCommand(BaseCommand):
@classmethod
def register(cls, parsers: _SubParsersAction) -> None:
command = cls()
def register(cls, parsers: Any) -> None:
ls_parser = parsers.add_parser("ls", help="list the folder contents")
ls_parser.add_argument("-p", "--password", help="password to use when open locked folder")
ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true")
@ -41,35 +41,43 @@ class LsCommand(BaseCommand):
)
ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true")
ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
ls_parser.set_defaults(func=command.ls)
ls_parser.set_defaults(func=cls.func)
def ls(self, args: Namespace) -> None:
(remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path)
connection = self._create_connection(remote)
laboratory = self._find_laboratory(connection, laboratory_name)
password = str(args.password)
@classmethod
def func(cls, args: Namespace) -> None:
remote_path = str(args.remote_path)
password = str(args.password) if args.password else None
is_json = bool(args.json)
is_recursive = bool(args.recursive)
is_quick = bool(args.quick) if not is_recursive else True
self.context = LsCommandContext(
cls.ls(remote_path, password, is_json, is_recursive, is_quick)
@classmethod
def ls(cls, remote_path: str, password: str | None, is_json: bool, is_recursive: bool, is_quick: bool) -> None:
(remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path)
connection = cls._create_connection(remote)
laboratory = cls._find_laboratory(connection, laboratory_name)
context = LsCommandContext(
f"{remote}:/{laboratory_name}",
connection,
laboratory,
password,
password if password is not None else "",
is_json,
is_quick,
is_recursive,
)
folder = self._find_folder(connection, laboratory, r_path, password)
if self.context.is_json:
self._ls_json(folder)
folder = cls._find_folder(connection, laboratory, r_path, password)
if context.is_json:
cls._ls_json(context, folder)
else:
self._ls_plain(folder)
cls._ls_plain(context, folder)
def _ls_json(self, folder: Folder) -> None:
print(json.dumps(self._folder2dict(folder), ensure_ascii=False))
@classmethod
def _ls_json(cls, context: LsCommandContext, folder: Folder) -> None:
print(json.dumps(cls._folder2dict(context, folder), ensure_ascii=False))
def _ls_plain(self, folder: Folder) -> None:
@classmethod
def _ls_plain(cls, context: LsCommandContext, folder: Folder) -> None:
label = {
"type": "Type",
"acl": "Access",
@ -80,9 +88,9 @@ class LsCommand(BaseCommand):
}
length: dict[str, int] = {}
for key in label.keys():
length[key] = len(label[key]) if not self.context.is_quick else 0
length[key] = len(label[key]) if not context.is_quick else 0
for sub_folder in folder.sub_folders:
sub_laboratory = self.context.connection.laboratories.find_by_id(sub_folder.lab_id)
sub_laboratory = context.connection.laboratories.find_by_id(sub_folder.lab_id)
sub_laboratory_name = sub_laboratory.name if sub_laboratory is not None else "(invalid)"
length["acl"] = max(length["acl"], len(sub_folder.access_level_name))
length["laboratory"] = max(length["laboratory"], len(sub_laboratory_name))
@ -94,23 +102,23 @@ class LsCommand(BaseCommand):
length["date"] = max(length["date"], len(file.updated_at_name))
length["name"] = max(length["name"], len(file.name))
length["acl"] = max(length["acl"], len(folder.access_level_name))
length["laboratory"] = max(length["laboratory"], len(self.context.laboratory.name))
length["laboratory"] = max(length["laboratory"], len(context.laboratory.name))
header = (
f"{label['type']:{length['type']}}\t{label['acl']:{length['acl']}}\t"
f"{label['laboratory']:{length['laboratory']}}\t{label['size']:{length['size']}}\t"
f"{label['date']:{length['date']}}\t{label['name']:{length['name']}}"
)
if self.context.is_recursive:
print(f"{self.context.prefix}{folder.path}:")
if context.is_recursive:
print(f"{context.prefix}{folder.path}:")
print(f"total {sum(f.size for f in folder.files)}")
if not self.context.is_quick:
if not context.is_quick:
print(header)
print("-" * len(header.expandtabs()))
for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name):
sub_laboratory_name = self._laboratory_name(sub_folder.lab_id)
sub_laboratory_name = cls._laboratory_name(context, sub_folder.lab_id)
print(
f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t"
f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t"
@ -119,56 +127,59 @@ class LsCommand(BaseCommand):
for file in sorted(folder.files, key=lambda x: x.name):
print(
f"{'[f]':{length['type']}}\t{folder.access_level_name:{length['acl']}}\t"
f"{self.context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t"
f"{context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t"
f"{file.updated_at_name:{length['date']}}\t{file.name:{length['name']}}"
)
if self.context.is_recursive:
if context.is_recursive:
print("")
for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name):
folder_api = FolderApi(self.context.connection)
folder_api = FolderApi(context.connection)
try:
if sub_folder.lock:
folder_api.auth(sub_folder.id, self.context.password)
folder_api.auth(sub_folder.id, context.password)
folder = folder_api.retrieve(sub_folder.id)
self._ls_plain(folder)
cls._ls_plain(context, folder)
except UnauthorizedException:
pass
def _folder2dict(self, folder: Folder | FolderSimple) -> dict:
data = {
@classmethod
def _folder2dict(cls, context: LsCommandContext, folder: Folder | FolderSimple) -> dict[str, Any]:
data: dict[str, Any] = {
"id": folder.id,
"pid": folder.pid,
"name": folder.name,
"access_level": folder.access_level_name,
"lock": folder.lock,
"laboratory": self._laboratory_name(folder.lab_id),
"laboratory": cls._laboratory_name(context, folder.lab_id),
"description": folder.description,
"created_at": folder.created_at,
"updated_at": folder.updated_at,
}
if isinstance(folder, Folder):
folder_api = FolderApi(self.context.connection)
folder_api = FolderApi(context.connection)
data["metadata"] = folder_api.metadata(folder.id)
if self.context.is_recursive:
data["sub_folders"] = []
if context.is_recursive:
sub_folders: list[dict[str, Any]] = []
for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name):
try:
if sub_folder.lock:
folder_api.auth(sub_folder.id, self.context.password)
folder_api.auth(sub_folder.id, context.password)
folder2 = folder_api.retrieve(sub_folder.id)
data["sub_folders"].append(self._folder2dict(folder2))
sub_folders.append(cls._folder2dict(context, folder2))
except UnauthorizedException:
pass
data["sub_folders"] = sub_folders
else:
data["sub_folders"] = list(
map(lambda x: self._folder2dict(x), sorted(folder.sub_folders, key=lambda x: x.name))
map(lambda x: cls._folder2dict(context, x), sorted(folder.sub_folders, key=lambda x: x.name))
)
data["files"] = list(map(lambda x: self._file2dict(x), sorted(folder.files, key=lambda x: x.name)))
data["files"] = list(map(lambda x: cls._file2dict(context, x), sorted(folder.files, key=lambda x: x.name)))
return data
def _file2dict(self, file: File) -> dict:
data = {
@classmethod
def _file2dict(cls, context: LsCommandContext, file: File) -> dict[str, Any]:
data: dict[str, Any] = {
"id": file.id,
"name": file.name,
"type": file.type,
@ -176,12 +187,13 @@ class LsCommand(BaseCommand):
# "thumbnail": file.thumbnail,
"description": file.description,
"metadata": file.metadata,
"download_url": f"{self.context.connection.url}/v2/{file.download_url}",
"download_url": f"{context.connection.url}/v2/{file.download_url}",
"created_at": file.created_at,
"updated_at": file.updated_at,
}
return data
def _laboratory_name(self, laboratory_id: int) -> str:
laboratory = self.context.connection.laboratories.find_by_id(laboratory_id)
@classmethod
def _laboratory_name(cls, context: LsCommandContext, laboratory_id: int) -> str:
laboratory = context.connection.laboratories.find_by_id(laboratory_id)
return laboratory.name if laboratory is not None else "(invalid)"