fix(token): use dedicated .lock file to guard entire refresh critical section
The previous implementation had two correctness issues:
1. flock on .tmp was ineffective for cross-process exclusion.
After fs::rename(), the .tmp inode disappears. A second process
opening .tmp gets a brand-new inode, so both processes hold flocks
on different inodes simultaneously — no mutual exclusion occurs.
2. The critical section was too narrow. The in-process tokio::Mutex
only serializes tasks within the same process. Two separate mdrs
processes could both read the cache, both decide a refresh was
needed, and both call the token-refresh endpoint before either had
written the new token back — risking double-refresh and potential
failures on servers that use refresh-token rotation.
Fix: introduce a dedicated `cache/{remote}.lock` file as the cross-
process advisory lock target. The lock file is never renamed, so its
inode remains stable for the entire critical section. The flock now
wraps the complete read-check-refresh-write cycle in
load_cache_with_token_refresh(), and the redundant flock on .tmp in
refresh_and_persist() is removed.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
+42
-21
@@ -232,32 +232,56 @@ pub fn load_cache(remote: &str) -> Result<Cache, Box<dyn std::error::Error>> {
|
||||
/// Locking strategy:
|
||||
/// - Per-remote `tokio::sync::Mutex` serializes concurrent async tasks within
|
||||
/// the same process.
|
||||
/// - `flock(LOCK_EX)` on the cache file serializes across separate processes
|
||||
/// on the same host.
|
||||
/// - `flock(LOCK_EX)` on a dedicated `cache/{remote}.lock` file serializes
|
||||
/// the entire read-check-refresh-write cycle across separate processes on
|
||||
/// the same host. The lock file is stable (never renamed), so the flock
|
||||
/// reliably protects the same inode for the lifetime of the critical section.
|
||||
pub async fn load_cache_with_token_refresh(
|
||||
remote: &str,
|
||||
) -> Result<Cache, Box<dyn std::error::Error>> {
|
||||
// Acquire the in-process async mutex for this remote
|
||||
// Acquire the in-process async mutex for this remote first.
|
||||
let lock = get_remote_lock(remote);
|
||||
let _guard = lock.lock().await;
|
||||
|
||||
// Re-read the cache inside the lock (another task may have already refreshed)
|
||||
let mut cache = load_cache(remote)?;
|
||||
// Acquire an exclusive cross-process file lock. This ensures that no
|
||||
// other process can race through the read-check-refresh-write cycle at
|
||||
// the same time as us. The lock file is separate from the cache file so
|
||||
// that it never disappears (unlike a .tmp file that gets renamed away).
|
||||
let lock_path = cache_file_path(remote).with_extension("lock");
|
||||
use fs2::FileExt;
|
||||
let lock_file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&lock_path)?;
|
||||
lock_file.lock_exclusive()?;
|
||||
|
||||
if crate::token::is_expired(&cache.token.refresh) {
|
||||
return Err(format!(
|
||||
"Session for `{}` has expired. Please run `mdrs login {}` again.",
|
||||
remote, remote
|
||||
)
|
||||
.into());
|
||||
// Re-read the cache inside the lock: another process may have already
|
||||
// refreshed the token since we last checked.
|
||||
let result: Result<Cache, Box<dyn std::error::Error>> = async {
|
||||
let mut cache = load_cache(remote)?;
|
||||
|
||||
if crate::token::is_expired(&cache.token.refresh) {
|
||||
return Err(format!(
|
||||
"Session for `{}` has expired. Please run `mdrs login {}` again.",
|
||||
remote, remote
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
|
||||
let new_access = refresh_and_persist(remote, &cache).await?;
|
||||
cache.token.access = new_access;
|
||||
}
|
||||
|
||||
Ok(cache)
|
||||
}
|
||||
.await;
|
||||
|
||||
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
|
||||
let new_access = refresh_and_persist(remote, &cache).await?;
|
||||
cache.token.access = new_access;
|
||||
}
|
||||
// Release the cross-process file lock. The OS will also release it when
|
||||
// lock_file is dropped, but we unlock explicitly for clarity.
|
||||
lock_file.unlock()?;
|
||||
|
||||
Ok(cache)
|
||||
result
|
||||
}
|
||||
|
||||
/// Call the token-refresh endpoint and write the new access token back to the
|
||||
@@ -293,21 +317,18 @@ async fn refresh_and_persist(
|
||||
obj["token"]["access"] = serde_json::Value::String(new_access.clone());
|
||||
obj["digest"] = serde_json::Value::String(new_digest);
|
||||
|
||||
// Write atomically: write to .tmp then rename, while holding an exclusive
|
||||
// flock on the .tmp file for cross-process safety.
|
||||
// Write atomically: write to .tmp then rename. The caller already holds
|
||||
// the exclusive .lock file, so no additional flock is needed here.
|
||||
let tmp_path = cache_path.with_extension("tmp");
|
||||
{
|
||||
use fs2::FileExt;
|
||||
use std::io::Write;
|
||||
let mut tmp_file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&tmp_path)?;
|
||||
tmp_file.lock_exclusive()?;
|
||||
tmp_file.write_all(serde_json::to_string(&obj)?.as_bytes())?;
|
||||
tmp_file.flush()?;
|
||||
tmp_file.unlock()?;
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user