import dataclasses
import os
from argparse import Namespace, _SubParsersAction
from concurrent.futures import ThreadPoolExecutor

from pydantic import parse_obj_as
from pydantic.dataclasses import dataclass

from mdrsclient.api import FileApi, FolderApi
from mdrsclient.commands.base import BaseCommand
from mdrsclient.commands.utils import (
    create_connection,
    find_folder,
    find_laboratory,
    parse_remote_host_with_path,
)
from mdrsclient.connection import MDRSConnection
from mdrsclient.exceptions import (
    IllegalArgumentException,
    MDRSException,
    UnexpectedException,
)
from mdrsclient.models import File, Folder
from mdrsclient.settings import NUMBER_OF_PROCESS


@dataclass(frozen=True)
class UploadFile:
    """A pending upload: one local file paired with its destination remote folder."""

    # Remote folder that receives the file.
    folder: Folder
    # Path of the local file to send.
    path: str


class FileCommand(BaseCommand):
    """Command-line sub-commands that operate on remote files.

    Provides ``upload``, ``download``, ``move``, ``remove`` and ``file-metadata``.
    """

    @staticmethod
    def register(top_level_subparsers: _SubParsersAction) -> None:
        """Attach the file sub-commands to the top-level argument parser.

        Args:
            top_level_subparsers: The sub-parser collection to which the
                ``upload``, ``download``, ``move``, ``remove`` and
                ``file-metadata`` parsers are added.
        """
        # upload
        upload_parser = top_level_subparsers.add_parser("upload", help="upload the file or directories")
        upload_parser.add_argument(
            "-r", "--recursive", help="Upload directories and their contents recursive", action="store_true"
        )
        upload_parser.add_argument("local_path", help="Local file path (/foo/bar/data.txt)")
        upload_parser.add_argument("remote_path", help="Remote folder path (remote:/lab/path/)")
        upload_parser.set_defaults(func=FileCommand.upload)
        # download
        download_parser = top_level_subparsers.add_parser("download", help="download a file")
        download_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)")
        download_parser.add_argument("local_path", help="Local folder path (/foo/bar/)")
        download_parser.set_defaults(func=FileCommand.download)
        # move
        move_parser = top_level_subparsers.add_parser("move", help="move a file")
        move_parser.add_argument("src_path", help="Source remote file path (remote:/lab/path/file)")
        move_parser.add_argument("dest_path", help="Destination remote file path (remote:/lab/path/file)")
        move_parser.set_defaults(func=FileCommand.move)
        # remove
        remove_parser = top_level_subparsers.add_parser("remove", help="remove a file")
        remove_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)")
        remove_parser.set_defaults(func=FileCommand.remove)
        # file-metadata
        metadata_parser = top_level_subparsers.add_parser("file-metadata", help="get the file metadata")
        metadata_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)")
        metadata_parser.set_defaults(func=FileCommand.metadata)

    @staticmethod
    def upload(args: Namespace) -> None:
        """Upload a local file, or a directory tree when ``--recursive`` is set.

        For a directory, the local tree is walked and a remote folder is
        looked up (or created when missing) for every local directory, rooted
        at a folder named after the local directory inside the remote target.

        Args:
            args: Parsed arguments providing ``local_path``, ``remote_path``
                and ``recursive``.

        Raises:
            IllegalArgumentException: If the local path does not exist, or it
                is a directory and ``recursive`` is not set.
            UnexpectedException: If a remote parent folder cannot be resolved
                to exactly one folder.
        """
        (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path)
        local_path = os.path.realpath(args.local_path)
        if not os.path.exists(local_path):
            raise IllegalArgumentException(f"File or directory `{args.local_path}` not found.")
        connection = create_connection(remote)
        laboratory = find_laboratory(connection, laboratory_name)
        folder = find_folder(connection, laboratory, path)
        upload_files: list[UploadFile] = []
        if os.path.isdir(local_path):
            if not args.recursive:
                raise IllegalArgumentException(f"Cannot upload `{args.local_path}`: Is a directory.")
            folder_api = FolderApi(connection)
            # Cache of remote folders keyed by remote path, so each folder is
            # retrieved or created at most once during the walk.
            folders: dict[str, Folder] = {}
            folders[path] = folder
            local_basename = os.path.basename(local_path)
            # NOTE(review): remote paths below are built with os.path, which
            # uses the local OS separator - confirm behaviour on Windows.
            for dirpath, _dirnames, filenames in os.walk(local_path):
                sub = (
                    local_basename
                    if dirpath == local_path
                    else os.path.join(local_basename, os.path.relpath(dirpath, local_path))
                )
                dest_folder_path = os.path.join(path, sub)
                dest_folder_name = os.path.basename(dest_folder_path)
                # prepare destination parent path
                dest_parent_folder_path = os.path.dirname(dest_folder_path)
                if folders.get(dest_parent_folder_path) is None:
                    res = folder_api.list(laboratory.id, dest_parent_folder_path)
                    if len(res) != 1:
                        raise UnexpectedException(f"Remote folder `{dest_parent_folder_path}` not found.")
                    folders[dest_parent_folder_path] = folder_api.retrieve(res[0].id)
                # prepare destination path
                if folders.get(dest_folder_path) is None:
                    parent_folder = folders[dest_parent_folder_path]
                    dest_folder_simple = parent_folder.find_sub_folder(dest_folder_name)
                    if dest_folder_simple is None:
                        dest_folder_id = folder_api.create(dest_folder_name, parent_folder.id)
                    else:
                        dest_folder_id = dest_folder_simple.id
                    folders[dest_folder_path] = folder_api.retrieve(dest_folder_id)
                    if dest_folder_simple is None:
                        # Keep the cached parent in sync with the folder just created.
                        parent_folder.sub_folders.append(folders[dest_folder_path])
                # register upload file list
                dest_folder = folders[dest_folder_path]
                upload_files.extend(
                    UploadFile(dest_folder, os.path.join(dirpath, filename)) for filename in filenames
                )
        else:
            upload_files.append(UploadFile(folder, local_path))
        FileCommand._multiple_upload(connection, upload_files)

    @staticmethod
    def download(args: Namespace) -> None:
        """Download one remote file into an existing local directory.

        Args:
            args: Parsed arguments providing ``remote_path`` and ``local_path``.

        Raises:
            IllegalArgumentException: If the local directory does not exist,
                the remote file is not found, or the local file cannot be
                created because of missing permissions.
        """
        (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path)
        path = path.rstrip("/")
        file_name = os.path.basename(path)
        connection = create_connection(remote)
        if not os.path.isdir(args.local_path):
            raise IllegalArgumentException(f"Local directory `{args.local_path}` not found.")
        local_file = os.path.join(args.local_path, file_name)
        file = FileCommand._find_file(connection, laboratory_name, path)
        response = connection.session.get(connection.build_url("v2/" + file.download_url), stream=True)
        try:
            with open(local_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=4096):
                    if chunk:
                        f.write(chunk)
        except PermissionError as e:
            raise IllegalArgumentException(f"Cannot create file `{local_file}`: Permission denied.") from e
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()

    @staticmethod
    def move(args: Namespace) -> None:
        """Move and/or rename a remote file within one laboratory.

        A destination ending in ``/`` names a folder and keeps the source file
        name; otherwise its last component is the new file name.

        Args:
            args: Parsed arguments providing ``src_path`` and ``dest_path``.

        Raises:
            IllegalArgumentException: If the remote hosts or laboratories
                differ, the source file is not found, or the destination file
                already exists.
        """
        (src_remote, src_laboratory_name, src_path) = parse_remote_host_with_path(args.src_path)
        (dest_remote, dest_laboratory_name, dest_path) = parse_remote_host_with_path(args.dest_path)
        if src_remote != dest_remote:
            raise IllegalArgumentException("Remote host mismatched.")
        if src_laboratory_name != dest_laboratory_name:
            raise IllegalArgumentException("Laboratory mismatched.")
        src_path = src_path.rstrip("/")
        src_dirpath = os.path.dirname(src_path)
        src_filename = os.path.basename(src_path)
        if dest_path.endswith("/"):
            dest_dirpath = dest_path
            dest_filename = src_filename
        else:
            dest_dirpath = os.path.dirname(dest_path)
            dest_filename = os.path.basename(dest_path)
        connection = create_connection(src_remote)
        laboratory = find_laboratory(connection, src_laboratory_name)
        src_folder = find_folder(connection, laboratory, src_dirpath)
        dest_folder = find_folder(connection, laboratory, dest_dirpath)
        src_file = src_folder.find_file(src_filename)
        if src_file is None:
            raise IllegalArgumentException(f"File `{src_filename}` not found.")
        dest_file = dest_folder.find_file(dest_filename)
        if dest_file is not None:
            raise IllegalArgumentException(f"File `{dest_filename}` already exists.")
        file_api = FileApi(connection)
        if src_folder.id != dest_folder.id:
            file_api.move(src_file, dest_folder.id)
        if dest_filename != src_filename:
            # Build a copy of the source record that differs only in its name.
            dest_file_dict = dataclasses.asdict(src_file) | {"name": dest_filename}
            dest_file = parse_obj_as(File, dest_file_dict)
            file_api.update(dest_file, None)

    @staticmethod
    def remove(args: Namespace) -> None:
        """Delete one remote file.

        Args:
            args: Parsed arguments providing ``remote_path``.

        Raises:
            IllegalArgumentException: If the remote file is not found.
        """
        (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path)
        path = path.rstrip("/")
        connection = create_connection(remote)
        file = FileCommand._find_file(connection, laboratory_name, path)
        file_api = FileApi(connection)
        file_api.destroy(file)

    @staticmethod
    def metadata(args: Namespace) -> None:
        """Print the metadata of one remote file to standard output.

        Args:
            args: Parsed arguments providing ``remote_path``.

        Raises:
            IllegalArgumentException: If the remote file is not found.
        """
        (remote, laboratory_name, path) = parse_remote_host_with_path(args.remote_path)
        path = path.rstrip("/")
        connection = create_connection(remote)
        file = FileCommand._find_file(connection, laboratory_name, path)
        file_api = FileApi(connection)
        metadata = file_api.metadata(file)
        print(metadata)

    @staticmethod
    def _find_file(connection: MDRSConnection, laboratory_name: str, path: str) -> File:
        """Look up the remote file at ``path`` inside the named laboratory.

        Args:
            connection: Connection used for the laboratory and folder lookups.
            laboratory_name: Name of the laboratory that owns the file.
            path: Remote file path without a trailing ``/``.

        Returns:
            The file entry found in the parent folder of ``path``.

        Raises:
            IllegalArgumentException: If the parent folder has no such file.
        """
        parent_path = os.path.dirname(path)
        file_name = os.path.basename(path)
        laboratory = find_laboratory(connection, laboratory_name)
        folder = find_folder(connection, laboratory, parent_path)
        file = folder.find_file(file_name)
        if file is None:
            raise IllegalArgumentException(f"File `{file_name}` not found.")
        return file

    @staticmethod
    def _multiple_upload(connection: MDRSConnection, upload_files: list[UploadFile]) -> None:
        """Upload every entry of ``upload_files`` using a thread pool.

        Args:
            connection: Connection used to build the file API client.
            upload_files: The local-file / remote-folder pairs to upload.
        """
        file_api = FileApi(connection)
        with ThreadPoolExecutor(max_workers=NUMBER_OF_PROCESS) as pool:
            # Executor.map only re-raises a worker's exception when its result
            # is read, so drain the iterator; otherwise any failure other than
            # the MDRSException handled in the worker would vanish silently.
            for _ in pool.map(lambda x: FileCommand._multiple_upload_worker(file_api, x), upload_files):
                pass

    @staticmethod
    def _multiple_upload_worker(file_api: FileApi, upload_file: UploadFile) -> None:
        """Upload one file, updating the remote file of the same name if present.

        An ``MDRSException`` is reported on standard output and not re-raised,
        so one failed upload does not abort the others.

        Args:
            file_api: API client used for the create or update call.
            upload_file: The local file and its destination folder.
        """
        file_name = os.path.basename(upload_file.path)
        existing = next((x for x in upload_file.folder.files if x.name == file_name), None)
        try:
            if existing is None:
                file_api.create(upload_file.folder.id, upload_file.path)
            else:
                file_api.update(existing, upload_file.path)
        except MDRSException as e:
            print(f"API Error: {e}")