# mdrs-client-python/mdrsclient/connection.py
#
# 80 lines
# 2.4 KiB
# Python

import threading
from requests import Response, Session
from mdrsclient.cache import CacheFile
from mdrsclient.exceptions import MissingConfigurationException
from mdrsclient.models import Laboratories, Token, User
class MDRSConnection:
    """HTTP connection to a single MDRS remote.

    Wraps a ``requests`` session whose request paths are resolved against
    ``url``, and delegates the ``user``, ``token`` and ``laboratories``
    values to a ``CacheFile`` constructed from the remote name.
    """

    # Base URL that every request path is appended to.
    url: str
    # Shared requests session; carries the default headers.
    session: Session
    # Created in __init__ but not used inside this class; presumably meant for
    # callers that share one connection between threads -- TODO confirm.
    lock: threading.Lock
    # Backing store for user / token / laboratories.
    __cache: CacheFile

    def __init__(self, remote: str, url: str) -> None:
        """Create a session for ``url`` and load the cache for ``remote``.

        Args:
            remote: Name passed to ``CacheFile`` to select the cache.
            url: Base URL of the remote; may be empty, in which case any
                request raises ``MissingConfigurationException``.
        """
        super().__init__()
        self.url = url
        self.session = Session()
        self.lock = threading.Lock()
        self.__cache = CacheFile(remote)
        self.__prepare_headers()

    def get(self, url: str, *args, **kwargs) -> Response:
        """Send a GET request to ``url`` resolved against the base URL."""
        return self.session.get(self.__build_url(url), *args, **kwargs)

    def post(self, url: str, *args, **kwargs) -> Response:
        """Send a POST request to ``url`` resolved against the base URL."""
        return self.session.post(self.__build_url(url), *args, **kwargs)

    def put(self, url: str, *args, **kwargs) -> Response:
        """Send a PUT request to ``url`` resolved against the base URL."""
        return self.session.put(self.__build_url(url), *args, **kwargs)

    def delete(self, url: str, *args, **kwargs) -> Response:
        """Send a DELETE request to ``url`` resolved against the base URL."""
        return self.session.delete(self.__build_url(url), *args, **kwargs)

    def patch(self, url: str, *args, **kwargs) -> Response:
        """Send a PATCH request to ``url`` resolved against the base URL."""
        return self.session.patch(self.__build_url(url), *args, **kwargs)

    def logout(self) -> None:
        """Drop the cached user and token and stop sending credentials."""
        del self.__cache.user
        del self.__cache.token
        # Remove the header outright: an empty "Authorization" value would
        # still be transmitted with every later request and is not a valid
        # credentials value.
        self.session.headers.pop("Authorization", None)

    @property
    def user(self) -> User | None:
        """User stored in the cache, or ``None`` when there is none."""
        return self.__cache.user

    @user.setter
    def user(self, user: User) -> None:
        self.__cache.user = user

    @property
    def token(self) -> Token | None:
        """Token stored in the cache, or ``None`` when there is none."""
        return self.__cache.token

    @token.setter
    def token(self, token: Token) -> None:
        self.__cache.token = token
        # Keep the session's Authorization header in step with the new token.
        self.__prepare_headers()

    @property
    def laboratories(self) -> Laboratories:
        """Laboratories stored in the cache."""
        return self.__cache.laboratories

    @laboratories.setter
    def laboratories(self, laboratories: Laboratories) -> None:
        self.__cache.laboratories = laboratories

    def __build_url(self, *args: str) -> str:
        """Join the base URL and the given path segments with ``/``.

        Slashes at the join points are normalised so that a base URL ending
        in ``/`` or a segment starting with ``/`` does not produce ``//`` in
        the result. Trailing slashes of the segments are preserved.

        Raises:
            MissingConfigurationException: If the base URL is empty.
        """
        if self.url == "":
            raise MissingConfigurationException("remote host is not configured")
        if not args:
            return self.url
        parts = [self.url.rstrip("/")]
        parts.extend(segment.lstrip("/") for segment in args)
        return "/".join(parts)

    def __prepare_headers(self) -> None:
        """Set the session's default headers from the current token."""
        self.session.headers.update({"accept": "application/json"})
        # Read the token once so the None check and the use see the same value.
        token = self.token
        if token is not None:
            self.session.headers.update({"Authorization": f"Bearer {token.access}"})