"""Concurrent upload and download helpers built on the MDRS files/folders API."""

import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from unicodedata import normalize

from pydantic.dataclasses import dataclass

from mdrsclient.api import FilesApi, FoldersApi
from mdrsclient.exceptions import IllegalArgumentException, MDRSException, UnexpectedException
from mdrsclient.models import File, Folder, Laboratory
from mdrsclient.models.file import find_file
from mdrsclient.settings import CONCURRENT


@dataclass(frozen=True)
class UploadFileInfo:
    """One local file queued for upload."""

    folder: Folder  # remote folder that receives the file
    files: list[File]  # files already listed in that remote folder
    path: str  # local path of the file to upload


@dataclass(frozen=True)
class DownloadFileInfo:
    """One remote file queued for download."""

    file: File  # remote file to fetch
    path: str  # local path the file is written to


@dataclass
class DownloadContext:
    """Mutable state shared by one batch of downloads."""

    hasError: bool  # set to True by the batch runner when any worker reports failure
    isSkipIfExists: bool  # skip files whose local copy exists with the same size
    files: list[DownloadFileInfo]  # files to download in this batch


class Uploader:
    """Upload a local file or directory tree to a remote folder."""

    def __init__(self, client: Any) -> None:
        self.client = client

    def upload(
        self, local_path: str, remote_path: str, is_recursive: bool = False, is_skip_if_exists: bool = False
    ) -> None:
        """Upload ``local_path`` into the remote folder named by ``remote_path``.

        Args:
            local_path: Local file or directory to upload.
            remote_path: Remote destination, parsed by
                ``client.parse_remote_host_with_path``.
            is_recursive: Must be True to upload a directory.
            is_skip_if_exists: When True, a remote file that already exists
                with the same size as the local file is not re-uploaded.

        Raises:
            IllegalArgumentException: If ``local_path`` does not exist, or is
                a directory and ``is_recursive`` is False.
        """
        _, laboratory_name, r_path = self.client.parse_remote_host_with_path(remote_path)
        l_path = os.path.abspath(local_path)
        if not os.path.exists(l_path):
            raise IllegalArgumentException(f"File or directory `{local_path}` not found.")
        laboratory = self.client.find_laboratory(laboratory_name)
        folder = self.client.find_folder(laboratory, r_path)
        files = self.client.find_files(folder.id)
        if os.path.isdir(l_path):
            if not is_recursive:
                raise IllegalArgumentException(f"Cannot upload `{local_path}`: Is a directory.")
            infos = self.__collect_directory_uploads(laboratory, folder, files, l_path, r_path)
        else:
            infos = [UploadFileInfo(folder, files, l_path)]
        self.__multiple_upload(infos, is_skip_if_exists)

    def __collect_directory_uploads(
        self, laboratory: Laboratory, folder: Folder, files: list[File], l_path: str, r_path: str
    ) -> list[UploadFileInfo]:
        """Walk ``l_path``, create missing remote folders, and list every file to upload."""
        folder_api = FoldersApi(self.client.connection)
        # Caches keyed by remote path so each remote folder is looked up once.
        folder_map: dict[str, Folder] = {r_path: folder}
        files_map: dict[str, list[File]] = {r_path: files}
        infos: list[UploadFileInfo] = []
        l_basename = os.path.basename(l_path)
        for dirpath, _, filenames in os.walk(l_path, followlinks=True):
            # The uploaded directory itself becomes a sub-folder of r_path.
            if dirpath == l_path:
                sub = l_basename
            else:
                sub = os.path.join(l_basename, os.path.relpath(dirpath, l_path))
            d_dirname = os.path.join(r_path, sub)
            d_basename = os.path.basename(d_dirname)
            # prepare destination parent path
            d_parent_dirname = os.path.dirname(d_dirname)
            if folder_map.get(d_parent_dirname) is None:
                parent_folder = self.client.find_folder(laboratory, d_parent_dirname)
                folder_map[d_parent_dirname] = parent_folder
                files_map[d_parent_dirname] = self.client.find_files(parent_folder.id)
            # prepare destination path
            if folder_map.get(d_dirname) is None:
                parent = folder_map[d_parent_dirname]
                d_folder = parent.find_sub_folder(d_basename)
                if d_folder is None:
                    # The new folder name is sent in NFC-normalized form.
                    d_folder_id = folder_api.create(normalize("NFC", d_basename), parent.id)
                else:
                    d_folder_id = d_folder.id
                print(d_dirname)
                folder_map[d_dirname] = folder_api.retrieve(d_folder_id)
                files_map[d_dirname] = self.client.find_files(d_folder_id)
                if d_folder is None:
                    # Keep the cached parent in sync with the folder just created.
                    parent.sub_folders.append(folder_map[d_dirname])
            # register upload file list
            infos.extend(
                UploadFileInfo(folder_map[d_dirname], files_map[d_dirname], os.path.join(dirpath, filename))
                for filename in filenames
            )
        return infos

    def __multiple_upload(self, infos: list[UploadFileInfo], is_skip_if_exists: bool) -> None:
        """Upload all ``infos`` using a thread pool of ``CONCURRENT`` workers."""
        file_api = FilesApi(self.client.connection)
        with ThreadPoolExecutor(max_workers=CONCURRENT) as pool:
            # The result iterator is not consumed; workers report their own errors.
            pool.map(lambda x: self.__multiple_upload_worker(file_api, x, is_skip_if_exists), infos)

    def __multiple_upload_worker(self, file_api: FilesApi, info: UploadFileInfo, is_skip_if_exists: bool) -> None:
        """Create or update one remote file and print its remote path.

        Errors are printed rather than raised so that one failing file does
        not stop the remaining uploads.
        """
        basename = os.path.basename(info.path)
        file = find_file(info.files, basename)
        try:
            if file is None:
                file_api.create(info.folder.id, info.path)
            elif not is_skip_if_exists or file.size != os.path.getsize(info.path):
                file_api.update(file, info.path)
            print(os.path.join(info.folder.path, basename))
        except (MDRSException, OSError) as e:
            # An exception escaping this worker would be stored on a future
            # that nobody inspects, so local I/O errors are reported here too.
            print(f"Error: {e}")


