From 08d8a0626a1444d6bec55fc54465d996f3c60c7e Mon Sep 17 00:00:00 2001 From: Yoshihiro OKUMURA Date: Wed, 19 Jul 2023 21:47:47 +0900 Subject: [PATCH] fixed type errors when pylance type checking mode is strict. --- .vscode/settings.json | 2 +- mdrsclient/api/file.py | 10 +-- mdrsclient/api/folder.py | 18 ++--- mdrsclient/api/user.py | 4 +- mdrsclient/commands/base.py | 19 +++-- mdrsclient/commands/chacl.py | 32 +++++---- mdrsclient/commands/config.py | 69 ++++++++++++------ mdrsclient/commands/download.py | 50 ++++++++----- mdrsclient/commands/file_metadata.py | 25 ++++--- mdrsclient/commands/labs.py | 20 ++++-- mdrsclient/commands/login.py | 22 +++--- mdrsclient/commands/logout.py | 18 +++-- mdrsclient/commands/ls.py | 102 +++++++++++++++------------ mdrsclient/commands/metadata.py | 25 ++++--- mdrsclient/commands/mkdir.py | 24 ++++--- mdrsclient/commands/mv.py | 29 +++++--- mdrsclient/commands/rm.py | 27 ++++--- mdrsclient/commands/upload.py | 46 +++++++----- mdrsclient/commands/whoami.py | 21 +++--- mdrsclient/config.py | 2 +- mdrsclient/connection.py | 48 +++++++++---- mdrsclient/models/file.py | 4 +- mdrsclient/models/folder.py | 4 +- mdrsclient/models/user.py | 2 +- 24 files changed, 387 insertions(+), 236 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 1346c37..9f23adb 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -22,7 +22,7 @@ "prettier.singleQuote": true, "prettier.tabWidth": 4, // Extensions - Pylance - "python.analysis.typeCheckingMode": "basic", + "python.analysis.typeCheckingMode": "strict", "python.analysis.exclude": ["api/migrations/[0-9]*.py"], // Extensions - Python:black "python.formatting.blackArgs": ["--line-length=120"], diff --git a/mdrsclient/api/file.py b/mdrsclient/api/file.py index 26b8254..6ce3e97 100644 --- a/mdrsclient/api/file.py +++ b/mdrsclient/api/file.py @@ -1,4 +1,4 @@ -from typing import Final +from typing import Any, Final from pydantic import TypeAdapter from pydantic.dataclasses import dataclass @@ -29,7 +29,7 @@ class FileApi(BaseApi): # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT token_check(self.connection) - data = {"folder_id": folder_id} + data: dict[str, str | int] = {"folder_id": folder_id} try: with open(path, mode="rb") as fp: response = self.connection.post(url, data=data, files={"file": fp}) @@ -52,7 +52,7 @@ class FileApi(BaseApi): raise UnexpectedException(f"Could not open `{path}` file.") else: # update metadata - data = {"name": file.name, "description": file.description} + data: dict[str, str | int] = {"name": file.name, "description": file.description} response = self.connection.put(url, data=data) self._raise_response_error(response) return True @@ -68,13 +68,13 @@ class FileApi(BaseApi): def move(self, file: File, folder_id: str) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + file.id + "/move/" - data = {"folder": folder_id, "name": file.name} + data: dict[str, str | int] = {"folder": folder_id, "name": file.name} token_check(self.connection) response = self.connection.post(url, data=data) self._raise_response_error(response) return True - def metadata(self, file: File) -> dict: + def metadata(self, file: File) -> dict[str, Any]: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + file.id + "/metadata/" token_check(self.connection) diff --git a/mdrsclient/api/folder.py b/mdrsclient/api/folder.py index 85215a9..fa407b9 100644 --- a/mdrsclient/api/folder.py +++ b/mdrsclient/api/folder.py @@ -1,4 +1,4 @@ -from typing import Final +from typing import Any, Final import requests from pydantic import TypeAdapter @@ -21,7 +21,7 @@ class FolderApi(BaseApi): def list(self, laboratory_id: int, path: str) -> list[FolderSimple]: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT - params = {"path": path, "laboratory_id": laboratory_id} + params: dict[str, str | int] = {"path": path, "laboratory_id": laboratory_id} token_check(self.connection) response = self.connection.get(url, params=params) self._raise_response_error(response) @@ -42,7 +42,7 @@ class FolderApi(BaseApi): def create(self, name: str, parent_id: str) -> str: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT - data = {"name": name, "parent_id": parent_id, "description": "", "template_id": -1} + data: dict[str, str | int] = {"name": name, "parent_id": parent_id, "description": "", "template_id": -1} token_check(self.connection) response = self.connection.post(url, data=data) self._raise_response_error(response) @@ -52,7 +52,7 @@ class FolderApi(BaseApi): def update(self, folder: FolderSimple) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + folder.id + "/" - data = { + data: dict[str, str | int] = { "name": folder.name, "description": folder.description, } @@ -64,7 +64,7 @@ class FolderApi(BaseApi): def destroy(self, id: str, recursive: bool) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + id + "/" - params = {"recursive": recursive} + params: dict[str, str | int] = {"recursive": str(recursive)} token_check(self.connection) response = self.connection.delete(url, params=params) self._raise_response_error(response) @@ -73,7 +73,7 @@ class FolderApi(BaseApi): def auth(self, id: str, password: str) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + id + "/auth/" - data = {"password": password} + data: dict[str, str | int] = {"password": password} token_check(self.connection) response = self.connection.post(url, data=data) if response.status_code == requests.codes.unauthorized: @@ -84,7 +84,7 @@ class FolderApi(BaseApi): def acl(self, id: str, access_level: int, recursive: bool, password: str | None) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + id + "/acl/" - data: dict[str, int | str] = {"access_level": access_level} + data: dict[str, str | int] = {"access_level": access_level} if password is not None: data.update({"password": password}) if recursive is True: @@ -97,13 +97,13 @@ class FolderApi(BaseApi): def move(self, folder: FolderSimple, folder_id: str) -> bool: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + folder.id + "/move/" - data = {"parent": folder_id} + data: dict[str, str | int] = {"parent": folder_id} token_check(self.connection) response = self.connection.post(url, data=data) self._raise_response_error(response) return True - def metadata(self, id: str) -> dict: + def metadata(self, id: str) -> dict[str, Any]: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + id + "/metadata/" token_check(self.connection) diff --git a/mdrsclient/api/user.py b/mdrsclient/api/user.py index d07224a..fa17ed2 100644 --- a/mdrsclient/api/user.py +++ b/mdrsclient/api/user.py @@ -22,7 +22,7 @@ class UserApi(BaseApi): def auth(self, username: str, password: str) -> tuple[User, Token]: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + "auth/" - data = {"username": username, "password": password} + data: dict[str, str | int] = {"username": username, "password": password} response = self.connection.post(url, data=data) if response.status_code == requests.codes.unauthorized: raise UnauthorizedException("Invalid username or password.") @@ -37,7 +37,7 @@ class UserApi(BaseApi): def refresh(self, token: Token) -> Token: # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) url = self.ENTRYPOINT + "refresh/" - data = {"refresh": token.refresh} + data: dict[str, str | int] = {"refresh": token.refresh} response = self.connection.post(url, data=data) if response.status_code == requests.codes.unauthorized: raise UnauthorizedException("Token is invalid or expired.") diff --git a/mdrsclient/commands/base.py b/mdrsclient/commands/base.py index 74f5278..1f93ce3 100644 --- a/mdrsclient/commands/base.py +++ b/mdrsclient/commands/base.py @@ -1,6 +1,6 @@ import re from abc import ABC, abstractmethod -from argparse import _SubParsersAction +from typing import Any from unicodedata import normalize from mdrsclient.api import FolderApi, LaboratoryApi @@ -18,16 +18,18 @@ from mdrsclient.models import Folder, Laboratory class BaseCommand(ABC): @classmethod @abstractmethod - def register(cls, parsers: _SubParsersAction) -> None: + def register(cls, parsers: Any) -> None: raise UnexpectedException("Not implemented.") - def _create_connection(self, remote: str) -> MDRSConnection: + @classmethod + def _create_connection(cls, remote: str) -> MDRSConnection: config = ConfigFile(remote) if config.url is None: raise MissingConfigurationException(f"Remote host `{remote}` is not found.") return MDRSConnection(config.remote, config.url) - def _find_laboratory(self, connection: MDRSConnection, name: str) -> Laboratory: + @classmethod + def _find_laboratory(cls, connection: MDRSConnection, name: str) -> Laboratory: if connection.laboratories.empty() or connection.token is not None and connection.token.is_expired: laboratory_api = LaboratoryApi(connection) connection.laboratories = laboratory_api.list() @@ -36,8 +38,9 @@ class BaseCommand(ABC): raise IllegalArgumentException(f"Laboratory `{name}` not found.") return laboratory + @classmethod def _find_folder( - self, connection: MDRSConnection, laboratory: Laboratory, path: str, password: str | None = None + cls, connection: MDRSConnection, laboratory: Laboratory, path: str, password: str | None = None ) -> Folder: folder_api = FolderApi(connection) folders = folder_api.list(laboratory.id, normalize("NFC", path)) @@ -49,14 +52,16 @@ class BaseCommand(ABC): folder_api.auth(folders[0].id, password) return folder_api.retrieve(folders[0].id) - def _parse_remote_host(self, path: str) -> str: + @classmethod + def _parse_remote_host(cls, path: str) -> str: path_array = path.split(":") remote_host = path_array[0] if len(path_array) == 2 and path_array[1] != "" or len(path_array) > 2: raise IllegalArgumentException("Invalid remote host") return remote_host - def _parse_remote_host_with_path(self, path: str) -> tuple[str, str, str]: + @classmethod + def _parse_remote_host_with_path(cls, path: str) -> tuple[str, str, str]: path = re.sub(r"//+|/\./+|/\.$", "/", path) if re.search(r"/\.\./|/\.\.$", path) is not None: raise IllegalArgumentException("Path traversal found.") diff --git a/mdrsclient/commands/chacl.py b/mdrsclient/commands/chacl.py index 6d9c99f..825f9c0 100644 --- a/mdrsclient/commands/chacl.py +++ b/mdrsclient/commands/chacl.py @@ -1,4 +1,5 @@ -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import FolderApi from mdrsclient.commands.base import BaseCommand @@ -8,25 +9,32 @@ from mdrsclient.models import FolderAccessLevel class ChaclCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: chacl_parser = parsers.add_parser("chacl", help="change the folder access level") chacl_parser.add_argument("access_level", help="access level (private, cbs_open, pw_open)") chacl_parser.add_argument("-r", "--recursive", help="change access levels recursively", action="store_true") chacl_parser.add_argument("-p", "--password", help="password to set when access level is `pw_open`") chacl_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") - chacl_parser.set_defaults(func=command.chacl) + chacl_parser.set_defaults(func=cls.func) - def chacl(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) - r_path = r_path.rstrip("/") - access_level = FolderAccessLevel.key2id(args.access_level) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + access_level = FolderAccessLevel.key2id(str(args.access_level)) if access_level is None: raise IllegalArgumentException( "Invalid `access_level` parameter. must be `private`, `cbs_open` or `pw_open`." ) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - folder = self._find_folder(connection, laboratory, r_path) + password = str(args.password) if args.password else None + is_recursive = bool(args.recursive) + cls.chacl(remote_path, access_level, is_recursive, password) + + @classmethod + def chacl(cls, remote_path: str, access_level: int, is_recursive: bool, password: str | None) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) + r_path = r_path.rstrip("/") + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + folder = cls._find_folder(connection, laboratory, r_path) folder_api = FolderApi(connection) - folder_api.acl(folder.id, access_level, args.recursive, args.password) + folder_api.acl(folder.id, access_level, is_recursive, password) diff --git a/mdrsclient/commands/config.py b/mdrsclient/commands/config.py index 50f148e..e97fd5e 100644 --- a/mdrsclient/commands/config.py +++ b/mdrsclient/commands/config.py @@ -1,4 +1,5 @@ -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any, Callable from mdrsclient.commands.base import BaseCommand from mdrsclient.config import ConfigFile @@ -7,58 +8,84 @@ from mdrsclient.exceptions import IllegalArgumentException class ConfigCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: # config config_parser = parsers.add_parser("config", help="configure remote hosts") - config_parser.set_defaults(func=lambda x: config_parser.print_help()) + func_help: Callable[[Namespace], None] = lambda _: config_parser.print_help() + config_parser.set_defaults(func=func_help) config_parsers = config_parser.add_subparsers(title="config subcommands") # config create create_parser = config_parsers.add_parser("create", help="create a new remote host") create_parser.add_argument("remote", help="label of remote host") create_parser.add_argument("url", help="API entrypoint url of remote host") - create_parser.set_defaults(func=command.create) + create_parser.set_defaults(func=cls.func_create) # config update update_parser = config_parsers.add_parser("update", help="update a new remote host") update_parser.add_argument("remote", help="label of remote host") update_parser.add_argument("url", help="API entrypoint url of remote host") - update_parser.set_defaults(func=command.update) + update_parser.set_defaults(func=cls.func_update) # config list list_parser = config_parsers.add_parser("list", help="list all the remote hosts") list_parser.add_argument("-l", "--long", help="show the api url", action="store_true") - list_parser.set_defaults(func=command.list) + list_parser.set_defaults(func=cls.func_list) # config delete delete_parser = config_parsers.add_parser("delete", help="delete an existing remote host") delete_parser.add_argument("remote", help="label of remote host") - delete_parser.set_defaults(func=command.delete) + delete_parser.set_defaults(func=cls.func_delete) - def create(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) - config = ConfigFile(remote=remote) + @classmethod + def func_create(cls, args: Namespace) -> None: + remote = str(args.remote) + url = str(args.url) + cls.create(remote, url) + + @classmethod + def func_update(cls, args: Namespace) -> None: + remote = str(args.remote) + url = str(args.url) + cls.update(remote, url) + + @classmethod + def func_list(cls, args: Namespace) -> None: + is_long = bool(args.long) + cls.list(is_long) + + @classmethod + def func_delete(cls, args: Namespace) -> None: + remote = str(args.remote) + cls.delete(remote) + + @classmethod + def create(cls, remote: str, url: str) -> None: + remote = cls._parse_remote_host(remote) + config = ConfigFile(remote) if config.url is not None: raise IllegalArgumentException(f"Remote host `{remote}` is already exists.") else: - config.url = args.url + config.url = url - def update(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) - config = ConfigFile(remote=remote) + @classmethod + def update(cls, remote: str, url: str) -> None: + remote = cls._parse_remote_host(remote) + config = ConfigFile(remote) if config.url is None: raise IllegalArgumentException(f"Remote host `{remote}` is not exists.") else: - config.url = args.url + config.url = url - def list(self, args: Namespace) -> None: + @classmethod + def list(cls, is_long: bool) -> None: config = ConfigFile("") for remote, url in config.list(): line = f"{remote}:" - if args.long: + if is_long: line += f"\t{url}" print(line) - def delete(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) - config = ConfigFile(remote=remote) + @classmethod + def delete(cls, remote: str) -> None: + remote = cls._parse_remote_host(remote) + config = ConfigFile(remote) if config.url is None: raise IllegalArgumentException(f"Remote host `{remote}` is not exists.") else: diff --git a/mdrsclient/commands/download.py b/mdrsclient/commands/download.py index f800b2a..35276cd 100644 --- a/mdrsclient/commands/download.py +++ b/mdrsclient/commands/download.py @@ -1,6 +1,7 @@ import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace from concurrent.futures import ThreadPoolExecutor +from typing import Any from pydantic.dataclasses import dataclass @@ -20,8 +21,7 @@ class DownloadFileInfo: class DownloadCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: download_parser = parsers.add_parser("download", help="download the file or folder") download_parser.add_argument( "-r", "--recursive", help="download folders and their contents recursive", action="store_true" @@ -29,19 +29,28 @@ class DownloadCommand(BaseCommand): download_parser.add_argument("-p", "--password", help="password to use when open locked folder") download_parser.add_argument("remote_path", help="remote file path (remote:/lab/path/file)") download_parser.add_argument("local_path", help="local folder path (/foo/bar/)") - download_parser.set_defaults(func=command.download) + download_parser.set_defaults(func=cls.func) - def download(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + local_path = str(args.local_path) + is_recursive = bool(args.recursive) + password = str(args.password) if args.password else None + cls.download(remote_path, local_path, is_recursive, password) + + @classmethod + def download(cls, remote_path: str, local_path: str, is_recursive: bool, password: str | None) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) r_path = r_path.rstrip("/") r_dirname = os.path.dirname(r_path) r_basename = os.path.basename(r_path) - connection = self._create_connection(remote) - l_dirname = os.path.realpath(args.local_path) + connection = cls._create_connection(remote) + l_dirname = os.path.realpath(local_path) if not os.path.isdir(l_dirname): - raise IllegalArgumentException(f"Local directory `{args.local_path}` not found.") - laboratory = self._find_laboratory(connection, laboratory_name) - r_parent_folder = self._find_folder(connection, laboratory, r_dirname, args.password) + raise IllegalArgumentException(f"Local directory `{local_path}` not found.") + laboratory = cls._find_laboratory(connection, laboratory_name) + r_parent_folder = cls._find_folder(connection, laboratory, r_dirname, password) file = r_parent_folder.find_file(r_basename) download_files: list[DownloadFileInfo] = [] if file is not None: @@ -51,14 +60,15 @@ class DownloadCommand(BaseCommand): folder = r_parent_folder.find_sub_folder(r_basename) if folder is None: raise IllegalArgumentException(f"File or folder `{r_path}` not found.") - if not args.recursive: + if not is_recursive: raise IllegalArgumentException(f"Cannot download `{r_path}`: Is a folder.") folder_api = FolderApi(connection) - self.__multiple_download_pickup_recursive_files(folder_api, download_files, folder.id, l_dirname) - self.__multiple_download(connection, download_files) + cls.__multiple_download_pickup_recursive_files(folder_api, download_files, folder.id, l_dirname) + cls.__multiple_download(connection, download_files) + @classmethod def __multiple_download_pickup_recursive_files( - self, folder_api: FolderApi, infolist: list[DownloadFileInfo], folder_id: str, basedir: str + cls, folder_api: FolderApi, infolist: list[DownloadFileInfo], folder_id: str, basedir: str ) -> None: folder = folder_api.retrieve(folder_id) dirname = os.path.join(basedir, folder.name) @@ -69,13 +79,15 @@ class DownloadCommand(BaseCommand): path = os.path.join(dirname, file.name) infolist.append(DownloadFileInfo(file, path)) for sub_folder in folder.sub_folders: - self.__multiple_download_pickup_recursive_files(folder_api, infolist, sub_folder.id, dirname) + cls.__multiple_download_pickup_recursive_files(folder_api, infolist, sub_folder.id, dirname) - def __multiple_download(self, connection: MDRSConnection, infolist: list[DownloadFileInfo]) -> None: + @classmethod + def __multiple_download(cls, connection: MDRSConnection, infolist: list[DownloadFileInfo]) -> None: file_api = FileApi(connection) with ThreadPoolExecutor(max_workers=CONCURRENT) as pool: - pool.map(lambda x: self.__multiple_download_worker(file_api, x), infolist) + pool.map(lambda x: cls.__multiple_download_worker(file_api, x), infolist) - def __multiple_download_worker(self, file_api: FileApi, info: DownloadFileInfo) -> None: + @classmethod + def __multiple_download_worker(cls, file_api: FileApi, info: DownloadFileInfo) -> None: file_api.download(info.file, info.path) print(info.path) diff --git a/mdrsclient/commands/file_metadata.py b/mdrsclient/commands/file_metadata.py index 2bc7120..c279b9a 100644 --- a/mdrsclient/commands/file_metadata.py +++ b/mdrsclient/commands/file_metadata.py @@ -1,6 +1,7 @@ import json import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import FileApi from mdrsclient.commands.base import BaseCommand @@ -9,21 +10,27 @@ from mdrsclient.exceptions import IllegalArgumentException class FileMetadataCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: file_metadata_parser = parsers.add_parser("file-metadata", help="get the file metadata") file_metadata_parser.add_argument("-p", "--password", help="password to use when open locked folder") file_metadata_parser.add_argument("remote_path", help="remote file path (remote:/lab/path/file)") - file_metadata_parser.set_defaults(func=command.file_metadata) + file_metadata_parser.set_defaults(func=cls.func) - def file_metadata(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + password = str(args.password) if args.password else None + cls.file_metadata(remote_path, password) + + @classmethod + def file_metadata(cls, remote_path: str, password: str | None) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) r_path = r_path.rstrip("/") r_dirname = os.path.dirname(r_path) r_basename = os.path.basename(r_path) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - folder = self._find_folder(connection, laboratory, r_dirname, args.password) + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + folder = cls._find_folder(connection, laboratory, r_dirname, password) file = folder.find_file(r_basename) if file is None: raise IllegalArgumentException(f"File `{r_basename}` not found.") diff --git a/mdrsclient/commands/labs.py b/mdrsclient/commands/labs.py index 87a0b9d..7bbbf03 100644 --- a/mdrsclient/commands/labs.py +++ b/mdrsclient/commands/labs.py @@ -1,4 +1,5 @@ -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import LaboratoryApi from mdrsclient.commands.base import BaseCommand @@ -6,15 +7,20 @@ from mdrsclient.commands.base import BaseCommand class LabsCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: labs_parser = parsers.add_parser("labs", help="list all laboratories") labs_parser.add_argument("remote", help="label of remote host") - labs_parser.set_defaults(func=command.labs) + labs_parser.set_defaults(func=cls.func) - def labs(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) - connection = self._create_connection(remote) + @classmethod + def func(cls, args: Namespace) -> None: + remote = str(args.remote) + cls.labs(remote) + + @classmethod + def labs(cls, remote: str) -> None: + remote = cls._parse_remote_host(remote) + connection = cls._create_connection(remote) laboratory_api = LaboratoryApi(connection) laboratories = laboratory_api.list() connection.laboratories = laboratories diff --git a/mdrsclient/commands/login.py b/mdrsclient/commands/login.py index 3e5a832..c399f02 100644 --- a/mdrsclient/commands/login.py +++ b/mdrsclient/commands/login.py @@ -1,5 +1,6 @@ import getpass -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import UserApi from mdrsclient.commands.base import BaseCommand @@ -10,20 +11,25 @@ from mdrsclient.exceptions import MissingConfigurationException class LoginCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: login_parser = parsers.add_parser("login", help="login to remote host") login_parser.add_argument("remote", help="label of remote host") - login_parser.set_defaults(func=command.login) + login_parser.set_defaults(func=cls.func) - def login(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) + @classmethod + def func(cls, args: Namespace) -> None: + remote = str(args.remote) + username = input("Username: ").strip() + password = getpass.getpass("Password: ").strip() + cls.login(remote, username, password) + + @classmethod + def login(cls, remote: str, username: str, password: str) -> None: + remote = cls._parse_remote_host(remote) config = ConfigFile(remote) if config.url is None: raise MissingConfigurationException(f"Remote host `{remote}` is not found.") connection = MDRSConnection(config.remote, config.url) - username = input("Username: ").strip() - password = getpass.getpass("Password: ").strip() user_api = UserApi(connection) (user, token) = user_api.auth(username, password) print("Login Successful") diff --git a/mdrsclient/commands/logout.py b/mdrsclient/commands/logout.py index 4bc9a7a..84e3585 100644 --- a/mdrsclient/commands/logout.py +++ b/mdrsclient/commands/logout.py @@ -1,4 +1,5 @@ -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.commands.base import BaseCommand from mdrsclient.config import ConfigFile @@ -8,14 +9,19 @@ from mdrsclient.exceptions import MissingConfigurationException class LogoutCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: logout_parser = parsers.add_parser("logout", help="logout from remote host") logout_parser.add_argument("remote", help="label of remote host") - logout_parser.set_defaults(func=command.logout) + logout_parser.set_defaults(func=cls.func) - def logout(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) + @classmethod + def func(cls, args: Namespace) -> None: + remote = str(args.remote) + cls.logout(remote) + + @classmethod + def logout(cls, remote: str) -> None: + remote = cls._parse_remote_host(remote) config = ConfigFile(remote) if config.url is None: raise MissingConfigurationException(f"Remote host `{remote}` is not found.") diff --git a/mdrsclient/commands/ls.py b/mdrsclient/commands/ls.py index e653f4a..87e3b07 100644 --- a/mdrsclient/commands/ls.py +++ b/mdrsclient/commands/ls.py @@ -1,5 +1,6 @@ import json -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from pydantic.dataclasses import dataclass @@ -28,8 +29,7 @@ class LsCommandContext: class LsCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: ls_parser = parsers.add_parser("ls", help="list the folder contents") ls_parser.add_argument("-p", "--password", help="password to use when open locked folder") ls_parser.add_argument("-J", "--json", help="turn on json output", action="store_true") @@ -41,35 +41,43 @@ class LsCommand(BaseCommand): ) ls_parser.add_argument("-r", "--recursive", help="list the folder contents recursive", action="store_true") ls_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") - ls_parser.set_defaults(func=command.ls) + ls_parser.set_defaults(func=cls.func) - def ls(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - password = str(args.password) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + password = str(args.password) if args.password else None is_json = bool(args.json) is_recursive = bool(args.recursive) is_quick = bool(args.quick) if not is_recursive else True - self.context = LsCommandContext( + cls.ls(remote_path, password, is_json, is_recursive, is_quick) + + @classmethod + def ls(cls, remote_path: str, password: str | None, is_json: bool, is_recursive: bool, is_quick: bool) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + context = LsCommandContext( f"{remote}:/{laboratory_name}", connection, laboratory, - password, + password if password is not None else "", is_json, is_quick, is_recursive, ) - folder = self._find_folder(connection, laboratory, r_path, password) - if self.context.is_json: - self._ls_json(folder) + folder = cls._find_folder(connection, laboratory, r_path, password) + if context.is_json: + cls._ls_json(context, folder) else: - self._ls_plain(folder) + cls._ls_plain(context, folder) - def _ls_json(self, folder: Folder) -> None: - print(json.dumps(self._folder2dict(folder), ensure_ascii=False)) + @classmethod + def _ls_json(cls, context: LsCommandContext, folder: Folder) -> None: + print(json.dumps(cls._folder2dict(context, folder), ensure_ascii=False)) - def _ls_plain(self, folder: Folder) -> None: + @classmethod + def _ls_plain(cls, context: LsCommandContext, folder: Folder) -> None: label = { "type": "Type", "acl": "Access", @@ -80,9 +88,9 @@ class LsCommand(BaseCommand): } length: dict[str, int] = {} for key in label.keys(): - length[key] = len(label[key]) if not self.context.is_quick else 0 + length[key] = len(label[key]) if not context.is_quick else 0 for sub_folder in folder.sub_folders: - sub_laboratory = self.context.connection.laboratories.find_by_id(sub_folder.lab_id) + sub_laboratory = context.connection.laboratories.find_by_id(sub_folder.lab_id) sub_laboratory_name = sub_laboratory.name if sub_laboratory is not None else "(invalid)" length["acl"] = max(length["acl"], len(sub_folder.access_level_name)) length["laboratory"] = max(length["laboratory"], len(sub_laboratory_name)) @@ -94,23 +102,23 @@ class LsCommand(BaseCommand): length["date"] = max(length["date"], len(file.updated_at_name)) length["name"] = max(length["name"], len(file.name)) length["acl"] = max(length["acl"], len(folder.access_level_name)) - length["laboratory"] = max(length["laboratory"], len(self.context.laboratory.name)) + length["laboratory"] = max(length["laboratory"], len(context.laboratory.name)) header = ( f"{label['type']:{length['type']}}\t{label['acl']:{length['acl']}}\t" f"{label['laboratory']:{length['laboratory']}}\t{label['size']:{length['size']}}\t" f"{label['date']:{length['date']}}\t{label['name']:{length['name']}}" ) - if self.context.is_recursive: - print(f"{self.context.prefix}{folder.path}:") + if context.is_recursive: + print(f"{context.prefix}{folder.path}:") print(f"total {sum(f.size for f in folder.files)}") - if not self.context.is_quick: + if not context.is_quick: print(header) print("-" * len(header.expandtabs())) for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): - sub_laboratory_name = self._laboratory_name(sub_folder.lab_id) + sub_laboratory_name = cls._laboratory_name(context, sub_folder.lab_id) print( f"{'[d]':{length['type']}}\t{sub_folder.access_level_name:{length['acl']}}\t" f"{sub_laboratory_name:{length['laboratory']}}\t{sub_folder.lock_name:{length['size']}}\t" @@ -119,56 +127,59 @@ class LsCommand(BaseCommand): for file in sorted(folder.files, key=lambda x: x.name): print( f"{'[f]':{length['type']}}\t{folder.access_level_name:{length['acl']}}\t" - f"{self.context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t" + f"{context.laboratory.name:{length['laboratory']}}\t{file.size:{length['size']}}\t" f"{file.updated_at_name:{length['date']}}\t{file.name:{length['name']}}" ) - if self.context.is_recursive: + if context.is_recursive: print("") for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): - folder_api = FolderApi(self.context.connection) + folder_api = FolderApi(context.connection) try: if sub_folder.lock: - folder_api.auth(sub_folder.id, self.context.password) + folder_api.auth(sub_folder.id, context.password) folder = folder_api.retrieve(sub_folder.id) - self._ls_plain(folder) + cls._ls_plain(context, folder) except UnauthorizedException: pass - def _folder2dict(self, folder: Folder | FolderSimple) -> dict: - data = { + @classmethod + def _folder2dict(cls, context: LsCommandContext, folder: Folder | FolderSimple) -> dict[str, Any]: + data: dict[str, Any] = { "id": folder.id, "pid": folder.pid, "name": folder.name, "access_level": folder.access_level_name, "lock": folder.lock, - "laboratory": self._laboratory_name(folder.lab_id), + "laboratory": cls._laboratory_name(context, folder.lab_id), "description": folder.description, "created_at": folder.created_at, "updated_at": folder.updated_at, } if isinstance(folder, Folder): - folder_api = FolderApi(self.context.connection) + folder_api = FolderApi(context.connection) data["metadata"] = folder_api.metadata(folder.id) - if self.context.is_recursive: - data["sub_folders"] = [] + if context.is_recursive: + sub_folders: list[dict[str, Any]] = [] for sub_folder in sorted(folder.sub_folders, key=lambda x: x.name): try: if sub_folder.lock: - folder_api.auth(sub_folder.id, self.context.password) + folder_api.auth(sub_folder.id, context.password) folder2 = folder_api.retrieve(sub_folder.id) - data["sub_folders"].append(self._folder2dict(folder2)) + sub_folders.append(cls._folder2dict(context, folder2)) except UnauthorizedException: pass + data["sub_folders"] = sub_folders else: data["sub_folders"] = list( - map(lambda x: self._folder2dict(x), sorted(folder.sub_folders, key=lambda x: x.name)) + map(lambda x: cls._folder2dict(context, x), sorted(folder.sub_folders, key=lambda x: x.name)) ) - data["files"] = list(map(lambda x: self._file2dict(x), sorted(folder.files, key=lambda x: x.name))) + data["files"] = list(map(lambda x: cls._file2dict(context, x), sorted(folder.files, key=lambda x: x.name))) return data - def _file2dict(self, file: File) -> dict: - data = { + @classmethod + def _file2dict(cls, context: LsCommandContext, file: File) -> dict[str, Any]: + data: dict[str, Any] = { "id": file.id, "name": file.name, "type": file.type, @@ -176,12 +187,13 @@ class LsCommand(BaseCommand): # "thumbnail": file.thumbnail, "description": file.description, "metadata": file.metadata, - "download_url": f"{self.context.connection.url}/v2/{file.download_url}", + "download_url": f"{context.connection.url}/v2/{file.download_url}", "created_at": file.created_at, "updated_at": file.updated_at, } return data - def _laboratory_name(self, laboratory_id: int) -> str: - laboratory = self.context.connection.laboratories.find_by_id(laboratory_id) + @classmethod + def _laboratory_name(cls, context: LsCommandContext, laboratory_id: int) -> str: + laboratory = context.connection.laboratories.find_by_id(laboratory_id) return laboratory.name if laboratory is not None else "(invalid)" diff --git a/mdrsclient/commands/metadata.py b/mdrsclient/commands/metadata.py index b141844..46c9c19 100644 --- a/mdrsclient/commands/metadata.py +++ b/mdrsclient/commands/metadata.py @@ -1,5 +1,6 @@ import json -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import FolderApi from mdrsclient.commands.base import BaseCommand @@ -7,18 +8,24 @@ from mdrsclient.commands.base import BaseCommand class MetadataCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: metadata_parser = parsers.add_parser("metadata", help="get a folder metadata") metadata_parser.add_argument("-p", "--password", help="password to use when open locked folder") metadata_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") - metadata_parser.set_defaults(func=command.metadata) + metadata_parser.set_defaults(func=cls.func) - def metadata(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - folder = self._find_folder(connection, laboratory, r_path, args.password) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + password = str(args.password) if args.password else None + cls.metadata(remote_path, password) + + @classmethod + def metadata(cls, remote_path: str, password: str | None) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + folder = cls._find_folder(connection, laboratory, r_path, password) folder_api = FolderApi(connection) metadata = folder_api.metadata(folder.id) print(json.dumps(metadata, ensure_ascii=False)) diff --git a/mdrsclient/commands/mkdir.py b/mdrsclient/commands/mkdir.py index 6bca9d6..a5f6e84 100644 --- a/mdrsclient/commands/mkdir.py +++ b/mdrsclient/commands/mkdir.py @@ -1,5 +1,6 @@ import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from unicodedata import normalize from mdrsclient.api import FolderApi @@ -9,20 +10,25 @@ from mdrsclient.exceptions import IllegalArgumentException class MkdirCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: mkdir_parser = parsers.add_parser("mkdir", help="create a new folder") mkdir_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") - mkdir_parser.set_defaults(func=command.mkdir) + mkdir_parser.set_defaults(func=cls.func) - def mkdir(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + cls.mkdir(remote_path) + + @classmethod + def mkdir(cls, remote_path: str) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) r_path = r_path.rstrip("/") r_dirname = os.path.dirname(r_path) r_basename = os.path.basename(r_path) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - parent_folder = self._find_folder(connection, laboratory, r_dirname) + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + parent_folder = cls._find_folder(connection, laboratory, r_dirname) if parent_folder.find_sub_folder(r_basename) is not None or parent_folder.find_file(r_basename) is not None: raise IllegalArgumentException(f"Cannot create folder `{r_path}`: File exists.") folder_api = FolderApi(connection) diff --git a/mdrsclient/commands/mv.py b/mdrsclient/commands/mv.py index ff89604..a7305a4 100644 --- a/mdrsclient/commands/mv.py +++ b/mdrsclient/commands/mv.py @@ -1,6 +1,7 @@ import dataclasses import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from unicodedata import normalize from pydantic import TypeAdapter @@ -13,16 +14,22 @@ from mdrsclient.models import File, FolderSimple class MvCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: mv_parser = parsers.add_parser("mv", help="move or rename the file or folder") mv_parser.add_argument("src_path", help="source remote path (remote:/lab/path/src)") mv_parser.add_argument("dest_path", help="destination remote path (remote:/lab/path/dest)") - mv_parser.set_defaults(func=command.mv) + mv_parser.set_defaults(func=cls.func) - def mv(self, args: Namespace) -> None: - (s_remote, s_laboratory_name, s_path) = self._parse_remote_host_with_path(args.src_path) - (d_remote, d_laboratory_name, d_path) = self._parse_remote_host_with_path(args.dest_path) + @classmethod + def func(cls, args: Namespace) -> None: + src_path = str(args.src_path) + dest_path = str(args.dest_path) + cls.mv(src_path, dest_path) + + @classmethod + def mv(cls, src_path: str, dest_path: str) -> None: + (s_remote, s_laboratory_name, s_path) = cls._parse_remote_host_with_path(src_path) + (d_remote, d_laboratory_name, d_path) = cls._parse_remote_host_with_path(dest_path) if s_remote != d_remote: raise IllegalArgumentException("Remote host mismatched.") if s_laboratory_name != d_laboratory_name: @@ -36,10 +43,10 @@ class MvCommand(BaseCommand): else: d_dirname = os.path.dirname(d_path) d_basename = os.path.basename(d_path) - connection = self._create_connection(s_remote) - laboratory = self._find_laboratory(connection, s_laboratory_name) - s_parent_folder = self._find_folder(connection, laboratory, s_dirname) - d_parent_folder = self._find_folder(connection, laboratory, d_dirname) + connection = cls._create_connection(s_remote) + laboratory = cls._find_laboratory(connection, s_laboratory_name) + s_parent_folder = cls._find_folder(connection, laboratory, s_dirname) + d_parent_folder = cls._find_folder(connection, laboratory, d_dirname) s_file = s_parent_folder.find_file(s_basename) if s_file is not None: # source is file diff --git a/mdrsclient/commands/rm.py b/mdrsclient/commands/rm.py index 99b2b70..9c6397f 100644 --- a/mdrsclient/commands/rm.py +++ b/mdrsclient/commands/rm.py @@ -1,5 +1,6 @@ import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace +from typing import Any from mdrsclient.api import FileApi, FolderApi from mdrsclient.commands.base import BaseCommand @@ -8,23 +9,29 @@ from mdrsclient.exceptions import IllegalArgumentException class RmCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: rm_parser = parsers.add_parser("rm", help="remove the file or folder") rm_parser.add_argument( "-r", "--recursive", help="remove folders and their contents recursive", action="store_true" ) rm_parser.add_argument("remote_path", help="remote file path (remote:/lab/path/file)") - rm_parser.set_defaults(func=command.rm) + rm_parser.set_defaults(func=cls.func) - def rm(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) + @classmethod + def func(cls, args: Namespace) -> None: + remote_path = str(args.remote_path) + is_recursive = bool(args.recursive) + cls.rm(remote_path, is_recursive) + + @classmethod + def rm(cls, remote_path: str, is_recursive: bool) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) r_path = r_path.rstrip("/") r_dirname = os.path.dirname(r_path) r_basename = os.path.basename(r_path) - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - parent_folder = self._find_folder(connection, laboratory, r_dirname) + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + parent_folder = cls._find_folder(connection, laboratory, r_dirname) file = parent_folder.find_file(r_basename) if file is not None: file_api = FileApi(connection) @@ -33,7 +40,7 @@ class RmCommand(BaseCommand): folder = parent_folder.find_sub_folder(r_basename) if folder is None: raise IllegalArgumentException(f"Cannot remove `{r_path}`: No such file or folder.") - if not args.recursive: + if not is_recursive: raise IllegalArgumentException(f"Cannot remove `{r_path}`: Is a folder.") folder_api = FolderApi(connection) folder_api.destroy(folder.id, True) diff --git a/mdrsclient/commands/upload.py b/mdrsclient/commands/upload.py index b71fdf1..0ff81aa 100644 --- a/mdrsclient/commands/upload.py +++ b/mdrsclient/commands/upload.py @@ -1,6 +1,7 @@ import os -from argparse import Namespace, _SubParsersAction +from argparse import Namespace from concurrent.futures import ThreadPoolExecutor +from typing import Any from pydantic.dataclasses import dataclass @@ -20,28 +21,35 @@ class UploadFileInfo: class UploadCommand(BaseCommand): @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: upload_parser = parsers.add_parser("upload", help="upload the file or directory") upload_parser.add_argument( "-r", "--recursive", help="upload directories and their contents recursive", action="store_true" ) upload_parser.add_argument("local_path", help="local file path (/foo/bar/data.txt)") upload_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") - upload_parser.set_defaults(func=command.upload) + upload_parser.set_defaults(func=cls.func) - def upload(self, args: Namespace) -> None: - (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) - l_path = os.path.realpath(args.local_path) + @classmethod + def func(cls, args: Namespace) -> None: + local_path = str(args.local_path) + remote_path = str(args.remote_path) + is_recursive = bool(args.recursive) + cls.upload(local_path, remote_path, is_recursive) + + @classmethod + def upload(cls, local_path: str, remote_path: str, is_recursive: bool) -> None: + (remote, laboratory_name, r_path) = cls._parse_remote_host_with_path(remote_path) + l_path = os.path.realpath(local_path) if not os.path.exists(l_path): - raise IllegalArgumentException(f"File or directory `{args.local_path}` not found.") - connection = self._create_connection(remote) - laboratory = self._find_laboratory(connection, laboratory_name) - folder = self._find_folder(connection, laboratory, r_path) + raise IllegalArgumentException(f"File or directory `{local_path}` not found.") + connection = cls._create_connection(remote) + laboratory = cls._find_laboratory(connection, laboratory_name) + folder = cls._find_folder(connection, laboratory, r_path) infos: list[UploadFileInfo] = [] if os.path.isdir(l_path): - if not args.recursive: - raise IllegalArgumentException(f"Cannot upload `{args.local_path}`: Is a directory.") + if not is_recursive: + raise IllegalArgumentException(f"Cannot upload `{local_path}`: Is a directory.") folder_api = FolderApi(connection) folder_map: dict[str, Folder] = {} folder_map[r_path] = folder @@ -53,7 +61,7 @@ class UploadCommand(BaseCommand): # prepare destination parent path d_parent_dirname = os.path.dirname(d_dirname) if folder_map.get(d_parent_dirname) is None: - folder_map[d_parent_dirname] = self._find_folder(connection, laboratory, d_parent_dirname) + folder_map[d_parent_dirname] = cls._find_folder(connection, laboratory, d_parent_dirname) # prepare destination path if folder_map.get(d_dirname) is None: d_folder = folder_map[d_parent_dirname].find_sub_folder(d_basename) @@ -70,14 +78,16 @@ class UploadCommand(BaseCommand): infos.append(UploadFileInfo(folder_map[d_dirname], os.path.join(dirpath, filename))) else: infos.append(UploadFileInfo(folder, l_path)) - self.__multiple_upload(connection, infos) + cls.__multiple_upload(connection, infos) - def __multiple_upload(self, connection: MDRSConnection, infos: list[UploadFileInfo]) -> None: + @classmethod + def __multiple_upload(cls, connection: MDRSConnection, infos: list[UploadFileInfo]) -> None: file_api = FileApi(connection) with ThreadPoolExecutor(max_workers=CONCURRENT) as pool: - pool.map(lambda x: self.__multiple_upload_worker(file_api, x), infos) + pool.map(lambda x: cls.__multiple_upload_worker(file_api, x), infos) - def __multiple_upload_worker(self, file_api: FileApi, info: UploadFileInfo) -> None: + @classmethod + def __multiple_upload_worker(cls, file_api: FileApi, info: UploadFileInfo) -> None: basename = os.path.basename(info.path) file = info.folder.find_file(basename) try: diff --git a/mdrsclient/commands/whoami.py b/mdrsclient/commands/whoami.py index 5d53c75..60a2712 100644 --- a/mdrsclient/commands/whoami.py +++ b/mdrsclient/commands/whoami.py @@ -1,5 +1,5 @@ -from argparse import Namespace, _SubParsersAction -from typing import Final +from argparse import Namespace +from typing import Any, Final from mdrsclient.commands.base import BaseCommand from mdrsclient.config import ConfigFile @@ -11,19 +11,24 @@ class WhoamiCommand(BaseCommand): ANONYMOUS_USERNAME: Final[str] = "(Anonymous)" @classmethod - def register(cls, parsers: _SubParsersAction) -> None: - command = cls() + def register(cls, parsers: Any) -> None: whoami_parser = parsers.add_parser("whoami", help="show current user name") whoami_parser.add_argument("remote", help="label of remote host") - whoami_parser.set_defaults(func=command.whoami) + whoami_parser.set_defaults(func=cls.func) - def whoami(self, args: Namespace) -> None: - remote = self._parse_remote_host(args.remote) + @classmethod + def func(cls, args: Namespace) -> None: + remote = str(args.remote) + cls.whoami(remote) + + @classmethod + def whoami(cls, remote: str) -> None: + remote = cls._parse_remote_host(remote) config = ConfigFile(remote) if config.url is None: raise MissingConfigurationException(f"Remote host `{remote}` is not found.") connection = MDRSConnection(config.remote, config.url) if connection.token is not None and connection.token.is_expired: connection.logout() - username = connection.user.username if connection.user is not None else self.ANONYMOUS_USERNAME + username = connection.user.username if connection.user is not None else cls.ANONYMOUS_USERNAME print(username) diff --git a/mdrsclient/config.py b/mdrsclient/config.py index 5aa5abe..3768511 100644 --- a/mdrsclient/config.py +++ b/mdrsclient/config.py @@ -2,7 +2,7 @@ import configparser import os from typing import Final -import validators +import validators # type: ignore from mdrsclient.exceptions import IllegalArgumentException from mdrsclient.settings import CONFIG_DIRNAME diff --git a/mdrsclient/connection.py b/mdrsclient/connection.py index 79ece4d..4a125ae 100644 --- a/mdrsclient/connection.py +++ b/mdrsclient/connection.py @@ -1,5 +1,7 @@ import platform import threading +from io import BufferedReader +from typing import TypedDict, Unpack from requests import Response, Session @@ -9,6 +11,27 @@ from mdrsclient.exceptions import MissingConfigurationException from mdrsclient.models import Laboratories, Token, User +class _KwArgsMDRSConnectionGet(TypedDict, total=False): + params: dict[str, str | int] + stream: bool + + +class _KwArgsMDRSConnectionPost(TypedDict, total=False): + params: dict[str, str | int] + data: dict[str, str | int] + files: dict[str, BufferedReader] + + +class _KwArgsMDRSConnectionPut(TypedDict, total=False): + params: dict[str, str | int] + data: dict[str, str | int] + files: dict[str, BufferedReader] + + +class _KwArgsMDRSConnectionDelete(TypedDict, total=False): + params: dict[str, str | int] + + class MDRSConnection: url: str session: Session @@ -23,20 +46,17 @@ class MDRSConnection: self.__cache = CacheFile(remote) self.__prepare_headers() - def get(self, url, *args, **kwargs) -> Response: - return self.session.get(self.__build_url(url), *args, **kwargs) + def get(self, url: str, **kwargs: Unpack[_KwArgsMDRSConnectionGet]) -> Response: + return self.session.get(self.__build_url(url), **kwargs) - def post(self, url, *args, **kwargs) -> Response: - return self.session.post(self.__build_url(url), *args, **kwargs) + def post(self, url: str, **kwargs: Unpack[_KwArgsMDRSConnectionPost]) -> Response: + return self.session.post(self.__build_url(url), **kwargs) - def put(self, url, *args, **kwargs) -> Response: - return self.session.put(self.__build_url(url), *args, **kwargs) + def put(self, url: str, **kwargs: Unpack[_KwArgsMDRSConnectionPut]) -> Response: + return self.session.put(self.__build_url(url), **kwargs) - def delete(self, url, *args, **kwargs) -> Response: - return self.session.delete(self.__build_url(url), *args, **kwargs) - - def patch(self, url, *args, **kwargs) -> Response: - return self.session.patch(self.__build_url(url), *args, **kwargs) + def delete(self, url: str, **kwargs: Unpack[_KwArgsMDRSConnectionDelete]) -> Response: + return self.session.delete(self.__build_url(url), **kwargs) def logout(self) -> None: del self.__cache.user @@ -68,12 +88,10 @@ class MDRSConnection: def laboratories(self, laboratories: Laboratories) -> None: self.__cache.laboratories = laboratories - def __build_url(self, *args: str) -> str: + def __build_url(self, path: str) -> str: if self.url == "": raise MissingConfigurationException("remote host is not configured") - parts = [self.url] - parts.extend(args) - return "/".join(parts) + return f"{self.url}/{path}" def __prepare_headers(self) -> None: self.session.headers.update( diff --git a/mdrsclient/models/file.py b/mdrsclient/models/file.py index 739feaf..b015d11 100644 --- a/mdrsclient/models/file.py +++ b/mdrsclient/models/file.py @@ -1,3 +1,5 @@ +from typing import Any + from pydantic.dataclasses import dataclass from mdrsclient.models.utils import iso8601_to_user_friendly @@ -11,7 +13,7 @@ class File: size: int thumbnail: str | None description: str - metadata: dict + metadata: dict[str, Any] download_url: str created_at: str updated_at: str diff --git a/mdrsclient/models/folder.py b/mdrsclient/models/folder.py index 409680a..4d59c94 100644 --- a/mdrsclient/models/folder.py +++ b/mdrsclient/models/folder.py @@ -1,4 +1,4 @@ -from typing import Final, NamedTuple +from typing import Any, Final, NamedTuple from unicodedata import normalize from pydantic.dataclasses import dataclass @@ -66,7 +66,7 @@ class FolderSimple: @dataclass(frozen=True) class Folder(FolderSimple): - metadata: list[dict] + metadata: list[dict[str, Any]] sub_folders: list[FolderSimple] files: list[File] path: str diff --git a/mdrsclient/models/user.py b/mdrsclient/models/user.py index eff7753..ea9ab52 100644 --- a/mdrsclient/models/user.py +++ b/mdrsclient/models/user.py @@ -38,7 +38,7 @@ class Token: return (now + 10) > access_decoded.exp and (now - 10) < refresh_decoded.exp def __decode(self, token: str) -> DecodedJWT: - data = jwt.decode(token, options={"verify_signature": False}) + data = jwt.decode(token, options={"verify_signature": False}) # type: ignore return TypeAdapter(DecodedJWT).validate_python(data)