mdrs-client-python/mdrsclient/cache.py

134 lines
3.9 KiB
Python
Raw Normal View History

2023-05-01 20:00:32 +09:00
import dataclasses
import fcntl
import hashlib
2023-05-01 20:00:32 +09:00
import json
import os
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from pydantic.tools import parse_obj_as
from mdrsclient.exceptions import UnexpectedException
2023-05-01 20:00:32 +09:00
from mdrsclient.models import Laboratories, Token, User
from mdrsclient.settings import CONFIG_DIR_PATH
@dataclass
class CacheData:
user: User | None
token: Token | None
laboratories: Laboratories
digest: str | None
def calc_digest(self) -> str:
return hashlib.sha256(
json.dumps(
[
None if self.user is None else dataclasses.asdict(self.user),
None if self.token is None else dataclasses.asdict(self.token),
dataclasses.asdict(self.laboratories),
]
).encode("utf-8")
).hexdigest()
2023-05-01 20:00:32 +09:00
class CacheFile:
serial: int
cache_dir: str
cache_file: str
data: CacheData
def __init__(self, remote: str) -> None:
self.serial = -1
self.cache_dir = os.path.join(CONFIG_DIR_PATH, "cache")
self.cache_file = os.path.join(self.cache_dir, remote + ".json")
self.data = CacheData(user=None, token=None, laboratories=Laboratories([]), digest=None)
2023-05-01 20:00:32 +09:00
def dump(self) -> CacheData | None:
self.__load()
return self.data
@property
def token(self) -> Token | None:
self.__load()
return self.data.token
@token.setter
def token(self, token: Token) -> None:
self.__load()
self.data.token = token
self.__save()
@token.deleter
def token(self) -> None:
if self.data.token is not None:
self.__clear()
@property
def user(self) -> User | None:
return self.data.user
@user.setter
def user(self, user: User) -> None:
self.__load()
self.data.user = user
self.__save()
@user.deleter
def user(self) -> None:
if self.data.user is not None:
self.__clear()
@property
def laboratories(self) -> Laboratories:
return self.data.laboratories
@laboratories.setter
def laboratories(self, laboratories: Laboratories) -> None:
self.__load()
self.data.laboratories = laboratories
self.__save()
def __clear(self) -> None:
self.data.user = None
self.data.token = None
self.data.laboratories.clear()
self.__save()
def __load(self) -> None:
if os.path.isfile(self.cache_file):
stat = os.stat(self.cache_file)
serial = hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))
if self.serial != serial:
try:
with open(self.cache_file) as f:
data = parse_obj_as(CacheData, json.load(f))
if data.digest != data.calc_digest():
raise UnexpectedException("Cache data has been broken.")
self.data = data
except (ValidationError, UnexpectedException):
2023-05-01 20:00:32 +09:00
self.__clear()
self.__save()
else:
self.serial = serial
else:
self.__clear()
2023-05-01 20:00:32 +09:00
self.serial = -1
def __save(self) -> None:
self.__ensure_cache_dir()
with open(self.cache_file, "w") as f:
fcntl.flock(f, fcntl.LOCK_EX)
self.data.digest = self.data.calc_digest()
2023-05-01 20:00:32 +09:00
f.write(json.dumps(dataclasses.asdict(self.data)))
stat = os.stat(self.cache_file)
self.serial = hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))
2023-05-01 20:00:32 +09:00
# ensure file is secure.
os.chmod(self.cache_file, 0o600)
def __ensure_cache_dir(self) -> None:
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
# ensure directory is secure.
os.chmod(self.cache_dir, 0o700)