132 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			132 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import mimetypes
 | 
						|
import os
 | 
						|
from typing import Any, Final
 | 
						|
 | 
						|
from pydantic import TypeAdapter
 | 
						|
from pydantic.dataclasses import dataclass
 | 
						|
from requests_toolbelt.multipart.encoder import MultipartEncoder
 | 
						|
 | 
						|
from mdrsclient.api.base import BaseApi
 | 
						|
from mdrsclient.api.utils import token_check
 | 
						|
from mdrsclient.exceptions import UnexpectedException
 | 
						|
from mdrsclient.models import File
 | 
						|
 | 
						|
 | 
						|
@dataclass(frozen=True)
 | 
						|
class FilesApiCreateResponse:
 | 
						|
    id: str
 | 
						|
 | 
						|
 | 
						|
class FilesApi(BaseApi):
 | 
						|
    ENTRYPOINT: Final[str] = "v3/files/"
 | 
						|
    FALLBACK_MIMETYPE: Final[str] = "application/octet-stream"
 | 
						|
 | 
						|
    def retrieve(self, id: str) -> File:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + id + "/"
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.get(url)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return TypeAdapter(File).validate_python(response.json())
 | 
						|
 | 
						|
    def create(self, folder_id: str, path: str) -> str:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT
 | 
						|
        token_check(self.connection)
 | 
						|
        data: dict[str, str | int] | MultipartEncoder = {}
 | 
						|
        try:
 | 
						|
            with open(os.path.realpath(path), mode="rb") as fp:
 | 
						|
                data = MultipartEncoder(
 | 
						|
                    fields={"folder_id": folder_id, "file": (os.path.basename(path), fp, self._get_mime_type(path))}
 | 
						|
                )
 | 
						|
                response = self.connection.post(url, data=data, headers={"Content-Type": data.content_type})
 | 
						|
                self._raise_response_error(response)
 | 
						|
                ret = TypeAdapter(FilesApiCreateResponse).validate_python(response.json())
 | 
						|
        except OSError:
 | 
						|
            raise UnexpectedException(f"Could not open `{path}` file.")
 | 
						|
        except MemoryError:
 | 
						|
            raise UnexpectedException("Out of memory.")
 | 
						|
        except Exception as e:
 | 
						|
            raise UnexpectedException("Unspecified error.") from e
 | 
						|
        return ret.id
 | 
						|
 | 
						|
    def update(self, file: File, path: str | None) -> bool:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + file.id + "/"
 | 
						|
        token_check(self.connection)
 | 
						|
        data: dict[str, str | int] | MultipartEncoder = {}
 | 
						|
        if path is not None:
 | 
						|
            # update file body
 | 
						|
            try:
 | 
						|
                with open(os.path.realpath(path), mode="rb") as fp:
 | 
						|
                    data = MultipartEncoder(fields={"file": (os.path.basename(path), fp, self._get_mime_type(path))})
 | 
						|
                    response = self.connection.put(url, data=data, headers={"Content-Type": data.content_type})
 | 
						|
            except OSError:
 | 
						|
                raise UnexpectedException(f"Could not open `{path}` file.")
 | 
						|
            except MemoryError:
 | 
						|
                raise UnexpectedException("Out of memory.")
 | 
						|
            except Exception as e:
 | 
						|
                raise UnexpectedException("Unspecified error.") from e
 | 
						|
        else:
 | 
						|
            # update metadata
 | 
						|
            data = {"name": file.name, "description": file.description}
 | 
						|
            response = self.connection.put(url, data=data)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return True
 | 
						|
 | 
						|
    def destroy(self, file: File) -> bool:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + file.id + "/"
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.delete(url)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return True
 | 
						|
 | 
						|
    def move(self, file: File, folder_id: str, name: str) -> bool:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + file.id + "/move/"
 | 
						|
        data: dict[str, str | int] = {"folder": folder_id, "name": name}
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.post(url, data=data)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return True
 | 
						|
 | 
						|
    def copy(self, file: File, folder_id: str, name: str) -> bool:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + file.id + "/copy/"
 | 
						|
        data: dict[str, str | int] = {"folder": folder_id, "name": name}
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.post(url, data=data)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return True
 | 
						|
 | 
						|
    def metadata(self, file: File) -> dict[str, Any]:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = self.ENTRYPOINT + file.id + "/metadata/"
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.get(url)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        return response.json()
 | 
						|
 | 
						|
    def download(self, file: File, path: str) -> bool:
 | 
						|
        # print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
 | 
						|
        url = file.download_url
 | 
						|
        token_check(self.connection)
 | 
						|
        response = self.connection.get(url, stream=True)
 | 
						|
        self._raise_response_error(response)
 | 
						|
        try:
 | 
						|
            with open(path, "wb") as f:
 | 
						|
                for chunk in response.iter_content(chunk_size=4096):
 | 
						|
                    if chunk:
 | 
						|
                        f.write(chunk)
 | 
						|
                        f.flush()
 | 
						|
        except PermissionError:
 | 
						|
            print(f"Cannot create file `{path}`: Permission denied.")
 | 
						|
        return True
 | 
						|
 | 
						|
    def _get_mime_type(self, path: str) -> str:
 | 
						|
        mt = mimetypes.guess_type(path)
 | 
						|
        if mt:
 | 
						|
            return mt[0] or self.FALLBACK_MIMETYPE
 | 
						|
        return self.FALLBACK_MIMETYPE
 |