45 lines
1.6 KiB
Python
45 lines
1.6 KiB
Python
|
import sys
|
||
|
from typing import Final
|
||
|
|
||
|
import requests
|
||
|
from pydantic import parse_obj_as
|
||
|
from pydantic.dataclasses import dataclass
|
||
|
|
||
|
from mdrsclient.api.base import BaseApi
|
||
|
from mdrsclient.exceptions import UnauthorizedException
|
||
|
from mdrsclient.models import Token, User
|
||
|
|
||
|
|
||
|
@dataclass(frozen=True)
|
||
|
class UserAuthResponse(Token):
|
||
|
laboratory: str
|
||
|
lab_id: int
|
||
|
|
||
|
|
||
|
class UserApi(BaseApi):
|
||
|
ENTRYPOINT: Final[str] = "v2/"
|
||
|
|
||
|
def auth(self, username: str, password: str) -> tuple[User, Token]:
|
||
|
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||
|
url = self.ENTRYPOINT + "auth/"
|
||
|
data = {"username": username, "password": password}
|
||
|
response = self._post(url, data=data)
|
||
|
if response.status_code == requests.codes.unauthorized:
|
||
|
raise UnauthorizedException("Invalid username or password.")
|
||
|
self._raise_response_error(response)
|
||
|
obj = parse_obj_as(UserAuthResponse, response.json())
|
||
|
token = Token(access=obj.access, refresh=obj.refresh)
|
||
|
user = User(id=token.user_id, username=username, laboratory_id=obj.lab_id, laboratory=obj.laboratory)
|
||
|
return (user, token)
|
||
|
|
||
|
def refresh(self, token: Token) -> Token:
|
||
|
print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||
|
url = self.ENTRYPOINT + "refresh/"
|
||
|
data = {"refresh": token.refresh}
|
||
|
response = self._post(url, data=data)
|
||
|
if response.status_code == requests.codes.unauthorized:
|
||
|
raise UnauthorizedException("Token is invalid or expired.")
|
||
|
self._raise_response_error(response)
|
||
|
token = parse_obj_as(Token, response.json())
|
||
|
return token
|