optimize class dependencies.
This commit is contained in:
parent
0281dd2ccb
commit
dd0b0ba68f
2
.env.default
Normal file
2
.env.default
Normal file
@ -0,0 +1,2 @@
|
||||
# CONFIG_DIRNAME="~/.mdrs-client"
|
||||
# CONCURRENT=10
|
@ -1,2 +0,0 @@
|
||||
# CONFIG_DIR_PATH="~/.mdrs-client"
|
||||
# NUMBER_OF_PROCESS=10
|
@ -11,7 +11,7 @@ from mdrsclient.exceptions import (
|
||||
UnauthorizedException,
|
||||
UnexpectedException,
|
||||
)
|
||||
from mdrsclient.models import DRFStandardizedErrors
|
||||
from mdrsclient.models.error import DRFStandardizedErrors
|
||||
|
||||
|
||||
class BaseApi(ABC):
|
||||
@ -20,21 +20,6 @@ class BaseApi(ABC):
|
||||
def __init__(self, connection: MDRSConnection) -> None:
|
||||
self.connection = connection
|
||||
|
||||
def _get(self, url, *args, **kwargs) -> Response:
|
||||
return self.connection.session.get(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def _post(self, url, *args, **kwargs) -> Response:
|
||||
return self.connection.session.post(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def _put(self, url, *args, **kwargs) -> Response:
|
||||
return self.connection.session.put(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def _delete(self, url, *args, **kwargs) -> Response:
|
||||
return self.connection.session.delete(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def _patch(self, url, *args, **kwargs) -> Response:
|
||||
return self.connection.session.patch(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def _raise_response_error(self, response: Response) -> None:
|
||||
if response.status_code >= 300:
|
||||
if response.status_code < 400 or response.status_code >= 500:
|
||||
@ -48,6 +33,3 @@ class BaseApi(ABC):
|
||||
raise ForbiddenException("You do not have enough permissions. Access is denied.")
|
||||
else:
|
||||
raise UnexpectedException(errors.errors[0].detail)
|
||||
|
||||
def __build_url(self, *args: tuple) -> str:
|
||||
return self.connection.build_url(*args)
|
||||
|
@ -21,7 +21,7 @@ class FileApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + id + "/"
|
||||
token_check(self.connection)
|
||||
response = self._get(url)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
return parse_obj_as(File, response.json())
|
||||
|
||||
@ -32,7 +32,7 @@ class FileApi(BaseApi):
|
||||
data = {"folder_id": folder_id}
|
||||
try:
|
||||
with open(path, mode="rb") as fp:
|
||||
response = self._post(url, data=data, files={"file": fp})
|
||||
response = self.connection.post(url, data=data, files={"file": fp})
|
||||
self._raise_response_error(response)
|
||||
ret = parse_obj_as(FileCreateResponse, response.json())
|
||||
except OSError:
|
||||
@ -47,13 +47,13 @@ class FileApi(BaseApi):
|
||||
# update file body
|
||||
try:
|
||||
with open(path, mode="rb") as fp:
|
||||
response = self._put(url, files={"file": fp})
|
||||
response = self.connection.put(url, files={"file": fp})
|
||||
except OSError:
|
||||
raise UnexpectedException(f"Could not open `{path}` file.")
|
||||
else:
|
||||
# update metadata
|
||||
data = {"name": file.name, "description": file.description}
|
||||
response = self._put(url, data=data)
|
||||
response = self.connection.put(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -61,7 +61,7 @@ class FileApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/"
|
||||
token_check(self.connection)
|
||||
response = self._delete(url)
|
||||
response = self.connection.delete(url)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -70,7 +70,7 @@ class FileApi(BaseApi):
|
||||
url = self.ENTRYPOINT + file.id + "/move/"
|
||||
data = {"folder": folder_id}
|
||||
token_check(self.connection)
|
||||
response = self._post(url, data=data)
|
||||
response = self.connection.post(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -78,14 +78,14 @@ class FileApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + file.id + "/metadata/"
|
||||
token_check(self.connection)
|
||||
response = self._get(url)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
return response.json()
|
||||
|
||||
def download(self, file: File, path: str) -> bool:
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = "v2/" + file.download_url
|
||||
response = self._get(url, stream=True)
|
||||
response = self.connection.get(url, stream=True)
|
||||
self._raise_response_error(response)
|
||||
try:
|
||||
with open(path, "wb") as f:
|
||||
|
@ -21,7 +21,7 @@ class FolderApi(BaseApi):
|
||||
url = self.ENTRYPOINT
|
||||
params = {"path": path, "laboratory_id": laboratory_id}
|
||||
token_check(self.connection)
|
||||
response = self._get(url, params=params)
|
||||
response = self.connection.get(url, params=params)
|
||||
self._raise_response_error(response)
|
||||
ret: list[FolderSimple] = []
|
||||
for data in response.json():
|
||||
@ -32,7 +32,7 @@ class FolderApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + id + "/"
|
||||
token_check(self.connection)
|
||||
response = self._get(url)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
ret = parse_obj_as(Folder, response.json())
|
||||
return ret
|
||||
@ -42,7 +42,7 @@ class FolderApi(BaseApi):
|
||||
url = self.ENTRYPOINT
|
||||
data = {"name": name, "parent_id": parent_id, "description": "", "template_id": -1}
|
||||
token_check(self.connection)
|
||||
response = self._post(url, data=data)
|
||||
response = self.connection.post(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
ret = parse_obj_as(FolderCreateResponse, response.json())
|
||||
return ret.id
|
||||
@ -55,7 +55,7 @@ class FolderApi(BaseApi):
|
||||
"description": folder.description,
|
||||
}
|
||||
token_check(self.connection)
|
||||
response = self._put(url, data=data)
|
||||
response = self.connection.put(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -63,7 +63,7 @@ class FolderApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + id + "/"
|
||||
token_check(self.connection)
|
||||
response = self._delete(url)
|
||||
response = self.connection.delete(url)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -72,7 +72,7 @@ class FolderApi(BaseApi):
|
||||
url = self.ENTRYPOINT + folder.id + "/move/"
|
||||
data = {"parent": folder_id}
|
||||
token_check(self.connection)
|
||||
response = self._post(url, data=data)
|
||||
response = self.connection.post(url, data=data)
|
||||
self._raise_response_error(response)
|
||||
return True
|
||||
|
||||
@ -80,6 +80,6 @@ class FolderApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + id + "/metadata/"
|
||||
token_check(self.connection)
|
||||
response = self._get(url)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
return response.json()
|
||||
|
@ -14,9 +14,9 @@ class LaboratoryApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT
|
||||
token_check(self.connection)
|
||||
response = self._get(url)
|
||||
response = self.connection.get(url)
|
||||
self._raise_response_error(response)
|
||||
ret = Laboratories([])
|
||||
ret = Laboratories()
|
||||
for data in response.json():
|
||||
ret.append(parse_obj_as(Laboratory, data))
|
||||
return ret
|
||||
|
@ -22,7 +22,7 @@ class UserApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + "auth/"
|
||||
data = {"username": username, "password": password}
|
||||
response = self._post(url, data=data)
|
||||
response = self.connection.post(url, data=data)
|
||||
if response.status_code == requests.codes.unauthorized:
|
||||
raise UnauthorizedException("Invalid username or password.")
|
||||
self._raise_response_error(response)
|
||||
@ -35,7 +35,7 @@ class UserApi(BaseApi):
|
||||
# print(self.__class__.__name__ + "::" + sys._getframe().f_code.co_name)
|
||||
url = self.ENTRYPOINT + "refresh/"
|
||||
data = {"refresh": token.refresh}
|
||||
response = self._post(url, data=data)
|
||||
response = self.connection.post(url, data=data)
|
||||
if response.status_code == requests.codes.unauthorized:
|
||||
raise UnauthorizedException("Token is invalid or expired.")
|
||||
self._raise_response_error(response)
|
||||
|
@ -10,17 +10,29 @@ from pydantic.tools import parse_obj_as
|
||||
|
||||
from mdrsclient.exceptions import UnexpectedException
|
||||
from mdrsclient.models import Laboratories, Token, User
|
||||
from mdrsclient.settings import CONFIG_DIR_PATH
|
||||
from mdrsclient.settings import CONFIG_DIRNAME
|
||||
|
||||
|
||||
@dataclass
|
||||
class CacheData:
|
||||
user: User | None
|
||||
token: Token | None
|
||||
laboratories: Laboratories
|
||||
digest: str | None
|
||||
user: User | None = None
|
||||
token: Token | None = None
|
||||
laboratories: Laboratories = Laboratories()
|
||||
digest: str = ""
|
||||
|
||||
def calc_digest(self) -> str:
|
||||
def clear(self) -> None:
|
||||
self.user = None
|
||||
self.token = None
|
||||
self.laboratories.clear()
|
||||
self.digest = ""
|
||||
|
||||
def update_digest(self) -> None:
|
||||
self.digest = self.__calc_digest()
|
||||
|
||||
def verify_digest(self) -> bool:
|
||||
return self.digest == self.__calc_digest()
|
||||
|
||||
def __calc_digest(self) -> str:
|
||||
return hashlib.sha256(
|
||||
json.dumps(
|
||||
[
|
||||
@ -33,101 +45,96 @@ class CacheData:
|
||||
|
||||
|
||||
class CacheFile:
|
||||
serial: int
|
||||
cache_dir: str
|
||||
cache_file: str
|
||||
data: CacheData
|
||||
__serial: int
|
||||
__cache_dir: str
|
||||
__cache_file: str
|
||||
__data: CacheData
|
||||
|
||||
def __init__(self, remote: str) -> None:
|
||||
self.serial = -1
|
||||
self.cache_dir = os.path.join(CONFIG_DIR_PATH, "cache")
|
||||
self.cache_file = os.path.join(self.cache_dir, remote + ".json")
|
||||
self.data = CacheData(user=None, token=None, laboratories=Laboratories([]), digest=None)
|
||||
|
||||
def dump(self) -> CacheData | None:
|
||||
self.__load()
|
||||
return self.data
|
||||
self.__serial = -1
|
||||
self.__cache_dir = os.path.join(CONFIG_DIRNAME, "cache")
|
||||
self.__cache_file = os.path.join(self.__cache_dir, remote + ".json")
|
||||
self.__data = CacheData()
|
||||
|
||||
@property
|
||||
def token(self) -> Token | None:
|
||||
self.__load()
|
||||
return self.data.token
|
||||
return self.__data.token
|
||||
|
||||
@token.setter
|
||||
def token(self, token: Token) -> None:
|
||||
self.__load()
|
||||
self.data.token = token
|
||||
self.__data.token = token
|
||||
self.__save()
|
||||
|
||||
@token.deleter
|
||||
def token(self) -> None:
|
||||
if self.data.token is not None:
|
||||
if self.__data.token is not None:
|
||||
self.__clear()
|
||||
|
||||
@property
|
||||
def user(self) -> User | None:
|
||||
return self.data.user
|
||||
return self.__data.user
|
||||
|
||||
@user.setter
|
||||
def user(self, user: User) -> None:
|
||||
self.__load()
|
||||
self.data.user = user
|
||||
self.__data.user = user
|
||||
self.__save()
|
||||
|
||||
@user.deleter
|
||||
def user(self) -> None:
|
||||
if self.data.user is not None:
|
||||
if self.__data.user is not None:
|
||||
self.__clear()
|
||||
|
||||
@property
|
||||
def laboratories(self) -> Laboratories:
|
||||
return self.data.laboratories
|
||||
return self.__data.laboratories
|
||||
|
||||
@laboratories.setter
|
||||
def laboratories(self, laboratories: Laboratories) -> None:
|
||||
self.__load()
|
||||
self.data.laboratories = laboratories
|
||||
self.__data.laboratories = laboratories
|
||||
self.__save()
|
||||
|
||||
def __clear(self) -> None:
|
||||
self.data.user = None
|
||||
self.data.token = None
|
||||
self.data.laboratories.clear()
|
||||
self.__data.clear()
|
||||
self.__save()
|
||||
|
||||
def __load(self) -> None:
|
||||
if os.path.isfile(self.cache_file):
|
||||
stat = os.stat(self.cache_file)
|
||||
if os.path.isfile(self.__cache_file):
|
||||
stat = os.stat(self.__cache_file)
|
||||
serial = hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))
|
||||
if self.serial != serial:
|
||||
if self.__serial != serial:
|
||||
try:
|
||||
with open(self.cache_file) as f:
|
||||
with open(self.__cache_file) as f:
|
||||
data = parse_obj_as(CacheData, json.load(f))
|
||||
if data.digest != data.calc_digest():
|
||||
if not data.verify_digest():
|
||||
raise UnexpectedException("Cache data has been broken.")
|
||||
self.data = data
|
||||
except (ValidationError, UnexpectedException):
|
||||
self.__data = data
|
||||
except (ValidationError, UnexpectedException) as e:
|
||||
self.__clear()
|
||||
self.__save()
|
||||
print(e)
|
||||
else:
|
||||
self.serial = serial
|
||||
self.__serial = serial
|
||||
else:
|
||||
self.__clear()
|
||||
self.serial = -1
|
||||
self.__serial = -1
|
||||
|
||||
def __save(self) -> None:
|
||||
self.__ensure_cache_dir()
|
||||
with open(self.cache_file, "w") as f:
|
||||
with open(self.__cache_file, "w") as f:
|
||||
fcntl.flock(f, fcntl.LOCK_EX)
|
||||
self.data.digest = self.data.calc_digest()
|
||||
f.write(json.dumps(dataclasses.asdict(self.data)))
|
||||
stat = os.stat(self.cache_file)
|
||||
self.serial = hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))
|
||||
self.__data.update_digest()
|
||||
f.write(json.dumps(dataclasses.asdict(self.__data)))
|
||||
stat = os.stat(self.__cache_file)
|
||||
self.__serial = hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))
|
||||
# ensure file is secure.
|
||||
os.chmod(self.cache_file, 0o600)
|
||||
os.chmod(self.__cache_file, 0o600)
|
||||
|
||||
def __ensure_cache_dir(self) -> None:
|
||||
if not os.path.exists(self.cache_dir):
|
||||
os.makedirs(self.cache_dir)
|
||||
if not os.path.exists(self.__cache_dir):
|
||||
os.makedirs(self.__cache_dir)
|
||||
# ensure directory is secure.
|
||||
os.chmod(self.cache_dir, 0o700)
|
||||
os.chmod(self.__cache_dir, 0o700)
|
||||
|
@ -5,30 +5,30 @@ from typing import Final
|
||||
import validators
|
||||
|
||||
from mdrsclient.exceptions import IllegalArgumentException
|
||||
from mdrsclient.settings import CONFIG_FILE_PATH
|
||||
from mdrsclient.settings import CONFIG_DIRNAME
|
||||
|
||||
|
||||
class ConfigFile:
|
||||
OPTION_URL: Final[str] = "url"
|
||||
|
||||
serial: int
|
||||
config_file: str
|
||||
config_dir: str
|
||||
CONFIG_FILENAME: Final[str] = "config.ini"
|
||||
remote: str
|
||||
config: configparser.ConfigParser
|
||||
__serial: int
|
||||
__config_dirname: str
|
||||
__config_path: str
|
||||
__config: configparser.ConfigParser
|
||||
|
||||
def __init__(self, remote: str) -> None:
|
||||
self.serial = -1
|
||||
self.config_file = CONFIG_FILE_PATH
|
||||
self.config_dir = os.path.dirname(CONFIG_FILE_PATH)
|
||||
self.remote = remote
|
||||
self.config = configparser.ConfigParser()
|
||||
self.__serial = -1
|
||||
self.__config_dirname = CONFIG_DIRNAME
|
||||
self.__config_path = os.path.join(CONFIG_DIRNAME, self.CONFIG_FILENAME)
|
||||
self.__config = configparser.ConfigParser()
|
||||
|
||||
def list(self) -> list[tuple[str, str]]:
|
||||
ret: list[tuple[str, str]] = []
|
||||
self.__load()
|
||||
for remote in self.config.sections():
|
||||
url = self.config.get(remote, self.OPTION_URL)
|
||||
for remote in self.__config.sections():
|
||||
url = self.__config.get(remote, self.OPTION_URL)
|
||||
ret.append((remote, url))
|
||||
return ret
|
||||
|
||||
@ -36,45 +36,45 @@ class ConfigFile:
|
||||
def url(self) -> str | None:
|
||||
if not self.__exists(self.remote):
|
||||
return None
|
||||
return self.config.get(self.remote, self.OPTION_URL)
|
||||
return self.__config.get(self.remote, self.OPTION_URL)
|
||||
|
||||
@url.setter
|
||||
def url(self, url: str) -> None:
|
||||
if not validators.url(url): # type: ignore
|
||||
raise IllegalArgumentException("malformed URI sequence")
|
||||
self.__load()
|
||||
if self.config.has_section(self.remote):
|
||||
self.config.remove_section(self.remote)
|
||||
self.config.add_section(self.remote)
|
||||
self.config.set(self.remote, self.OPTION_URL, url)
|
||||
if self.__config.has_section(self.remote):
|
||||
self.__config.remove_section(self.remote)
|
||||
self.__config.add_section(self.remote)
|
||||
self.__config.set(self.remote, self.OPTION_URL, url)
|
||||
self.__save()
|
||||
|
||||
@url.deleter
|
||||
def url(self) -> None:
|
||||
if self.__exists(self.remote):
|
||||
self.config.remove_section(self.remote)
|
||||
self.__config.remove_section(self.remote)
|
||||
self.__save()
|
||||
|
||||
def __exists(self, section: str) -> bool:
|
||||
self.__load()
|
||||
return self.config.has_option(section, self.OPTION_URL)
|
||||
return self.__config.has_option(section, self.OPTION_URL)
|
||||
|
||||
def __load(self) -> None:
|
||||
if os.path.isfile(self.config_file):
|
||||
stat = os.stat(self.config_file)
|
||||
if os.path.isfile(self.__config_path):
|
||||
stat = os.stat(self.__config_path)
|
||||
serial = hash(stat)
|
||||
if self.serial != serial:
|
||||
self.config.read(self.config_file, encoding="utf8")
|
||||
self.serial = serial
|
||||
if self.__serial != serial:
|
||||
self.__config.read(self.__config_path, encoding="utf8")
|
||||
self.__serial = serial
|
||||
|
||||
def __save(self) -> None:
|
||||
self.__ensure_cache_dir()
|
||||
with open(self.config_file, "w") as f:
|
||||
self.config.write(f)
|
||||
os.chmod(self.config_file, 0o600)
|
||||
with open(self.__config_path, "w") as f:
|
||||
self.__config.write(f)
|
||||
os.chmod(self.__config_path, 0o600)
|
||||
|
||||
def __ensure_cache_dir(self) -> None:
|
||||
if not os.path.exists(self.config_dir):
|
||||
os.makedirs(self.config_dir)
|
||||
if not os.path.exists(self.__config_dirname):
|
||||
os.makedirs(self.__config_dirname)
|
||||
# ensure directory is secure.
|
||||
os.chmod(self.config_dir, 0o700)
|
||||
os.chmod(self.__config_dirname, 0o700)
|
||||
|
@ -1,6 +1,6 @@
|
||||
import threading
|
||||
|
||||
import requests
|
||||
from requests import Response, Session
|
||||
|
||||
from mdrsclient.cache import CacheFile
|
||||
from mdrsclient.exceptions import MissingConfigurationException
|
||||
@ -9,24 +9,32 @@ from mdrsclient.models import Laboratories, Token, User
|
||||
|
||||
class MDRSConnection:
|
||||
url: str
|
||||
session: requests.Session
|
||||
session: Session
|
||||
lock: threading.Lock
|
||||
__cache: CacheFile
|
||||
|
||||
def __init__(self, remote: str, url: str) -> None:
|
||||
super().__init__()
|
||||
self.url = url
|
||||
self.session = requests.Session()
|
||||
self.session = Session()
|
||||
self.lock = threading.Lock()
|
||||
self.__cache = CacheFile(remote)
|
||||
self.__prepare_headers()
|
||||
|
||||
def build_url(self, *args) -> str:
|
||||
if self.url == "":
|
||||
raise MissingConfigurationException("remote host is not configured")
|
||||
parts = [self.url]
|
||||
parts.extend(args)
|
||||
return "/".join(parts)
|
||||
def get(self, url, *args, **kwargs) -> Response:
|
||||
return self.session.get(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def post(self, url, *args, **kwargs) -> Response:
|
||||
return self.session.post(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def put(self, url, *args, **kwargs) -> Response:
|
||||
return self.session.put(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def delete(self, url, *args, **kwargs) -> Response:
|
||||
return self.session.delete(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def patch(self, url, *args, **kwargs) -> Response:
|
||||
return self.session.patch(self.__build_url(url), *args, **kwargs)
|
||||
|
||||
def logout(self) -> None:
|
||||
del self.__cache.user
|
||||
@ -58,6 +66,13 @@ class MDRSConnection:
|
||||
def laboratories(self, laboratories: Laboratories) -> None:
|
||||
self.__cache.laboratories = laboratories
|
||||
|
||||
def __build_url(self, *args: str) -> str:
|
||||
if self.url == "":
|
||||
raise MissingConfigurationException("remote host is not configured")
|
||||
parts = [self.url]
|
||||
parts.extend(args)
|
||||
return "/".join(parts)
|
||||
|
||||
def __prepare_headers(self) -> None:
|
||||
self.session.headers.update({"accept": "application/json"})
|
||||
if self.token is not None:
|
||||
|
@ -1,3 +1,4 @@
|
||||
from dataclasses import field
|
||||
from typing import Generator
|
||||
|
||||
from pydantic.dataclasses import dataclass
|
||||
@ -13,7 +14,7 @@ class Laboratory:
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Laboratories:
|
||||
items: list[Laboratory]
|
||||
items: list[Laboratory] = field(default_factory=list)
|
||||
|
||||
def __iter__(self) -> Generator[Laboratory, None, None]:
|
||||
yield from self.items
|
||||
|
@ -4,8 +4,8 @@ from pydantic import BaseSettings
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
config_dir_path: str = "~/.mdrs-client"
|
||||
number_of_process: int = 10
|
||||
config_dirname: str = "~/.mdrs-client"
|
||||
concurrent: int = 10
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
@ -14,7 +14,5 @@ class Settings(BaseSettings):
|
||||
|
||||
settings = Settings()
|
||||
|
||||
NUMBER_OF_PROCESS = settings.number_of_process
|
||||
|
||||
CONFIG_DIR_PATH = os.path.expanduser(settings.config_dir_path)
|
||||
CONFIG_FILE_PATH = os.path.join(CONFIG_DIR_PATH, "config.ini")
|
||||
CONCURRENT = settings.concurrent
|
||||
CONFIG_DIRNAME = os.path.abspath(os.path.expanduser(settings.config_dirname))
|
||||
|
Loading…
x
Reference in New Issue
Block a user