4283481695
On macOS, local filenames and directory names may be in NFD encoding
(decomposed Unicode). Without normalization, files and folders are
created on the server with NFD names, inconsistent with the server's
NFC convention.
Apply normalize("NFC", ...) before sending names to the server in:
- FilesApi.create(): filename in multipart upload
- FilesApi.update(): filename in multipart upload
- UploadCommand: directory name in FoldersApi.create()
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
149 lines
6.1 KiB
Python
149 lines
6.1 KiB
Python
import mimetypes
|
|
import os
|
|
from typing import Any, Final
|
|
from unicodedata import normalize
|
|
|
|
from pydantic import TypeAdapter
|
|
from pydantic.dataclasses import dataclass
|
|
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
|
|
|
from mdrsclient.api.base import BaseApi
|
|
from mdrsclient.api.utils import token_check
|
|
from mdrsclient.exceptions import UnexpectedException
|
|
from mdrsclient.models import File
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FilesApiCreateResponse:
|
|
id: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FilesApiListResponse:
|
|
count: int
|
|
next: str | None
|
|
previous: str | None
|
|
results: list[File]
|
|
|
|
|
|
class FilesApi(BaseApi):
|
|
ENTRYPOINT: Final[str] = "v3/files/"
|
|
FALLBACK_MIMETYPE: Final[str] = "application/octet-stream"
|
|
|
|
def list(self, folder_id: str, page_num: int) -> FilesApiListResponse:
|
|
url = self.ENTRYPOINT
|
|
token_check(self.connection)
|
|
params: dict[str, str | int] = {"folder_id": folder_id, "page": page_num}
|
|
response = self.connection.get(url, params=params)
|
|
self._raise_response_error(response)
|
|
return TypeAdapter(FilesApiListResponse).validate_python(response.json())
|
|
|
|
def retrieve(self, id: str) -> File:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + id + "/"
|
|
token_check(self.connection)
|
|
response = self.connection.get(url)
|
|
self._raise_response_error(response)
|
|
return TypeAdapter(File).validate_python(response.json())
|
|
|
|
def create(self, folder_id: str, path: str) -> str:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT
|
|
token_check(self.connection)
|
|
data: dict[str, str | int] | MultipartEncoder = {}
|
|
try:
|
|
with open(os.path.realpath(path), mode="rb") as fp:
|
|
data = MultipartEncoder(
|
|
fields={"folder_id": folder_id, "file": (normalize("NFC", os.path.basename(path)), fp, self._get_mime_type(path))}
|
|
)
|
|
response = self.connection.post(url, data=data, headers={"Content-Type": data.content_type})
|
|
self._raise_response_error(response)
|
|
ret = TypeAdapter(FilesApiCreateResponse).validate_python(response.json())
|
|
except OSError:
|
|
raise UnexpectedException(f"Could not open `{path}` file.")
|
|
except MemoryError:
|
|
raise UnexpectedException("Out of memory.")
|
|
except Exception as e:
|
|
raise UnexpectedException("Unspecified error.") from e
|
|
return ret.id
|
|
|
|
def update(self, file: File, path: str | None) -> bool:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + file.id + "/"
|
|
token_check(self.connection)
|
|
data: dict[str, str | int] | MultipartEncoder = {}
|
|
if path is not None:
|
|
# update file body
|
|
try:
|
|
with open(os.path.realpath(path), mode="rb") as fp:
|
|
data = MultipartEncoder(fields={"file": (normalize("NFC", os.path.basename(path)), fp, self._get_mime_type(path))})
|
|
response = self.connection.put(url, data=data, headers={"Content-Type": data.content_type})
|
|
except OSError:
|
|
raise UnexpectedException(f"Could not open `{path}` file.")
|
|
except MemoryError:
|
|
raise UnexpectedException("Out of memory.")
|
|
except Exception as e:
|
|
raise UnexpectedException("Unspecified error.") from e
|
|
else:
|
|
# update metadata
|
|
data = {"name": file.name, "description": file.description}
|
|
response = self.connection.put(url, data=data)
|
|
self._raise_response_error(response)
|
|
return True
|
|
|
|
def destroy(self, file: File) -> bool:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + file.id + "/"
|
|
token_check(self.connection)
|
|
response = self.connection.delete(url)
|
|
self._raise_response_error(response)
|
|
return True
|
|
|
|
def move(self, file: File, folder_id: str, name: str) -> bool:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + file.id + "/move/"
|
|
data: dict[str, str | int] = {"folder": folder_id, "name": name}
|
|
token_check(self.connection)
|
|
response = self.connection.post(url, data=data)
|
|
self._raise_response_error(response)
|
|
return True
|
|
|
|
def copy(self, file: File, folder_id: str, name: str) -> bool:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + file.id + "/copy/"
|
|
data: dict[str, str | int] = {"folder": folder_id, "name": name}
|
|
token_check(self.connection)
|
|
response = self.connection.post(url, data=data)
|
|
self._raise_response_error(response)
|
|
return True
|
|
|
|
def metadata(self, file: File) -> dict[str, Any]:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = self.ENTRYPOINT + file.id + "/metadata/"
|
|
token_check(self.connection)
|
|
response = self.connection.get(url)
|
|
self._raise_response_error(response)
|
|
return response.json()
|
|
|
|
def download(self, file: File, path: str) -> bool:
|
|
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
|
url = file.download_url
|
|
token_check(self.connection)
|
|
response = self.connection.get(url, stream=True)
|
|
self._raise_response_error(response)
|
|
try:
|
|
with open(path, "wb") as f:
|
|
for chunk in response.iter_content(chunk_size=4096):
|
|
if chunk:
|
|
f.write(chunk)
|
|
f.flush()
|
|
except PermissionError:
|
|
print(f"Cannot create file `{path}`: Permission denied.")
|
|
return True
|
|
|
|
def _get_mime_type(self, path: str) -> str:
|
|
mt = mimetypes.guess_type(path)
|
|
if mt:
|
|
return mt[0] or self.FALLBACK_MIMETYPE
|
|
return self.FALLBACK_MIMETYPE
|