class Downloader:
    """Download a remote file or folder tree into a local directory."""

    def __init__(self, client: Any) -> None:
        self.client = client

    def download(
        self,
        remote_path: str,
        local_path: str,
        is_recursive: bool = False,
        is_skip_if_exists: bool = False,
        password: str | None = None,
        excludes: list[str] | None = None,
    ) -> None:
        """Download ``remote_path`` into the existing local directory ``local_path``.

        Args:
            remote_path: Either ``remote:/laboratory/path`` or a DOI form
                ``remote:10.xxxx/prefix.ID[/optional/sub/path]``.
            local_path: Existing local directory that receives the download.
            is_recursive: Required to download a folder given by a plain
                path. For a DOI target, False downloads only the files
                directly inside the target folder.
            is_skip_if_exists: Skip files whose local copy already exists
                with the same size.
            password: Passed through to the folder lookups.
            excludes: Remote paths to skip. They are compared against a
                lower-cased path of the form ``/<laboratory><folder path><file>``
                with trailing slashes removed.

        Raises:
            IllegalArgumentException: If ``local_path`` is not a directory,
                the remote target does not exist, or a plain-path folder is
                requested without ``is_recursive``.
            UnexpectedException: If a file fails during a recursive download.
        """
        excludes_clean = excludes or []
        # Detect DOI path: "remote:10.xxxx/prefix.ID[/optional/sub/path]"
        path_component = remote_path.partition(":")[2]
        if self.client.is_doi(path_component):
            self.__download_by_doi(
                remote_path, local_path, is_recursive, is_skip_if_exists, password, excludes_clean
            )
        else:
            self.__download_by_path(
                remote_path, local_path, is_recursive, is_skip_if_exists, password, excludes_clean
            )

    @staticmethod
    def __resolve_local_directory(local_path: str) -> str:
        """Return the real path of ``local_path``, raising if it is not a directory."""
        l_dirname = os.path.realpath(local_path)
        if not os.path.isdir(l_dirname):
            raise IllegalArgumentException(f"Local directory `{local_path}` not found.")
        return l_dirname

    def __download_single_file(
        self,
        laboratory: Laboratory,
        folder: Folder,
        file: File,
        l_path: str,
        excludes: list[str],
        is_skip_if_exists: bool,
    ) -> None:
        """Download one remote file to ``l_path`` unless it is excluded."""
        if self.__check_excludes(excludes, laboratory, folder, file):
            return
        context = DownloadContext(False, is_skip_if_exists, [])
        context.files.append(DownloadFileInfo(file, l_path))
        # NOTE(review): context.hasError is not inspected here, so a failure is
        # only visible through the worker's "Failed:" message, unlike the
        # recursive path which raises. Confirm whether that is intended.
        self.__multiple_download(context)

    def __download_by_doi(
        self,
        remote_path: str,
        local_path: str,
        is_recursive: bool,
        is_skip_if_exists: bool,
        password: str | None,
        excludes: list[str],
    ) -> None:
        """Handle a download whose remote path names a DOI, optionally with a sub path."""
        _, doi, subpath = self.client.parse_doi_remote_host(remote_path)
        l_dirname = self.__resolve_local_directory(local_path)
        doi_folder, laboratory = self.client.find_folder_by_doi(doi, password)
        subpath_clean = subpath.rstrip("/")
        if not subpath_clean:
            folder = doi_folder
        else:
            r_dirname = os.path.dirname(subpath_clean)
            r_basename = os.path.basename(subpath_clean)
            # The sub path is resolved relative to the DOI folder's own path.
            abs_path = doi_folder.path.rstrip("/") + r_dirname
            r_parent_folder = self.client.find_folder(laboratory, abs_path, password)
            r_parent_files = self.client.find_files(r_parent_folder.id)
            file = find_file(r_parent_files, r_basename)
            if file is not None:
                self.__download_single_file(
                    laboratory,
                    r_parent_folder,
                    file,
                    os.path.join(l_dirname, r_basename),
                    excludes,
                    is_skip_if_exists,
                )
                return
            folder_simple = r_parent_folder.find_sub_folder(r_basename)
            if folder_simple is None:
                raise IllegalArgumentException(f"File or folder `{subpath_clean}` not found.")
            folder = FoldersApi(self.client.connection).retrieve(folder_simple.id)

        # For a DOI target the whole folder is the download target.
        if not is_recursive:
            # Non-recursive: download only the files at the top level of the DOI folder.
            context = DownloadContext(False, is_skip_if_exists, [])
            for file in self.client.find_files(folder.id):
                if self.__check_excludes(excludes, laboratory, folder, file):
                    continue
                context.files.append(DownloadFileInfo(file, os.path.join(l_dirname, file.name)))
            self.__multiple_download(context)
            return
        folder_api = FoldersApi(self.client.connection)
        self.__multiple_download_pickup_recursive_files(
            folder_api, laboratory, folder.id, l_dirname, excludes, is_skip_if_exists
        )

    def __download_by_path(
        self,
        remote_path: str,
        local_path: str,
        is_recursive: bool,
        is_skip_if_exists: bool,
        password: str | None,
        excludes: list[str],
    ) -> None:
        """Handle a download whose remote path is a laboratory path."""
        _, laboratory_name, r_path = self.client.parse_remote_host_with_path(remote_path)
        r_path = r_path.rstrip("/")
        r_dirname = os.path.dirname(r_path)
        r_basename = os.path.basename(r_path)
        l_dirname = self.__resolve_local_directory(local_path)
        laboratory = self.client.find_laboratory(laboratory_name)
        r_parent_folder = self.client.find_folder(laboratory, r_dirname, password)
        r_parent_files = self.client.find_files(r_parent_folder.id)
        file = find_file(r_parent_files, r_basename)
        if file is not None:
            self.__download_single_file(
                laboratory,
                r_parent_folder,
                file,
                os.path.join(l_dirname, r_basename),
                excludes,
                is_skip_if_exists,
            )
            return
        folder = r_parent_folder.find_sub_folder(r_basename)
        if folder is None:
            raise IllegalArgumentException(f"File or folder `{r_path}` not found.")
        if not is_recursive:
            raise IllegalArgumentException(f"Cannot download `{r_path}`: Is a folder.")
        folder_api = FoldersApi(self.client.connection)
        self.__multiple_download_pickup_recursive_files(
            folder_api, laboratory, folder.id, l_dirname, excludes, is_skip_if_exists
        )

    def __multiple_download_pickup_recursive_files(
        self,
        folder_api: FoldersApi,
        laboratory: Laboratory,
        folder_id: str,
        basedir: str,
        excludes: list[str],
        is_skip_if_exists: bool,
    ) -> None:
        """Download a remote folder and all of its sub-folders below ``basedir``.

        Raises:
            UnexpectedException: If any file directly inside this folder
                failed to download; sub-folders are then not visited.
        """
        folder = folder_api.retrieve(folder_id)
        # Check the exclude list before listing files so an excluded folder
        # costs no further remote call.
        if self.__check_excludes(excludes, laboratory, folder, None):
            return
        files = self.client.find_files(folder.id)
        dirname = os.path.join(basedir, folder.name)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
            print(dirname)
        context = DownloadContext(False, is_skip_if_exists, [])
        for file in files:
            if self.__check_excludes(excludes, laboratory, folder, file):
                continue
            context.files.append(DownloadFileInfo(file, os.path.join(dirname, file.name)))
        self.__multiple_download(context)
        if context.hasError:
            raise UnexpectedException("Some files failed to download.")
        for sub_folder in folder.sub_folders:
            self.__multiple_download_pickup_recursive_files(
                folder_api, laboratory, sub_folder.id, dirname, excludes, is_skip_if_exists
            )

    def __multiple_download(self, context: DownloadContext) -> None:
        """Download every file in ``context`` concurrently and record any failure on it."""
        file_api = FilesApi(self.client.connection)
        with ThreadPoolExecutor(max_workers=CONCURRENT) as pool:
            results = pool.map(
                lambda x: self.__multiple_download_worker(file_api, x, context.isSkipIfExists), context.files
            )
        # Workers return False on failure; one failure marks the whole batch.
        if any(result is False for result in results):
            context.hasError = True

    def __multiple_download_worker(self, file_api: FilesApi, info: DownloadFileInfo, is_skip_if_exists: bool) -> bool:
        """Download one file; return False on failure, True otherwise.

        The download is skipped when ``is_skip_if_exists`` is set and a local
        file of the same size already exists. A partially written file is
        removed on failure.
        """
        if not is_skip_if_exists or not os.path.exists(info.path) or info.file.size != os.path.getsize(info.path):
            try:
                file_api.download(info.file, info.path)
            except Exception:
                print(f"Failed: {info.path}")
                if os.path.isfile(info.path):
                    os.remove(info.path)
                return False
        print(info.path)
        return True

    def __check_excludes(self, excludes: list[str], laboratory: Laboratory, folder: Folder, file: File | None) -> bool:
        """Return True when the folder (``file`` is None) or the file is listed in ``excludes``.

        The candidate path is lower-cased and stripped of trailing slashes
        before the comparison; ``excludes`` entries are compared as given.
        """
        path = f"/{laboratory.name}{folder.path}{file.name if file is not None else ''}".rstrip("/").lower()
        return path in excludes