first commit

This commit is contained in:
2023-05-01 20:00:32 +09:00
commit 819c4a6a07
39 changed files with 1866 additions and 0 deletions

View File

@ -0,0 +1,11 @@
from mdrsclient.api.file import FileApi
from mdrsclient.api.folder import FolderApi
from mdrsclient.api.laboratory import LaboratoryApi
from mdrsclient.api.user import UserApi
__all__ = [
"FileApi",
"FolderApi",
"LaboratoryApi",
"UserApi",
]

51
mdrsclient/api/base.py Normal file
View File

@ -0,0 +1,51 @@
from abc import ABC
import requests
from pydantic import parse_obj_as
from requests import Response
from mdrsclient.exceptions import (
BadRequestException,
ForbiddenException,
UnauthorizedException,
UnexpectedException,
)
from mdrsclient.models import DRFStandardizedErrors
from mdrsclient.session import MDRSSession
class BaseApi(ABC):
def __init__(self, session: MDRSSession) -> None:
self.session = session
def _get(self, url, *args, **kwargs) -> Response:
return self.session.get(self.__build_url(url), *args, **kwargs)
def _post(self, url, *args, **kwargs) -> Response:
return self.session.post(self.__build_url(url), *args, **kwargs)
def _put(self, url, *args, **kwargs) -> Response:
return self.session.put(self.__build_url(url), *args, **kwargs)
def _delete(self, url, *args, **kwargs) -> Response:
return self.session.delete(self.__build_url(url), *args, **kwargs)
def _patch(self, url, *args, **kwargs) -> Response:
return self.session.patch(self.__build_url(url), *args, **kwargs)
def _raise_response_error(self, response: Response) -> None:
if response.status_code >= 300:
if response.status_code < 400 or response.status_code >= 500:
raise UnexpectedException(f"Unexpected status code returned: {response.status_code}.")
errors = parse_obj_as(DRFStandardizedErrors, response.json())
if response.status_code == requests.codes.bad_request:
raise BadRequestException(errors.errors[0].detail)
elif response.status_code == requests.codes.unauthorized:
raise UnauthorizedException("Login required.")
elif response.status_code == requests.codes.forbidden:
raise ForbiddenException("You do not have enough permissions. Access is denied.")
else:
raise UnexpectedException(errors.errors[0].detail)
def __build_url(self, *args: tuple) -> str:
return self.session.build_url(*args)

82
mdrsclient/api/file.py Normal file
View File

@ -0,0 +1,82 @@
import sys
from typing import Final
from pydantic import parse_obj_as
from pydantic.dataclasses import dataclass
from mdrsclient.api.base import BaseApi
from mdrsclient.api.utils import token_check
from mdrsclient.exceptions import UnexpectedException
from mdrsclient.models import File
@dataclass(frozen=True)
class FileCreateResponse:
id: str
class FileApi(BaseApi):
ENTRYPOINT: Final[str] = "v2/file/"
def retrieve(self, id: str) -> File:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + id + "/"
token_check(self.session)
response = self._get(url)
self._raise_response_error(response)
return parse_obj_as(File, response.json())
def create(self, folder_id: str, path: str) -> str:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT
token_check(self.session)
data = {"folder_id": folder_id}
try:
with open(path, mode="rb") as fp:
response = self._post(url, data=data, files={"file": fp})
self._raise_response_error(response)
ret = parse_obj_as(FileCreateResponse, response.json())
except OSError:
raise UnexpectedException(f"Could not open `{path}` file.")
return ret.id
def update(self, file: File, path: str | None) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + file.id + "/"
token_check(self.session)
if path is not None:
try:
with open(path, mode="rb") as fp:
response = self._put(url, files={"file": fp})
except OSError:
raise UnexpectedException(f"Could not open `{path}` file.")
else:
data = {"name": file.name, "description": file.description}
response = self._put(url, data=data)
self._raise_response_error(response)
return True
def destroy(self, file: File) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + file.id + "/"
token_check(self.session)
response = self._delete(url)
self._raise_response_error(response)
return True
def move(self, file: File, folder_id: str | None) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + file.id + "/move/"
data = {"folder": folder_id}
token_check(self.session)
response = self._post(url, data=data)
self._raise_response_error(response)
return True
def metadata(self, file: File) -> dict:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + file.id + "/metadata/"
token_check(self.session)
response = self._get(url)
self._raise_response_error(response)
return response.json()

