import dataclasses import os from argparse import Namespace, _SubParsersAction from concurrent.futures import ThreadPoolExecutor from pydantic import parse_obj_as from pydantic.dataclasses import dataclass from mdrsclient.api import FileApi, FolderApi from mdrsclient.commands.base import BaseCommand from mdrsclient.commands.utils import ( create_connection, find_folder, find_laboratory, parse_remote_host_with_path, ) from mdrsclient.connection import MDRSConnection from mdrsclient.exceptions import ( IllegalArgumentException, MDRSException, UnexpectedException, ) from mdrsclient.models import File, Folder from mdrsclient.settings import NUMBER_OF_PROCESS @dataclass(frozen=True) class UploadFile: folder: Folder path: str @dataclass(frozen=True) class DownloadFile: file: File path: str class FileCommand(BaseCommand): @staticmethod def register(top_level_subparsers: _SubParsersAction) -> None: # upload upload_parser = top_level_subparsers.add_parser("upload", help="upload the file or directories") upload_parser.add_argument( "-r", "--recursive", help="Upload directories and their contents recursive", action="store_true" ) upload_parser.add_argument("local_path", help="Local file path (/foo/bar/data.txt)") upload_parser.add_argument("remote_path", help="Remote folder path (remote:/lab/path/)") upload_parser.set_defaults(func=FileCommand.upload) # download download_parser = top_level_subparsers.add_parser("download", help="download the file or folders") download_parser.add_argument( "-r", "--recursive", help="Download folders and their contents recursive", action="store_true" ) download_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)") download_parser.add_argument("local_path", help="Local folder path (/foo/bar/)") download_parser.set_defaults(func=FileCommand.download) # move move_parser = top_level_subparsers.add_parser("move", help="move a file") move_parser.add_argument("src_path", help="Source remote file path (remote:/lab/path/file)") move_parser.add_argument("dest_path", help="Destination remote file path (remote:/lab/path/file)") move_parser.set_defaults(func=FileCommand.move) # remove remove_parser = top_level_subparsers.add_parser("remove", help="remove a file") remove_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)") remove_parser.set_defaults(func=FileCommand.remove) # file-metadata metadata_parser = top_level_subparsers.add_parser("file-metadata", help="get the file metadata") metadata_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)") metadata_parser.set_defaults(func=FileCommand.metadata) @staticmethod def upload(args: Namespace) -> None: (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path) local_path = os.path.realpath(args.local_path) if not os.path.exists(local_path): raise IllegalArgumentException(f"File or directory `{args.local_path}` not found.") connection = create_connection(remote) laboratory = find_laboratory(connection, laboratory_name) folder = find_folder(connection, laboratory, path) upload_files: list[UploadFile] = [] if os.path.isdir(local_path): if not args.recursive: raise IllegalArgumentException(f"Cannot upload `{args.local_path}`: Is a directory.") folder_api = FolderApi(connection) folders: dict[str, Folder] = {} folders[path] = folder local_basename = os.path.basename(local_path) for dirpath, dirnames, filenames in os.walk(local_path): sub = ( local_basename if dirpath == local_path else os.path.join(local_basename, os.path.relpath(dirpath, local_path)) ) dest_folder_path = os.path.join(path, sub) dest_folder_name = os.path.basename(dest_folder_path) # prepare destination parent path dest_parent_folder_path = os.path.dirname(dest_folder_path) if folders.get(dest_parent_folder_path) is None: res = folder_api.list(laboratory.id, dest_parent_folder_path) if len(res) != 1: raise UnexpectedException(f"Remote folder `{dest_parent_folder_path}` not found.") folders[dest_parent_folder_path] = folder_api.retrieve(res[0].id) # prepare destination path if folders.get(dest_folder_path) is None: dest_folder_simple = folders[dest_parent_folder_path].find_sub_folder(dest_folder_name) if dest_folder_simple is None: dest_folder_id = folder_api.create(dest_folder_name, folders[dest_parent_folder_path].id) else: dest_folder_id = dest_folder_simple.id folders[dest_folder_path] = folder_api.retrieve(dest_folder_id) if dest_folder_simple is None: folders[dest_parent_folder_path].sub_folders.append(folders[dest_folder_path]) # register upload file list for filename in filenames: upload_files.append(UploadFile(folders[dest_folder_path], os.path.join(dirpath, filename))) else: upload_files.append(UploadFile(folder, local_path)) FileCommand._multiple_upload(connection, upload_files) @staticmethod def download(args: Namespace) -> None: (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path) path = path.rstrip("/") parent_path = os.path.dirname(path) file_name = os.path.basename(path) connection = create_connection(remote) local_path = os.path.abspath(args.local_path) if not os.path.isdir(local_path): raise IllegalArgumentException(f"Local directory `{args.local_path}` not found.") laboratory = find_laboratory(connection, laboratory_name) parent_folder = find_folder(connection, laboratory, parent_path) file = parent_folder.find_file(file_name) download_files: list[DownloadFile] = [] if file is not None: file_path = os.path.join(local_path, file_name) download_files.append(DownloadFile(file, file_path)) else: if not args.recursive: raise IllegalArgumentException(f"Cannot download `{args.remote_path}`: Is a folder.") sub_folder = parent_folder.find_sub_folder(file_name) if sub_folder is None: raise IllegalArgumentException(f"File or Folder`{file_name}` not found.") folder_api = FolderApi(connection) sub_folder_dirname = os.path.join(local_path, sub_folder.name) FileCommand._multiple_download_pickup_recursive_files( folder_api, download_files, sub_folder.id, sub_folder_dirname ) FileCommand._multiple_download(connection, download_files) @staticmethod def move(args: Namespace) -> None: (src_remote, src_laboratory_name, src_path) = parse_remote_host_with_path(args.src_path) (dest_remote, dest_laboratory_name, dest_path) = parse_remote_host_with_path(args.dest_path) if src_remote != dest_remote: raise IllegalArgumentException("Remote host mismatched.") if src_laboratory_name != dest_laboratory_name: raise IllegalArgumentException("Laboratory mismatched.") src_path = src_path.rstrip("/") src_dirpath = os.path.dirname(src_path) src_filename = os.path.basename(src_path) if dest_path.endswith("/"): dest_dirpath = dest_path dest_filename = src_filename else: dest_dirpath = os.path.dirname(dest_path) dest_filename = os.path.basename(dest_path) connection = create_connection(src_remote) laboratory = find_laboratory(connection, src_laboratory_name) src_folder = find_folder(connection, laboratory, src_dirpath) dest_folder = find_folder(connection, laboratory, dest_dirpath) src_file = src_folder.find_file(src_filename) if src_file is None: raise IllegalArgumentException(f"File `{src_filename}` not found.") dest_file = dest_folder.find_file(dest_filename) if dest_file is not None: raise IllegalArgumentException(f"File `{dest_filename}` already exists.") file_api = FileApi(connection) if src_folder.id != dest_folder.id: file_api.move(src_file, dest_folder.id) if dest_filename != src_filename: dest_file_dict = dataclasses.asdict(src_file) | {"name": dest_filename} dest_file = parse_obj_as(File, dest_file_dict) file_api.update(dest_file, None) @staticmethod def remove(args: Namespace) -> None: (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path) path = path.rstrip("/") parent_path = os.path.dirname(path) file_name = os.path.basename(path) connection = create_connection(remote) laboratory = find_laboratory(connection, laboratory_name) folder = find_folder(connection, laboratory, parent_path) file = folder.find_file(file_name) if file is None: raise IllegalArgumentException(f"File `{file_name}` not found.") file_api = FileApi(connection) file_api.destroy(file) @staticmethod def metadata(args: Namespace) -> None: (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path) path = path.rstrip("/") parent_path = os.path.dirname(path) file_name = os.path.basename(path) connection = create_connection(remote) laboratory = find_laboratory(connection, laboratory_name) folder = find_folder(connection, laboratory, parent_path) file = folder.find_file(file_name) if file is None: raise IllegalArgumentException(f"File `{file_name}` not found.") file_api = FileApi(connection) metadata = file_api.metadata(file) print(metadata) @staticmethod def _multiple_upload(connection: MDRSConnection, upload_files: list[UploadFile]) -> None: file_api = FileApi(connection) with ThreadPoolExecutor(max_workers=NUMBER_OF_PROCESS) as pool: pool.map(lambda x: FileCommand._multiple_upload_worker(file_api, x), upload_files) @staticmethod def _multiple_upload_worker(file_api: FileApi, upload_file: UploadFile) -> None: file_name = os.path.basename(upload_file.path) file = next((x for x in upload_file.folder.files if x.name == file_name), None) try: if file is None: file_api.create(upload_file.folder.id, upload_file.path) else: file_api.update(file, upload_file.path) pass except MDRSException as e: print(f"API Error: {e}") @staticmethod def _multiple_download_pickup_recursive_files( folder_api: FolderApi, download_files: list[DownloadFile], folder_id: str, local_dirname: str ) -> None: folder = folder_api.retrieve(folder_id) file_dirname = os.path.join(local_dirname, folder.name) if not os.path.exists(file_dirname): os.makedirs(file_dirname) for file in folder.files: file_path = os.path.join(file_dirname, file.name) download_files.append(DownloadFile(file, file_path)) for sub_folder in folder.sub_folders: sub_folder_dirname = os.path.join(local_dirname, sub_folder.name) FileCommand._multiple_download_pickup_recursive_files( folder_api, download_files, sub_folder.id, sub_folder_dirname ) @staticmethod def _multiple_download(connection: MDRSConnection, download_files: list[DownloadFile]) -> None: file_api = FileApi(connection) with ThreadPoolExecutor(max_workers=NUMBER_OF_PROCESS) as pool: pool.map(lambda x: FileCommand._multiple_download_worker(file_api, x), download_files) @staticmethod def _multiple_download_worker(file_api: FileApi, download_file: DownloadFile) -> None: file_api.download(download_file.file, download_file.path)