78 lines
3.1 KiB
Python
78 lines
3.1 KiB
Python
import re
|
|
from abc import ABC, abstractmethod
|
|
from argparse import _SubParsersAction
|
|
|
|
from mdrsclient.api import FolderApi, LaboratoryApi
|
|
from mdrsclient.config import ConfigFile
|
|
from mdrsclient.connection import MDRSConnection
|
|
from mdrsclient.exceptions import (
|
|
IllegalArgumentException,
|
|
MissingConfigurationException,
|
|
UnauthorizedException,
|
|
UnexpectedException,
|
|
)
|
|
from mdrsclient.models import Folder, Laboratory
|
|
|
|
|
|
class BaseCommand(ABC):
    """Abstract base class for commands registered on an argparse sub-parser action.

    Concrete commands implement :meth:`register`. The underscore-prefixed
    helpers are shared utilities for building a connection, resolving a
    laboratory or folder through the API classes, and parsing
    ``remote:/laboratory/folder`` style arguments.
    """

    @classmethod
    @abstractmethod
    def register(cls, parsers: _SubParsersAction) -> None:
        """Register this command on ``parsers``.

        Subclasses must override this method; the base implementation only raises.

        Raises:
            UnexpectedException: Always, if the base implementation is invoked.
        """
        raise UnexpectedException("Not implemented.")

    def _create_connection(self, remote: str) -> MDRSConnection:
        """Create a connection for the remote described by ``ConfigFile(remote)``.

        Args:
            remote: Remote name handed to ``ConfigFile``.

        Returns:
            An ``MDRSConnection`` built from the config's ``remote`` and ``url``.

        Raises:
            MissingConfigurationException: If the loaded config has no ``url``.
        """
        config = ConfigFile(remote)
        if config.url is None:
            raise MissingConfigurationException(f"Remote host `{remote}` is not found.")
        return MDRSConnection(config.remote, config.url)

    def _find_laboratory(self, connection: MDRSConnection, name: str) -> Laboratory:
        """Look up a laboratory by name on ``connection``.

        ``connection.laboratories`` is reloaded through ``LaboratoryApi.list()``
        when it is empty, or when the connection has a token that reports
        itself as expired.

        Args:
            connection: Connection whose ``laboratories`` collection is searched.
            name: Laboratory name passed to ``find_by_name``.

        Returns:
            The matching laboratory.

        Raises:
            IllegalArgumentException: If no laboratory with that name is found.
        """
        # Parentheses spell out the precedence the original relied on:
        # `and` binds tighter than `or`, and the token is only inspected when
        # the laboratory collection is not empty.
        if connection.laboratories.empty() or (
            connection.token is not None and connection.token.is_expired
        ):
            laboratory_api = LaboratoryApi(connection)
            connection.laboratories = laboratory_api.list()
        laboratory = connection.laboratories.find_by_name(name)
        if laboratory is None:
            raise IllegalArgumentException(f"Laboratory `{name}` not found.")
        return laboratory

    def _find_folder(
        self, connection: MDRSConnection, laboratory: Laboratory, path: str, password: str | None = None
    ) -> Folder:
        """Resolve ``path`` inside ``laboratory`` to a single folder.

        Args:
            connection: Connection used to construct the ``FolderApi``.
            laboratory: Laboratory whose ``id`` scopes the folder listing.
            path: Folder path passed to ``FolderApi.list``.
            password: Password used for ``FolderApi.auth`` when the folder's
                ``lock`` attribute is truthy.

        Returns:
            The result of ``FolderApi.retrieve`` for the matched folder id.

        Raises:
            UnexpectedException: If the listing does not contain exactly one folder.
            UnauthorizedException: If the folder is locked and no password was given.
        """
        folder_api = FolderApi(connection)
        folders = folder_api.list(laboratory.id, path)
        if len(folders) != 1:
            raise UnexpectedException(f"Folder `{path}` not found.")
        folder = folders[0]
        if folder.lock:
            if password is None:
                raise UnauthorizedException(f"Folder `{path}` is locked.")
            # Authenticate against the locked folder before retrieving it.
            folder_api.auth(folder.id, password)
        return folder_api.retrieve(folder.id)

    def _parse_remote_host(self, path: str) -> str:
        """Extract the remote host from ``"host"`` or ``"host:"``.

        Args:
            path: Argument text; anything after a single ``:`` must be empty.

        Returns:
            The text before the first ``:`` (the whole string if there is none).

        Raises:
            IllegalArgumentException: If text follows the ``:`` or more than
                one ``:`` is present.
        """
        path_array = path.split(":")
        has_trailing_text = len(path_array) == 2 and path_array[1] != ""
        if has_trailing_text or len(path_array) > 2:
            raise IllegalArgumentException("Invalid remote host")
        return path_array[0]

    def _parse_remote_host_with_path(self, path: str) -> tuple[str, str, str]:
        """Split ``"host:/laboratory/folder..."`` into its three components.

        Runs of ``/``, ``/./`` segments and a trailing ``/.`` are collapsed to a
        single ``/`` before parsing, and any ``..`` segment is rejected.

        Args:
            path: Argument of the form ``remote:/laboratory[/folder...]``.

        Returns:
            ``(remote_host, laboratory, folder)``. ``folder`` starts with ``/``;
            when nothing follows the ``:``, both ``laboratory`` and ``folder``
            are empty strings.

        Raises:
            IllegalArgumentException: On a ``..`` segment, when the argument
                does not contain exactly one ``:``, or when the path part is
                not absolute.
        """
        # re.sub replaces non-overlapping matches in a single pass, so the "/"
        # consumed by one match cannot start the next one ("/a/././b" would
        # keep a "/./"). Repeat until the path stops changing; every effective
        # substitution shortens the string, so the loop terminates.
        while True:
            collapsed = re.sub(r"//+|/\./+|/\.$", "/", path)
            if collapsed == path:
                break
            path = collapsed
        if re.search(r"/\.\./|/\.\.$", path) is not None:
            raise IllegalArgumentException("Path traversal found.")
        path_array = path.split(":")
        if len(path_array) != 2:
            raise IllegalArgumentException("Invalid remote host.")
        remote_host = path_array[0]
        folder_array = path_array[1].split("/")
        # An absolute path (or an empty one) yields an empty first element.
        is_absolute_path = folder_array[0] == ""
        if not is_absolute_path:
            raise IllegalArgumentException("Must be absolute paths.")
        del folder_array[0]
        if len(folder_array) == 0:
            # Nothing after the colon, e.g. "host:".
            laboratory = ""
            folder = ""
        else:
            laboratory = folder_array.pop(0)
            folder = "/" + "/".join(folder_array)
        return (remote_host, laboratory, folder)
|