implemented to move or rename file/folder command mv

This commit is contained in:
Yoshihiro OKUMURA 2023-05-09 18:38:58 +09:00
parent e7197673fc
commit fd2238f5ca
Signed by: orrisroot
GPG Key ID: 470AA444C92904B2
3 changed files with 70 additions and 31 deletions

View File

@ -45,12 +45,14 @@ class FileApi(BaseApi):
url = self.ENTRYPOINT + file.id + "/" url = self.ENTRYPOINT + file.id + "/"
token_check(self.connection) token_check(self.connection)
if path is not None: if path is not None:
# update file body
try: try:
with open(path, mode="rb") as fp: with open(path, mode="rb") as fp:
response = self._put(url, files={"file": fp}) response = self._put(url, files={"file": fp})
except OSError: except OSError:
raise UnexpectedException(f"Could not open `{path}` file.") raise UnexpectedException(f"Could not open `{path}` file.")
else: else:
# update metadata
data = {"name": file.name, "description": file.description} data = {"name": file.name, "description": file.description}
response = self._put(url, data=data) response = self._put(url, data=data)
self._raise_response_error(response) self._raise_response_error(response)
@ -64,7 +66,7 @@ class FileApi(BaseApi):
self._raise_response_error(response) self._raise_response_error(response)
return True return True
def move(self, file: File, folder_id: str | None) -> bool: def move(self, file: File, folder_id: str) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + file.id + "/move/" url = self.ENTRYPOINT + file.id + "/move/"
data = {"folder": folder_id} data = {"folder": folder_id}
@ -83,11 +85,12 @@ class FileApi(BaseApi):
def download(self, file: File, path: str) -> bool: def download(self, file: File, path: str) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.connection.build_url("v2/", file.download_url) url = "v2/" + file.download_url
r = self.connection.session.get(url, stream=True) response = self._get(url, stream=True)
self._raise_response_error(response)
try: try:
with open(path, "wb") as f: with open(path, "wb") as f:
for chunk in r.iter_content(chunk_size=4096): for chunk in response.iter_content(chunk_size=4096):
if chunk: if chunk:
f.write(chunk) f.write(chunk)
f.flush() f.flush()

View File

