mdrs-client-python/mdrsclient/cache.py

142 lines
4.2 KiB
Python
Raw Normal View History

2023-05-01 20:00:32 +09:00
import dataclasses
import hashlib
2023-05-01 20:00:32 +09:00
import json
import os
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from pydantic.tools import parse_obj_as
from mdrsclient.exceptions import UnexpectedException
2023-05-01 20:00:32 +09:00
from mdrsclient.models import Laboratories, Token, User
2023-05-10 14:46:08 +09:00
from mdrsclient.settings import CONFIG_DIRNAME
from mdrsclient.utils import FileLock
2023-05-01 20:00:32 +09:00
@dataclass
class CacheData:
    """Cached user, token and laboratories together with an integrity digest.

    ``digest`` is the SHA-256 hex digest of the JSON serialisation of the
    other three fields. It is refreshed by :meth:`update_digest` and checked
    by :meth:`verify_digest`.
    """

    user: User | None = None
    token: Token | None = None
    # Use a factory so every instance owns its own Laboratories object.
    # A shared default instance would be emptied for *all* CacheData objects
    # by clear(), which mutates ``laboratories`` in place.
    laboratories: Laboratories = dataclasses.field(default_factory=Laboratories)
    digest: str = ""

    def clear(self) -> None:
        """Reset every field to its empty state (laboratories is cleared in place)."""
        self.user = None
        self.token = None
        self.laboratories.clear()
        self.digest = ""

    def update_digest(self) -> None:
        """Recompute ``digest`` from the current user, token and laboratories."""
        self.digest = self.__calc_digest()

    def verify_digest(self) -> bool:
        """Return True when the stored ``digest`` matches the current contents."""
        return self.digest == self.__calc_digest()

    def __calc_digest(self) -> str:
        """Return the SHA-256 hex digest of the JSON-encoded payload fields."""
        payload = [
            None if self.user is None else dataclasses.asdict(self.user),
            None if self.token is None else dataclasses.asdict(self.token),
            dataclasses.asdict(self.laboratories),
        ]
        return hashlib.sha256(json.dumps(payload).encode("utf-8")).hexdigest()
2023-05-01 20:00:32 +09:00
class CacheFile:
    """File-backed cache of the user, token and laboratories of one remote.

    The data is stored as JSON in ``<CONFIG_DIRNAME>/cache/<remote>.json``.
    A "serial" derived from the file's stat information is remembered after
    each load/save so the file is only re-read when it changed on disk.
    """

    __serial: int
    __cache_dir: str
    __cache_file: str
    __data: CacheData

    def __init__(self, remote: str) -> None:
        """Set up the cache paths for ``remote``; nothing is read from disk yet."""
        self.__serial = -1
        self.__cache_dir = os.path.join(CONFIG_DIRNAME, "cache")
        self.__cache_file = os.path.join(self.__cache_dir, remote + ".json")
        self.__data = CacheData()

    @property
    def token(self) -> Token | None:
        """The cached token, reloaded from disk first if the file changed."""
        self.__load()
        return self.__data.token

    @token.setter
    def token(self, token: Token) -> None:
        self.__load()
        self.__data.token = token
        self.__save()

    @token.deleter
    def token(self) -> None:
        # Deleting the token wipes the whole cache (user and laboratories too).
        # NOTE(review): checks the in-memory state without reloading from disk
        # -- confirm that this is intentional.
        if self.__data.token is not None:
            self.__clear()

    @property
    def user(self) -> User | None:
        """The cached user as currently held in memory."""
        # NOTE(review): unlike ``token``, this getter does not reload from
        # disk -- confirm that this is intentional.
        return self.__data.user

    @user.setter
    def user(self, user: User) -> None:
        self.__load()
        self.__data.user = user
        self.__save()

    @user.deleter
    def user(self) -> None:
        # Deleting the user wipes the whole cache (token and laboratories too).
        if self.__data.user is not None:
            self.__clear()

    @property
    def laboratories(self) -> Laboratories:
        """The cached laboratories as currently held in memory."""
        # NOTE(review): unlike ``token``, this getter does not reload from
        # disk -- confirm that this is intentional.
        return self.__data.laboratories

    @laboratories.setter
    def laboratories(self, laboratories: Laboratories) -> None:
        self.__load()
        self.__data.laboratories = laboratories
        self.__save()

    def __clear(self) -> None:
        """Empty the in-memory data and persist the empty state."""
        self.__data.clear()
        self.__save()

    def __file_serial(self) -> int:
        """Return a fingerprint of the cache file built from its stat fields."""
        stat = os.stat(self.__cache_file)
        return hash((stat.st_uid, stat.st_gid, stat.st_mode, stat.st_size, stat.st_mtime))

    def __load(self) -> None:
        """Refresh the in-memory data from the cache file when it changed.

        A missing file resets the cache (which also writes an empty file).
        A file that is not valid JSON, fails model validation or fails the
        digest check is replaced by an empty cache and the error is printed.
        """
        if not os.path.isfile(self.__cache_file):
            self.__clear()
            self.__serial = -1
            return
        serial = self.__file_serial()
        if self.__serial == serial:
            # File unchanged since the last load/save; keep the in-memory data.
            return
        try:
            with open(self.__cache_file) as f:
                data = parse_obj_as(CacheData, json.load(f))
            if not data.verify_digest():
                raise UnexpectedException("Cache data has been broken.")
        except (json.JSONDecodeError, ValidationError, UnexpectedException) as e:
            # A truncated or corrupt file (JSONDecodeError) is handled like any
            # other broken cache. __clear() already persists the empty state,
            # so no additional save is needed here.
            self.__clear()
            print(e)
        else:
            self.__data = data
            self.__serial = serial

    @staticmethod
    def __restricted_opener(path: str, flags: int) -> int:
        """Opener for ``open()`` that creates new files with mode 0o600."""
        return os.open(path, flags, 0o600)

    def __save(self) -> None:
        """Write the current data (with a fresh digest) to the cache file."""
        self.__ensure_cache_dir()
        # Request owner-only permissions at creation time so a newly created
        # file is never readable by others, even briefly.
        with open(self.__cache_file, "w", opener=self.__restricted_opener) as f:
            FileLock.lock(f)
            try:
                self.__data.update_digest()
                f.write(json.dumps(dataclasses.asdict(self.__data)))
                # Push the data to the file while the lock is still held.
                f.flush()
            finally:
                FileLock.unlock(f)
        # ensure file is secure (also covers files that already existed).
        os.chmod(self.__cache_file, 0o600)
        # Take the serial after chmod so the recorded st_mode is the final one
        # and the next __load() does not see a spurious change.
        self.__serial = self.__file_serial()

    def __ensure_cache_dir(self) -> None:
        """Create the cache directory if needed and restrict it to the owner."""
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(self.__cache_dir, exist_ok=True)
        # ensure directory is secure.
        os.chmod(self.__cache_dir, 0o700)