"""File-backed cache of the user, token and laboratories for one remote.

Each remote gets its own JSON file under ``CONFIG_DIR_PATH/cache``. The file
stores a SHA-256 digest of its payload so corrupted content can be detected
and discarded on load.
"""

import dataclasses
import fcntl
import hashlib
import json
import os

from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from pydantic.tools import parse_obj_as

from mdrsclient.exceptions import UnexpectedException
from mdrsclient.models import Laboratories, Token, User
from mdrsclient.settings import CONFIG_DIR_PATH


@dataclass
class CacheData:
    """Payload persisted in a cache file, together with its integrity digest."""

    user: User | None
    token: Token | None
    laboratories: Laboratories
    # SHA-256 hex digest of the three fields above; None until first saved.
    digest: str | None

    def calc_digest(self) -> str:
        """Return the SHA-256 hex digest of user, token and laboratories.

        The ``digest`` field itself is not part of the hashed payload.
        """
        payload = [
            None if self.user is None else dataclasses.asdict(self.user),
            None if self.token is None else dataclasses.asdict(self.token),
            dataclasses.asdict(self.laboratories),
        ]
        return hashlib.sha256(json.dumps(payload).encode("utf-8")).hexdigest()


class CacheFile:
    """Accessor for the JSON cache file belonging to a single remote.

    Setters reload the file, update one field and write the file back.
    ``serial`` is a fingerprint of the file's stat data as of the last
    successful load or save; it lets ``__load`` skip re-reading an unchanged
    file. A value of ``-1`` means "nothing loaded yet".
    """

    serial: int
    cache_dir: str
    cache_file: str
    data: CacheData

    def __init__(self, remote: str) -> None:
        """Set up paths for ``remote``; no file I/O happens here."""
        self.serial = -1
        self.cache_dir = os.path.join(CONFIG_DIR_PATH, "cache")
        self.cache_file = os.path.join(self.cache_dir, remote + ".json")
        self.data = CacheData(user=None, token=None, laboratories=Laboratories([]), digest=None)

    def dump(self) -> CacheData | None:
        """Reload from disk if needed and return the full cache payload."""
        self.__load()
        return self.data

    @property
    def token(self) -> Token | None:
        """Cached token, refreshed from disk before being returned."""
        self.__load()
        return self.data.token

    @token.setter
    def token(self, token: Token) -> None:
        self.__load()
        self.data.token = token
        self.__save()

    @token.deleter
    def token(self) -> None:
        # Deleting the token wipes the whole cache (user and laboratories too).
        # NOTE(review): this checks the in-memory state without reloading.
        if self.data.token is not None:
            self.__clear()

    @property
    def user(self) -> User | None:
        """Cached user as currently held in memory (no reload from disk)."""
        return self.data.user

    @user.setter
    def user(self, user: User) -> None:
        self.__load()
        self.data.user = user
        self.__save()

    @user.deleter
    def user(self) -> None:
        # Deleting the user wipes the whole cache (token and laboratories too).
        if self.data.user is not None:
            self.__clear()

    @property
    def laboratories(self) -> Laboratories:
        """Cached laboratories as currently held in memory (no reload from disk)."""
        return self.data.laboratories

    @laboratories.setter
    def laboratories(self, laboratories: Laboratories) -> None:
        self.__load()
        self.data.laboratories = laboratories
        self.__save()

    @staticmethod
    def __fingerprint(stat: os.stat_result) -> int:
        """Hash the stat fields used to detect that the cache file changed."""
        return hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))

    def __clear(self) -> None:
        """Reset all cached fields and persist the empty state."""
        self.data.user = None
        self.data.token = None
        self.data.laboratories.clear()
        self.__save()

    def __load(self) -> None:
        """Refresh ``self.data`` from the cache file when the file has changed.

        A missing file, unparsable content, a schema mismatch or a digest
        mismatch all reset the cache to the empty state.
        """
        if not os.path.isfile(self.cache_file):
            # __clear() also writes an empty cache file; -1 forces the next
            # call to read that file back.
            self.__clear()
            self.serial = -1
            return

        if self.serial == self.__fingerprint(os.stat(self.cache_file)):
            return  # unchanged since the last load/save

        try:
            with open(self.cache_file) as f:
                # Shared lock pairs with the exclusive lock taken in __save so
                # a half-written file is never parsed.
                fcntl.flock(f, fcntl.LOCK_SH)
                # Fingerprint the exact content being read, under the lock.
                serial = self.__fingerprint(os.fstat(f.fileno()))
                data = parse_obj_as(CacheData, json.load(f))
            if data.digest != data.calc_digest():
                raise UnexpectedException("Cache data has been broken.")
        except (ValidationError, UnexpectedException, ValueError):
            # ValueError covers json.JSONDecodeError / UnicodeDecodeError from
            # a corrupted file. __clear() persists the reset state itself.
            self.__clear()
        else:
            self.data = data
            self.serial = serial

    def __save(self) -> None:
        """Write ``self.data`` (with a fresh digest) to the cache file.

        The file is created owner-only (0o600) and is only truncated once the
        exclusive lock is held. ``self.serial`` is updated to match the file
        as written.
        """
        self.__ensure_cache_dir()
        # Open without truncating: truncation must wait for the lock.
        fd = os.open(self.cache_file, os.O_WRONLY | os.O_CREAT, 0o600)
        with os.fdopen(fd, "w") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.truncate(0)
            self.data.digest = self.data.calc_digest()
            f.write(json.dumps(dataclasses.asdict(self.data)))
            f.flush()
            # ensure file is secure (also fixes up a pre-existing file).
            os.fchmod(f.fileno(), 0o600)
            # Stat after flush and chmod so the fingerprint matches what a
            # later os.stat() of the finished file reports.
            self.serial = self.__fingerprint(os.fstat(f.fileno()))

    def __ensure_cache_dir(self) -> None:
        """Create the cache directory if needed and restrict it to the owner."""
        os.makedirs(self.cache_dir, exist_ok=True)
        # ensure directory is secure.
        os.chmod(self.cache_dir, 0o700)