changed API endpoint from v2 to v3.
This commit is contained in:
107
mdrsclient/api/files.py
Normal file
107
mdrsclient/api/files.py
Normal file
@ -0,0 +1,107 @@
|
||||
from typing import Any, Final
|
||||
|
||||
from pydantic import TypeAdapter
|
||||
from pydantic.dataclasses import dataclass
|
||||
|
||||
from mdrsclient.api.base import BaseApi
|
||||
from mdrsclient.api.utils import token_check
|
||||
from mdrsclient.exceptions import UnexpectedException
|
||||
from mdrsclient.models import File
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FilesApiCreateResponse:
|
||||
id: str
|
||||
|
||||
|
||||
class FilesApi(BaseApi):
|
||||
ENTRYPOINT: Final[str] = "v3/files/"
|
||||
|
||||
def retrieve(self, id: str) -> File:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + id + "/"
|
||||
token_check(self.connection)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
return TypeAdapter(File).validate_python(response.json())
|
||||
|
||||
def create(self, folder_id: str, path: str) -> str:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT
|
||||
token_check(self.connection)
|
||||
data: dict[str, str | int] = {"folder_id": folder_id}
|
||||
try:
|
||||
with open(path, mode="rb") as fp:
|
||||
response = self.connection.post(url, data=data, files={"file": fp})
|
||||
self._raise_response_error(response)
|
||||
ret = TypeAdapter(FilesApiCreateResponse).validate_python(response.json())
|
||||
except OSError:
|
||||
raise UnexpectedException(f"Could not open `{path}` file.")
|
||||
return ret.id
|
||||
|
||||
def update(self, file: File, path: str | None) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/"
|
||||
token_check(self.connection)
|
||||
if path is not None:
|
||||
# update file body
|
||||
try:
|
||||
with open(path, mode="rb") as fp:
|
||||
response = self.connection.put(url, files={"file": fp})
|
||||
except OSError:
|
||||
raise UnexpectedException(f"Could not open `{path}` file.")
|
||||
else:
|
||||
# update metadata
|
||||
data: dict[str, str | int] = {"name": file.name, "description": file.description}
|
||||
response = self.connection.put(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
def destroy(self, file: File) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/"
|
||||
token_check(self.connection)
|
||||
response = self.connection.delete(url)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
def move(self, file: File, folder_id: str, name: str) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/move/"
|
||||
data: dict[str, str | int] = {"folder": folder_id, "name": name}
|
||||
token_check(self.connection)
|
||||
response = self.connection.post(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
def copy(self, file: File, folder_id: str, name: str) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/copy/"
|
||||
data: dict[str, str | int] = {"folder": folder_id, "name": name}
|
||||
token_check(self.connection)
|
||||
response = self.connection.post(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
def metadata(self, file: File) -> dict[str, Any]:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/metadata/"
|
||||
token_check(self.connection)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
return response.json()
|
||||
|
||||
def download(self, file: File, path: str) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = file.download_url
|
||||
response = self.connection.get(url, stream=True)
|
||||
self._raise_response_error(response)
|
||||
try:
|
||||
with open(path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=4096):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
f.flush()
|
||||
except PermissionError:
|
||||
print(f"Cannot create file `{path}`: Permission denied.")
|
||||
return True
|
Reference in New Issue
Block a user