import os from argparse import Namespace, _SubParsersAction from concurrent.futures import ThreadPoolExecutor from pydantic.dataclasses import dataclass from mdrsclient.api import FileApi, FolderApi from mdrsclient.commands.base import BaseCommand from mdrsclient.connection import MDRSConnection from mdrsclient.exceptions import IllegalArgumentException, MDRSException from mdrsclient.models import Folder from mdrsclient.settings import CONCURRENT @dataclass(frozen=True) class UploadFileInfo: folder: Folder path: str class UploadCommand(BaseCommand): @classmethod def register(cls, parsers: _SubParsersAction) -> None: command = cls() upload_parser = parsers.add_parser("upload", help="upload the file or directory") upload_parser.add_argument( "-r", "--recursive", help="upload directories and their contents recursive", action="store_true" ) upload_parser.add_argument("local_path", help="local file path (/foo/bar/data.txt)") upload_parser.add_argument("remote_path", help="remote folder path (remote:/lab/path/)") upload_parser.set_defaults(func=command.upload) def upload(self, args: Namespace) -> None: (remote, laboratory_name, r_path) = self._parse_remote_host_with_path(args.remote_path) l_path = os.path.realpath(args.local_path) if not os.path.exists(l_path): raise IllegalArgumentException(f"File or directory `{args.local_path}` not found.") connection = self._create_connection(remote) laboratory = self._find_laboratory(connection, laboratory_name) folder = self._find_folder(connection, laboratory, r_path) infos: list[UploadFileInfo] = [] if os.path.isdir(l_path): if not args.recursive: raise IllegalArgumentException(f"Cannot upload `{args.local_path}`: Is a directory.") folder_api = FolderApi(connection) folder_map: dict[str, Folder] = {} folder_map[r_path] = folder l_basename = os.path.basename(l_path) for dirpath, dirnames, filenames in os.walk(l_path): sub = l_basename if dirpath == l_path else os.path.join(l_basename, os.path.relpath(dirpath, l_path)) d_dirname = os.path.join(r_path, sub) d_basename = os.path.basename(d_dirname) # prepare destination parent path d_parent_dirname = os.path.dirname(d_dirname) if folder_map.get(d_parent_dirname) is None: folder_map[d_parent_dirname] = self._find_folder(connection, laboratory, d_parent_dirname) # prepare destination path if folder_map.get(d_dirname) is None: d_folder = folder_map[d_parent_dirname].find_sub_folder(d_basename) if d_folder is None: d_folder_id = folder_api.create(d_basename, folder_map[d_parent_dirname].id) else: d_folder_id = d_folder.id print(d_dirname) folder_map[d_dirname] = folder_api.retrieve(d_folder_id) if d_folder is None: folder_map[d_parent_dirname].sub_folders.append(folder_map[d_dirname]) # register upload file list for filename in filenames: infos.append(UploadFileInfo(folder_map[d_dirname], os.path.join(dirpath, filename))) else: infos.append(UploadFileInfo(folder, l_path)) self.__multiple_upload(connection, infos) def __multiple_upload(self, connection: MDRSConnection, infos: list[UploadFileInfo]) -> None: file_api = FileApi(connection) with ThreadPoolExecutor(max_workers=CONCURRENT) as pool: pool.map(lambda x: self.__multiple_upload_worker(file_api, x), infos) def __multiple_upload_worker(self, file_api: FileApi, info: UploadFileInfo) -> None: basename = os.path.basename(info.path) file = info.folder.find_file(basename) try: if file is None: file_api.create(info.folder.id, info.path) else: file_api.update(file, info.path) print(os.path.join(info.folder.path, basename)) except MDRSException as e: print(f"API Error: {e}")