77
mdrsclient/api/folder.py Normal file
View File

@ -0,0 +1,77 @@
import sys
from typing import Final
from pydantic import parse_obj_as
from pydantic.dataclasses import dataclass
from mdrsclient.api.base import BaseApi
from mdrsclient.api.utils import token_check
from mdrsclient.models import Folder, FolderSimple
@dataclass(frozen=True)
class FolderCreateResponse:
id: str
class FolderApi(BaseApi):
ENTRYPOINT: Final[str] = "v2/folder/"
def list(self, laboratory_id: int, path: str) -> list[FolderSimple]:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT
params = {"path": path, "laboratory_id": laboratory_id}
token_check(self.session)
response = self._get(url, params=params)
self._raise_response_error(response)
ret: list[FolderSimple] = []
for data in response.json():
ret.append(parse_obj_as(FolderSimple, data))
return ret
def retrieve(self, id: str) -> Folder:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + id + "/"
token_check(self.session)
response = self._get(url)
self._raise_response_error(response)
ret = parse_obj_as(Folder, response.json())
return ret
def create(self, name: str, parent_id: str) -> str:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT
data = {"name": name, "parent_id": parent_id, "description": "", "template_id": -1}
token_check(self.session)
response = self._post(url, data=data)
self._raise_response_error(response)
ret = parse_obj_as(FolderCreateResponse, response.json())
return ret.id
def update(self, folder: FolderSimple) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT
data = {
"name": folder.name,
"description": folder.description,
}
token_check(self.session)
response = self._put(url, data=data)
self._raise_response_error(response)
return True
def destroy(self, id: str) -> bool:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + id + "/"
token_check(self.session)
response = self._delete(url)
self._raise_response_error(response)
return True
def metadata(self, id: str) -> dict:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + id + "/metadata/"
token_check(self.session)
response = self._get(url)
self._raise_response_error(response)
return response.json()

View File

@ -0,0 +1,23 @@
import sys
from typing import Final
from pydantic import parse_obj_as
from mdrsclient.api.base import BaseApi
from mdrsclient.api.utils import token_check
from mdrsclient.models import Laboratories, Laboratory
class LaboratoryApi(BaseApi):
ENTRYPOINT: Final[str] = "v2/laboratory/"
def list(self) -> Laboratories:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT
token_check(self.session)
response = self._get(url)
self._raise_response_error(response)
ret = Laboratories([])
for data in response.json():
ret.append(parse_obj_as(Laboratory, data))
return ret

44
mdrsclient/api/user.py Normal file
View File

@ -0,0 +1,44 @@
import sys
from typing import Final
import requests
from pydantic import parse_obj_as
from pydantic.dataclasses import dataclass
from mdrsclient.api.base import BaseApi
from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.models import Token, User
@dataclass(frozen=True)
class UserAuthResponse(Token):
laboratory: str
lab_id: int
class UserApi(BaseApi):
ENTRYPOINT: Final[str] = "v2/"
def auth(self, username: str, password: str) -> tuple[User, Token]:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + "auth/"
data = {"username": username, "password": password}
response = self._post(url, data=data)
if response.status_code == requests.codes.unauthorized:
raise UnauthorizedException("Invalid username or password.")
self._raise_response_error(response)
obj = parse_obj_as(UserAuthResponse, response.json())
token = Token(access=obj.access, refresh=obj.refresh)
user = User(id=token.user_id, username=username, laboratory_id=obj.lab_id, laboratory=obj.laboratory)
return (user, token)
def refresh(self, token: Token) -> Token:
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
url = self.ENTRYPOINT + "refresh/"
data = {"refresh": token.refresh}
response = self._post(url, data=data)
if response.status_code == requests.codes.unauthorized:
raise UnauthorizedException("Token is invalid or expired.")
self._raise_response_error(response)
token = parse_obj_as(Token, response.json())
return token

15
mdrsclient/api/utils.py Normal file
View File

@ -0,0 +1,15 @@
from mdrsclient.api.user import UserApi
from mdrsclient.exceptions import UnauthorizedException
from mdrsclient.session import MDRSSession
def token_check(session: MDRSSession) -> None:
if session.token is not None:
if session.token.is_refresh_required:
user_api = UserApi(session)
try:
session.token = user_api.refresh(session.token)
except UnauthorizedException:
session.logout()
elif session.token.is_expired:
session.logout()