@ -50,7 +50,7 @@ class FolderApi(BaseApi):
def update(self, folder: FolderSimple) -> bool: def update(self, folder: FolderSimple) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT url = self.ENTRYPOINT + folder.id + "/"
data = { data = {
"name": folder.name, "name": folder.name,
"description": folder.description, "description": folder.description,
@ -68,6 +68,15 @@ class FolderApi(BaseApi):
self._raise_response_error(response) self._raise_response_error(response)
return True return True
def move(self, folder: FolderSimple, folder_id: str) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + folder.id + "/move/"
data = {"parent": folder_id}
token_check(self.connection)
response = self._post(url, data=data)
self._raise_response_error(response)
return True
def metadata(self, id: str) -> dict: def metadata(self, id: str) -> dict:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name) print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + id + "/metadata/" url = self.ENTRYPOINT + id + "/metadata/"

View File

@ -20,7 +20,7 @@ from mdrsclient.exceptions import (
MDRSException, MDRSException,
UnexpectedException, UnexpectedException,
) )
from mdrsclient.models import File, Folder from mdrsclient.models import File, Folder, FolderSimple
from mdrsclient.settings import NUMBER_OF_PROCESS from mdrsclient.settings import NUMBER_OF_PROCESS
@ -55,10 +55,10 @@ class FileCommand(BaseCommand):
download_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)") download_parser.add_argument("remote_path", help="Remote file path (remote:/lab/path/file)")
download_parser.add_argument("local_path", help="Local folder path (/foo/bar/)") download_parser.add_argument("local_path", help="Local folder path (/foo/bar/)")
download_parser.set_defaults(func=FileCommand.download) download_parser.set_defaults(func=FileCommand.download)
# move # mv
move_parser = top_level_subparsers.add_parser("move", help="move a file") move_parser = top_level_subparsers.add_parser("mv", help="move or rename file or folder")
move_parser.add_argument("src_path", help="Source remote file path (remote:/lab/path/file)") move_parser.add_argument("src_path", help="Source remote path (remote:/lab/path/src)")
move_parser.add_argument("dest_path", help="Destination remote file path (remote:/lab/path/file)") move_parser.add_argument("dest_path", help="Destination remote path (remote:/lab/path/dest)")
move_parser.set_defaults(func=FileCommand.move) move_parser.set_defaults(func=FileCommand.move)
# remove # remove
remove_parser = top_level_subparsers.add_parser("remove", help="remove a file") remove_parser = top_level_subparsers.add_parser("remove", help="remove a file")
@ -157,31 +157,58 @@ class FileCommand(BaseCommand):
if src_laboratory_name != dest_laboratory_name: if src_laboratory_name != dest_laboratory_name:
raise IllegalArgumentException("Laboratory mismatched.") raise IllegalArgumentException("Laboratory mismatched.")
src_path = src_path.rstrip("/") src_path = src_path.rstrip("/")
src_dirpath = os.path.dirname(src_path) src_dirname = os.path.dirname(src_path)
src_filename = os.path.basename(src_path) src_basename = os.path.basename(src_path)
if dest_path.endswith("/"): if dest_path.endswith("/"):
dest_dirpath = dest_path dest_dirname = dest_path
dest_filename = src_filename dest_basename = src_basename
else: else:
dest_dirpath = os.path.dirname(dest_path) dest_dirname = os.path.dirname(dest_path)
dest_filename = os.path.basename(dest_path) dest_basename = os.path.basename(dest_path)
connection = create_connection(src_remote) connection = create_connection(src_remote)
laboratory = find_laboratory(connection, src_laboratory_name) laboratory = find_laboratory(connection, src_laboratory_name)
src_folder = find_folder(connection, laboratory, src_dirpath) src_parent_folder = find_folder(connection, laboratory, src_dirname)
dest_folder = find_folder(connection, laboratory, dest_dirpath) dest_parent_folder = find_folder(connection, laboratory, dest_dirname)
src_file = src_folder.find_file(src_filename) src_file = src_parent_folder.find_file(src_basename)
if src_file is None: if src_file is not None:
raise IllegalArgumentException(f"File `{src_filename}` not found.") # source is file
dest_file = dest_folder.find_file(dest_filename) dest_file = dest_parent_folder.find_file(dest_basename)
if dest_file is not None: if dest_file is not None:
raise IllegalArgumentException(f"File `{dest_filename}` already exists.") raise IllegalArgumentException(f"File `{dest_basename}` already exists.")
file_api = FileApi(connection) dest_sub_folder = dest_parent_folder.find_sub_folder(dest_basename)
if src_folder.id != dest_folder.id: if dest_sub_folder is not None:
file_api.move(src_file, dest_folder.id) raise IllegalArgumentException(
if dest_filename != src_filename: f"Cannot overwrite non-folder `{dest_basename}` with folder `{dest_path}`."
dest_file_dict = dataclasses.asdict(src_file) | {"name": dest_filename} )
dest_file = parse_obj_as(File, dest_file_dict) file_api = FileApi(connection)
file_api.update(dest_file, None) if src_parent_folder.id != dest_parent_folder.id:
file_api.move(src_file, dest_parent_folder.id)
if dest_basename != src_basename:
dest_file_dict = dataclasses.asdict(src_file) | {"name": dest_basename}
dest_file = parse_obj_as(File, dest_file_dict)
file_api.update(dest_file, None)
else:
src_folder = src_parent_folder.find_sub_folder(src_basename)
if src_folder is None:
raise IllegalArgumentException(f"File or Folder `{src_basename}` not found.")
# source is folder
dest_file = dest_parent_folder.find_file(dest_basename)
if dest_file is not None:
raise IllegalArgumentException(
f"Cannot overwrite non-folder `{dest_basename}` with folder `{src_path}`."
)
dest_folder = dest_parent_folder.find_sub_folder(dest_basename)
if dest_folder is not None:
if dest_folder.id == src_folder.id:
raise IllegalArgumentException(f"`{src_path}` and `{src_path}` are the same folder.")
raise IllegalArgumentException(f"Cannot move `{src_path}` to `{dest_path}`: Folder not empty.")
folder_api = FolderApi(connection)
if src_parent_folder.id != dest_parent_folder.id:
folder_api.move(src_folder, dest_parent_folder.id)
if src_basename != dest_basename:
dest_folder_dict = dataclasses.asdict(src_folder) | {"name": dest_basename}
dest_folder = parse_obj_as(FolderSimple, dest_folder_dict)
folder_api.update(dest_folder)
@staticmethod @staticmethod
def remove(args: Namespace) -> None: def remove(args: Namespace) -> None: