split source code for each command.
This commit is contained in:
@ -1,11 +1,73 @@
|
||||
import re
|
||||
from abc import ABC, abstractmethod
|
||||
from argparse import _SubParsersAction
|
||||
|
||||
from mdrsclient.exceptions import UnexpectedException
|
||||
from mdrsclient.api import FolderApi, LaboratoryApi
|
||||
from mdrsclient.config import ConfigFile
|
||||
from mdrsclient.connection import MDRSConnection
|
||||
from mdrsclient.exceptions import (
|
||||
IllegalArgumentException,
|
||||
MissingConfigurationException,
|
||||
UnauthorizedException,
|
||||
UnexpectedException,
|
||||
)
|
||||
from mdrsclient.models import Folder, Laboratory
|
||||
|
||||
|
||||
class BaseCommand(ABC):
|
||||
@staticmethod
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def register(top_level_subparsers: _SubParsersAction) -> None:
|
||||
def register(cls, parsers: _SubParsersAction) -> None:
|
||||
raise UnexpectedException("Not implemented.")
|
||||
|
||||
def _create_connection(self, remote: str) -> MDRSConnection:
|
||||
config = ConfigFile(remote)
|
||||
if config.url is None:
|
||||
raise MissingConfigurationException(f"Remote host `{remote}` is not found.")
|
||||
return MDRSConnection(config.remote, config.url)
|
||||
|
||||
def _find_laboratory(self, connection: MDRSConnection, name: str) -> Laboratory:
|
||||
if connection.laboratories.empty() or connection.token is not None and connection.token.is_expired:
|
||||
laboratory_api = LaboratoryApi(connection)
|
||||
connection.laboratories = laboratory_api.list()
|
||||
laboratory = connection.laboratories.find_by_name(name)
|
||||
if laboratory is None:
|
||||
raise IllegalArgumentException(f"Laboratory `{name}` not found.")
|
||||
return laboratory
|
||||
|
||||
def _find_folder(self, connection: MDRSConnection, laboratory: Laboratory, path: str) -> Folder:
|
||||
folder_api = FolderApi(connection)
|
||||
folders = folder_api.list(laboratory.id, path)
|
||||
if len(folders) != 1:
|
||||
raise UnexpectedException(f"Folder `{path}` not found.")
|
||||
if folders[0].lock:
|
||||
raise UnauthorizedException(f"Folder `{path}` is locked.")
|
||||
return folder_api.retrieve(folders[0].id)
|
||||
|
||||
def _parse_remote_host(self, path: str) -> str:
|
||||
path_array = path.split(":")
|
||||
remote_host = path_array[0]
|
||||
if len(path_array) == 2 and path_array[1] != "" or len(path_array) > 2:
|
||||
raise IllegalArgumentException("Invalid remote host")
|
||||
return remote_host
|
||||
|
||||
def _parse_remote_host_with_path(self, path: str) -> tuple[str, str, str]:
|
||||
path = re.sub(r"//+|/\./+|/\.$", "/", path)
|
||||
if re.search(r"/\.\./|/\.\.$", path) is not None:
|
||||
raise IllegalArgumentException("Path traversal found.")
|
||||
path_array = path.split(":")
|
||||
if len(path_array) != 2:
|
||||
raise IllegalArgumentException("Invalid remote host.")
|
||||
remote_host = path_array[0]
|
||||
folder_array = path_array[1].split("/")
|
||||
is_absolute_path = folder_array[0] == ""
|
||||
if not is_absolute_path:
|
||||
raise IllegalArgumentException("Must be absolute paths.")
|
||||
del folder_array[0]
|
||||
if len(folder_array) == 0:
|
||||
laboratory = ""
|
||||
folder = ""
|
||||
else:
|
||||
laboratory = folder_array.pop(0)
|
||||
folder = "/" + "/".join(folder_array)
|
||||
return (remote_host, laboratory, folder)
|
||||
|
Reference in New Issue
Block a user