91 lines
4.3 KiB
Python
91 lines
4.3 KiB
Python
import os
|
|
from argparse import Namespace, _SubParsersAction
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from pydantic.dataclasses import dataclass
|
|
|
|
from mdrsclient.api import FileApi, FolderApi
|
|
from mdrsclient.commands.base import BaseCommand
|
|
from mdrsclient.connection import MDRSConnection
|
|
from mdrsclient.exceptions import IllegalArgumentException, MDRSException
|
|
from mdrsclient.models import Folder
|
|
from mdrsclient.settings import CONCURRENT
|
|
|
|
|
|
@dataclass(frozen=True)
class UploadFileInfo:
    """Immutable pairing of a local file with the remote folder it is uploaded into."""

    # Remote folder that receives the file.
    folder: Folder
    # Local filesystem path of the file to upload.
    path: str
|
|
|
|
|
|
class UploadCommand(BaseCommand):
    """CLI command that uploads a local file or directory tree to a remote folder."""

    @classmethod
    def register(cls, parsers: _SubParsersAction) -> None:
        """Register the ``upload`` sub-command and its arguments on *parsers*.

        Parsed arguments are dispatched to :meth:`upload` through the parser's
        ``func`` default.
        """
        command = cls()
        upload_parser = parsers.add_parser("upload", help="upload the file or directory")
        upload_parser.add_argument(
            "-r", "--recursive", help="upload directories and their contents recursive", action="store_true"
        )
        upload_parser.add_argument("local_path", help="local file path (/foo/bar/data.txt)")
        upload_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)")
        upload_parser.set_defaults(func=command.upload)

    def upload(self, args: Namespace) -> None:
        """Upload ``args.local_path`` into the remote folder ``args.remote_path``.

        A single file is uploaded directly into the destination folder. A
        directory requires ``args.recursive``; its tree (including the
        directory itself) is mirrored below the destination folder, creating
        remote folders that do not exist yet, and every file found is then
        uploaded concurrently.

        Raises:
            IllegalArgumentException: If the local path does not exist, or if
                it is a directory and ``--recursive`` was not given.
        """
        (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path)
        l_path = os.path.realpath(args.local_path)
        if not os.path.exists(l_path):
            raise IllegalArgumentException(f"File or directory `{args.local_path}` not found.")
        connection = self._create_connection(remote)
        laboratory = self._find_laboratory(connection, laboratory_name)
        folder = self._find_folder(connection, laboratory, r_path)
        infos: list[UploadFileInfo] = []
        if os.path.isdir(l_path):
            if not args.recursive:
                raise IllegalArgumentException(f"Cannot upload `{args.local_path}`: Is a directory.")
            folder_api = FolderApi(connection)
            # Cache of remote folders already resolved, keyed by remote path.
            folder_map: dict[str, Folder] = {}
            folder_map[r_path] = folder
            l_basename = os.path.basename(l_path)
            # os.walk is top-down by default, so a parent directory is always
            # processed (and cached in folder_map) before its children.
            for dirpath, _, filenames in os.walk(l_path):
                # Path of the current directory relative to the parent of
                # l_path, so the uploaded tree is rooted at l_basename.
                sub = l_basename if dirpath == l_path else os.path.join(l_basename, os.path.relpath(dirpath, l_path))
                d_dirname = os.path.join(r_path, sub)
                d_basename = os.path.basename(d_dirname)
                # prepare destination parent path
                d_parent_dirname = os.path.dirname(d_dirname)
                if folder_map.get(d_parent_dirname) is None:
                    folder_map[d_parent_dirname] = self._find_folder(connection, laboratory, d_parent_dirname)
                # prepare destination path
                if folder_map.get(d_dirname) is None:
                    d_folder = folder_map[d_parent_dirname].find_sub_folder(d_basename)
                    if d_folder is None:
                        # Remote folder is missing: create it under the parent.
                        d_folder_id = folder_api.create(d_basename, folder_map[d_parent_dirname].id)
                    else:
                        d_folder_id = d_folder.id
                    print(d_dirname)
                    folder_map[d_dirname] = folder_api.retrieve(d_folder_id)
                    if d_folder is None:
                        # Keep the cached parent in sync with the folder just created.
                        folder_map[d_parent_dirname].sub_folders.append(folder_map[d_dirname])
                # register upload file list
                for filename in filenames:
                    infos.append(UploadFileInfo(folder_map[d_dirname], os.path.join(dirpath, filename)))
        else:
            infos.append(UploadFileInfo(folder, l_path))
        self.__multiple_upload(connection, infos)

    def __multiple_upload(self, connection: MDRSConnection, infos: list[UploadFileInfo]) -> None:
        """Upload every entry of *infos* using a pool of ``CONCURRENT`` worker threads.

        ``MDRSException`` failures are reported per file by the worker; any
        other exception raised by a worker is re-raised here.
        """
        file_api = FileApi(connection)
        with ThreadPoolExecutor(max_workers=CONCURRENT) as pool:
            # Executor.map only re-raises a worker's exception when its result
            # is retrieved; drain the iterator so unexpected failures are not
            # silently discarded.
            for _ in pool.map(lambda x: self.__multiple_upload_worker(file_api, x), infos):
                pass

    def __multiple_upload_worker(self, file_api: FileApi, info: UploadFileInfo) -> None:
        """Create or update one remote file from ``info.path`` and print its remote path.

        If the destination folder already has a file with the same base name
        it is updated, otherwise a new file is created. An ``MDRSException``
        is printed as ``API Error: ...`` instead of being raised.
        """
        basename = os.path.basename(info.path)
        file = info.folder.find_file(basename)
        try:
            if file is None:
                file_api.create(info.folder.id, info.path)
            else:
                file_api.update(file, info.path)
            print(os.path.join(info.folder.path, basename))
        except MDRSException as e:
            print(f"API Error: {e}")
|