Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
459dd1cd7c
|
|||
|
9d29aad463
|
|||
|
d25ab69d13
|
|||
|
14991b18fb
|
Generated
+1
-1
@@ -1268,7 +1268,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mdrs-client-rust"
|
||||
version = "0.1.1"
|
||||
version = "2.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mdrs-client-rust"
|
||||
version = "0.1.1"
|
||||
version = "2.0.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
authors = ["Neuroinformatics Unit, RIKEN CBS"]
|
||||
|
||||
+48
-5
@@ -1,4 +1,4 @@
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
pub use crate::models::file::File;
|
||||
use anyhow::bail;
|
||||
use unicode_normalization::UnicodeNormalization;
|
||||
@@ -34,8 +34,43 @@ impl MDRSConnection {
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
/// Upload a local file into the given remote folder.
|
||||
pub async fn upload_file(&self, folder_id: &str, file_path: &str) -> Result<(), anyhow::Error> {
|
||||
/// List all files in a folder while consuming the shared API concurrency budget.
|
||||
pub async fn list_all_files_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<Vec<File>, anyhow::Error> {
|
||||
let mut all_files = Vec::new();
|
||||
let mut page: u32 = 1;
|
||||
loop {
|
||||
let params = [
|
||||
("folder_id", folder_id.to_string()),
|
||||
("page", page.to_string()),
|
||||
];
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_with_query("v3/files/", ¶ms).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("List files failed: {}", resp.status());
|
||||
}
|
||||
let list: FileListResponse = resp.json().await?;
|
||||
let has_next = list.next.is_some();
|
||||
all_files.extend(list.results);
|
||||
if !has_next {
|
||||
break;
|
||||
}
|
||||
page += 1;
|
||||
}
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
/// Upload a local file into the given remote folder while consuming the
|
||||
/// shared API concurrency budget.
|
||||
pub async fn upload_file_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
file_path: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
use anyhow::{anyhow, bail};
|
||||
use reqwest::multipart;
|
||||
let file_name: String = std::path::Path::new(file_path)
|
||||
@@ -49,6 +84,7 @@ impl MDRSConnection {
|
||||
let form = multipart::Form::new()
|
||||
.text("folder_id", folder_id.to_string())
|
||||
.part("file", part);
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.post_multipart("v3/files/", form).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Upload failed: {}", resp.status());
|
||||
@@ -56,13 +92,20 @@ impl MDRSConnection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Download a file from `url` and write it to `dest`.
|
||||
pub async fn download_file(&self, url: &str, dest: &str) -> Result<(), anyhow::Error> {
|
||||
/// Download a file while consuming the shared API concurrency budget.
|
||||
pub async fn download_file_limited(
|
||||
&self,
|
||||
url: &str,
|
||||
dest: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_url(url).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Download failed: {}", resp.status());
|
||||
}
|
||||
let bytes = resp.bytes().await?;
|
||||
drop(_permit);
|
||||
tokio::fs::write(dest, &bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+85
-2
@@ -1,6 +1,6 @@
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
pub use crate::models::folder::{FolderDetail, FolderSimple};
|
||||
use anyhow::bail;
|
||||
use anyhow::{anyhow, bail};
|
||||
|
||||
impl MDRSConnection {
|
||||
/// List folders matching the given path under a laboratory (GET v3/folders/?path=...&laboratory_id=...)
|
||||
@@ -20,6 +20,25 @@ impl MDRSConnection {
|
||||
Ok(resp.json::<Vec<FolderSimple>>().await?)
|
||||
}
|
||||
|
||||
/// List folders by path while consuming the shared API concurrency budget.
|
||||
pub async fn list_folders_by_path_limited(
|
||||
&self,
|
||||
lab_id: u32,
|
||||
path: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<Vec<FolderSimple>, anyhow::Error> {
|
||||
let params = [
|
||||
("laboratory_id", lab_id.to_string()),
|
||||
("path", path.to_string()),
|
||||
];
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_with_query("v3/folders/", ¶ms).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("List folders failed: {}", resp.status());
|
||||
}
|
||||
Ok(resp.json::<Vec<FolderSimple>>().await?)
|
||||
}
|
||||
|
||||
/// Retrieve full folder details including sub_folders (GET v3/folders/{id}/)
|
||||
pub async fn retrieve_folder(&self, id: &str) -> Result<FolderDetail, anyhow::Error> {
|
||||
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
|
||||
@@ -29,6 +48,20 @@ impl MDRSConnection {
|
||||
Ok(resp.json::<FolderDetail>().await?)
|
||||
}
|
||||
|
||||
/// Retrieve folder details while consuming the shared API concurrency budget.
|
||||
pub async fn retrieve_folder_limited(
|
||||
&self,
|
||||
id: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<FolderDetail, anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Retrieve folder failed: {}", resp.status());
|
||||
}
|
||||
Ok(resp.json::<FolderDetail>().await?)
|
||||
}
|
||||
|
||||
/// Create a new folder under `parent_id` (POST v3/folders/).
|
||||
pub async fn create_folder(
|
||||
&self,
|
||||
@@ -44,6 +77,32 @@ impl MDRSConnection {
|
||||
self.post_json("v3/folders/", &body).await
|
||||
}
|
||||
|
||||
/// Create a new folder under `parent_id` and return its ID while consuming
|
||||
/// the shared API concurrency budget.
|
||||
pub async fn create_folder_id_limited(
|
||||
&self,
|
||||
parent_id: &str,
|
||||
folder_name: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<String, anyhow::Error> {
|
||||
let body = serde_json::json!({
|
||||
"name": folder_name,
|
||||
"parent_id": parent_id,
|
||||
"description": "",
|
||||
"template_id": -1,
|
||||
});
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.post_json("v3/folders/", &body).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Failed to create remote folder: {}", folder_name);
|
||||
}
|
||||
let json: serde_json::Value = resp.json().await?;
|
||||
json["id"]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow!("No id in create_folder response for {}", folder_name))
|
||||
.map(|s| s.to_string())
|
||||
}
|
||||
|
||||
/// Authenticate against a password-locked folder (POST v3/folders/{id}/auth/).
|
||||
/// Returns `Err` if the password is incorrect or the request fails.
|
||||
pub async fn folder_auth(&self, folder_id: &str, password: &str) -> Result<(), anyhow::Error> {
|
||||
@@ -61,4 +120,28 @@ impl MDRSConnection {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Authenticate against a locked folder while consuming the shared API
|
||||
/// concurrency budget.
|
||||
pub async fn folder_auth_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
password: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self
|
||||
.post_json(
|
||||
&format!("v3/folders/{}/auth/", folder_id),
|
||||
&serde_json::json!({"password": password}),
|
||||
)
|
||||
.await?;
|
||||
if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||
bail!("Password is incorrect.");
|
||||
}
|
||||
if !resp.status().is_success() {
|
||||
bail!("Folder auth failed: {}", resp.status());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
+326
-59
@@ -2,13 +2,17 @@ pub mod digest;
|
||||
pub mod types;
|
||||
|
||||
pub use digest::compute_digest;
|
||||
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheUser};
|
||||
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheToken, CacheUser};
|
||||
|
||||
use crate::connection::MDRSConnection;
|
||||
use anyhow::{anyhow, bail};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Per-remote async mutex map (in-process serialization)
|
||||
@@ -17,6 +21,29 @@ use std::sync::{Arc, LazyLock, Mutex};
|
||||
static REMOTE_LOCKS: LazyLock<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
|
||||
LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
struct CacheStoreKey {
|
||||
config_dir: PathBuf,
|
||||
remote: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
struct CacheFileSnapshot {
|
||||
len: u64,
|
||||
modified_nanos: u128,
|
||||
#[cfg(unix)]
|
||||
inode: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MemoryCacheEntry {
|
||||
snapshot: CacheFileSnapshot,
|
||||
cache: Cache,
|
||||
}
|
||||
|
||||
static MEMORY_CACHE: LazyLock<Mutex<HashMap<CacheStoreKey, MemoryCacheEntry>>> =
|
||||
LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
|
||||
let mut map = REMOTE_LOCKS.lock().unwrap();
|
||||
map.entry(remote.to_string())
|
||||
@@ -28,11 +55,161 @@ fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
|
||||
// Cache file path helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn cache_file_path(remote: &str) -> std::path::PathBuf {
|
||||
crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote))
|
||||
fn cache_store_key(config_dir: &Path, remote: &str) -> CacheStoreKey {
|
||||
CacheStoreKey {
|
||||
config_dir: config_dir.to_path_buf(),
|
||||
remote: remote.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn cache_dir_path(config_dir: &Path) -> PathBuf {
|
||||
config_dir.join("cache")
|
||||
}
|
||||
|
||||
fn cache_file_path_in(config_dir: &Path, remote: &str) -> PathBuf {
|
||||
cache_dir_path(config_dir).join(format!("{}.json", remote))
|
||||
}
|
||||
|
||||
fn cache_file_path(remote: &str) -> PathBuf {
|
||||
cache_file_path_in(&crate::settings::SETTINGS.config_dirname, remote)
|
||||
}
|
||||
|
||||
fn cache_snapshot(metadata: &fs::Metadata) -> CacheFileSnapshot {
|
||||
let modified_nanos = metadata
|
||||
.modified()
|
||||
.ok()
|
||||
.and_then(|time| time.duration_since(UNIX_EPOCH).ok())
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or_default();
|
||||
|
||||
CacheFileSnapshot {
|
||||
len: metadata.len(),
|
||||
modified_nanos,
|
||||
#[cfg(unix)]
|
||||
inode: metadata.ino(),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_cache_snapshot(cache_path: &Path) -> Result<CacheFileSnapshot, std::io::Error> {
|
||||
fs::metadata(cache_path).map(|metadata| cache_snapshot(&metadata))
|
||||
}
|
||||
|
||||
fn cached_entry(config_dir: &Path, remote: &str, snapshot: &CacheFileSnapshot) -> Option<Cache> {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let map = MEMORY_CACHE.lock().unwrap();
|
||||
map.get(&key)
|
||||
.filter(|entry| entry.snapshot == *snapshot)
|
||||
.map(|entry| entry.cache.clone())
|
||||
}
|
||||
|
||||
fn update_cached_entry(config_dir: &Path, remote: &str, snapshot: CacheFileSnapshot, cache: Cache) {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let mut map = MEMORY_CACHE.lock().unwrap();
|
||||
map.insert(key, MemoryCacheEntry { snapshot, cache });
|
||||
}
|
||||
|
||||
fn invalidate_cached_entry(config_dir: &Path, remote: &str) {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let mut map = MEMORY_CACHE.lock().unwrap();
|
||||
map.remove(&key);
|
||||
}
|
||||
|
||||
fn ensure_cache_dir(cache_dir: &Path) -> Result<(), anyhow::Error> {
|
||||
fs::create_dir_all(cache_dir)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(cache_dir)?.permissions();
|
||||
perms.set_mode(0o700);
|
||||
fs::set_permissions(cache_dir, perms)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_cache_file(cache_path: &Path, cache: &Cache) -> Result<(), anyhow::Error> {
|
||||
let tmp_path = cache_path.with_extension("tmp");
|
||||
fs::write(&tmp_path, serde_json::to_vec_pretty(cache)?)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&tmp_path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp_path, perms)?;
|
||||
}
|
||||
fs::rename(&tmp_path, cache_path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_cache(remote: &str, data: &str) -> Result<Cache, anyhow::Error> {
|
||||
serde_json::from_str::<Cache>(data).map_err(|e| {
|
||||
anyhow!(
|
||||
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
|
||||
remote,
|
||||
e,
|
||||
remote
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn load_cache_from_dir(remote: &str, config_dir: &Path) -> Result<Cache, anyhow::Error> {
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
let snapshot = match read_cache_snapshot(&cache_path) {
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if let Some(cache) = cached_entry(config_dir, remote, &snapshot) {
|
||||
return Ok(cache);
|
||||
}
|
||||
|
||||
let data = match fs::read_to_string(&cache_path) {
|
||||
Ok(data) => data,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let cache = parse_cache(remote, &data)?;
|
||||
update_cached_entry(config_dir, remote, snapshot, cache.clone());
|
||||
Ok(cache)
|
||||
}
|
||||
|
||||
fn persist_cache_in_dir(
|
||||
remote: &str,
|
||||
config_dir: &Path,
|
||||
cache: &Cache,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let cache_dir = cache_dir_path(config_dir);
|
||||
ensure_cache_dir(&cache_dir)?;
|
||||
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
write_cache_file(&cache_path, cache)?;
|
||||
|
||||
let snapshot = read_cache_snapshot(&cache_path)?;
|
||||
update_cached_entry(config_dir, remote, snapshot, cache.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_cache_in_dir(remote: &str, config_dir: &Path) -> Result<(), anyhow::Error> {
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
match fs::remove_file(&cache_path) {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -41,23 +218,17 @@ fn cache_file_path(remote: &str) -> std::path::PathBuf {
|
||||
|
||||
/// Load token and laboratories from the login cache file (no token refresh check).
|
||||
pub fn load_cache(remote: &str) -> Result<Cache, anyhow::Error> {
|
||||
let cache_path = cache_file_path(remote);
|
||||
if !cache_path.exists() {
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
let data = fs::read_to_string(&cache_path)?;
|
||||
serde_json::from_str::<Cache>(&data).map_err(|e| {
|
||||
anyhow!(
|
||||
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
|
||||
remote,
|
||||
e,
|
||||
remote
|
||||
)
|
||||
})
|
||||
load_cache_from_dir(remote, &crate::settings::SETTINGS.config_dirname)
|
||||
}
|
||||
|
||||
/// Persist a cache entry and refresh the in-memory fast path.
|
||||
pub fn persist_cache(remote: &str, cache: &Cache) -> Result<(), anyhow::Error> {
|
||||
persist_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname, cache)
|
||||
}
|
||||
|
||||
/// Remove a cache entry from disk and memory.
|
||||
pub fn remove_cache(remote: &str) -> Result<(), anyhow::Error> {
|
||||
remove_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -98,8 +269,7 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
|
||||
}
|
||||
|
||||
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
|
||||
let new_access = refresh_and_persist(remote, &cache).await?;
|
||||
cache.token.access = new_access;
|
||||
cache = refresh_and_persist(remote, &cache).await?;
|
||||
}
|
||||
|
||||
Ok(cache)
|
||||
@@ -112,49 +282,25 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
|
||||
|
||||
/// Call the token-refresh endpoint and write the new access token back to the
|
||||
/// cache file. The caller must already hold the per-remote async mutex.
|
||||
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<String, anyhow::Error> {
|
||||
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<Cache, anyhow::Error> {
|
||||
let url = crate::commands::config::get_remote_url(remote)?
|
||||
.ok_or_else(|| anyhow!("Remote `{}` is not configured.", remote))?;
|
||||
let conn = MDRSConnection::new(&url);
|
||||
|
||||
let new_access = conn.token_refresh(&cache.token.refresh).await?;
|
||||
|
||||
let new_digest = compute_digest(
|
||||
cache.user.as_ref(),
|
||||
&new_access,
|
||||
&cache.token.refresh,
|
||||
&cache.laboratories,
|
||||
let mut updated_cache = cache.clone();
|
||||
updated_cache.token.access = new_access;
|
||||
updated_cache.digest = compute_digest(
|
||||
updated_cache.user.as_ref(),
|
||||
&updated_cache.token.access,
|
||||
&updated_cache.token.refresh,
|
||||
&updated_cache.laboratories,
|
||||
);
|
||||
|
||||
let cache_path = cache_file_path(remote);
|
||||
let raw = fs::read_to_string(&cache_path)?;
|
||||
let mut obj: serde_json::Value = serde_json::from_str(&raw)?;
|
||||
persist_cache(remote, &updated_cache)?;
|
||||
|
||||
obj["token"]["access"] = serde_json::Value::String(new_access.clone());
|
||||
obj["digest"] = serde_json::Value::String(new_digest);
|
||||
|
||||
// Write atomically: write to .tmp then rename.
|
||||
let tmp_path = cache_path.with_extension("tmp");
|
||||
{
|
||||
use std::io::Write;
|
||||
let mut tmp_file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&tmp_path)?;
|
||||
tmp_file.write_all(serde_json::to_string(&obj)?.as_bytes())?;
|
||||
tmp_file.flush()?;
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let mut perms = fs::metadata(&tmp_path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp_path, perms)?;
|
||||
}
|
||||
fs::rename(&tmp_path, &cache_path)?;
|
||||
|
||||
Ok(new_access)
|
||||
Ok(updated_cache)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -172,3 +318,124 @@ pub fn create_authenticated_conn(
|
||||
.with_remote(remote)
|
||||
.with_token(cache.token.access.clone()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn sample_cache(username: &str) -> Cache {
|
||||
Cache {
|
||||
user: Some(CacheUser {
|
||||
id: 1,
|
||||
username: username.to_string(),
|
||||
laboratory_ids: vec![10, 20],
|
||||
is_reviewer: false,
|
||||
}),
|
||||
token: types::CacheToken {
|
||||
access: format!("access-{username}"),
|
||||
refresh: format!("refresh-{username}"),
|
||||
},
|
||||
laboratories: CacheLabsWrapper {
|
||||
items: vec![CacheLaboratory {
|
||||
id: 10,
|
||||
name: "lab".to_string(),
|
||||
pi_name: String::new(),
|
||||
full_name: "Laboratory".to_string(),
|
||||
}],
|
||||
},
|
||||
digest: format!("digest-{username}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_name(prefix: &str, config_dir: &Path) -> String {
|
||||
format!(
|
||||
"{prefix}-{}",
|
||||
config_dir
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
.replace('.', "_")
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn load_cache_uses_memory_fast_path_when_snapshot_matches() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("fast-path", dir.path());
|
||||
let cache = sample_cache("alice");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
|
||||
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(first.user.unwrap().username, "alice");
|
||||
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
|
||||
perms.set_mode(0o000);
|
||||
fs::set_permissions(&cache_path, perms).unwrap();
|
||||
|
||||
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(second.user.unwrap().username, "alice");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_cache_reloads_when_external_writer_changes_file() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("reload", dir.path());
|
||||
let original = sample_cache("alice");
|
||||
let updated = sample_cache("bob");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
|
||||
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(first.user.unwrap().username, "alice");
|
||||
|
||||
let cache_dir = cache_dir_path(dir.path());
|
||||
ensure_cache_dir(&cache_dir).unwrap();
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
write_cache_file(&cache_path, &updated).unwrap();
|
||||
|
||||
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(second.user.unwrap().username, "bob");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn persist_cache_refreshes_memory_entry() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("persist", dir.path());
|
||||
let original = sample_cache("alice");
|
||||
let updated = sample_cache("bob");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
|
||||
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &updated).unwrap();
|
||||
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
|
||||
perms.set_mode(0o000);
|
||||
fs::set_permissions(&cache_path, perms).unwrap();
|
||||
|
||||
let loaded = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(loaded.user.unwrap().username, "bob");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_cache_invalidates_memory_entry() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("remove", dir.path());
|
||||
let cache = sample_cache("alice");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
|
||||
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
remove_cache_in_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
let err = load_cache_from_dir(&remote, dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains(&format!("Not logged in to `{remote}`"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
+8
-6
@@ -1,14 +1,14 @@
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Access and refresh token pair stored in the login cache file.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheToken {
|
||||
pub access: String,
|
||||
pub refresh: String,
|
||||
}
|
||||
|
||||
/// Minimal user fields stored in the cache, matching Python's `User` dataclass.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheUser {
|
||||
pub id: u32,
|
||||
pub username: String,
|
||||
@@ -17,7 +17,7 @@ pub struct CacheUser {
|
||||
}
|
||||
|
||||
/// All four laboratory fields needed for digest computation.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheLaboratory {
|
||||
pub id: u32,
|
||||
pub name: String,
|
||||
@@ -28,15 +28,17 @@ pub struct CacheLaboratory {
|
||||
}
|
||||
|
||||
/// Wrapper matching Python's `Laboratories` serialization: `{"items": [...]}`.
|
||||
#[derive(Deserialize, Clone, Default)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct CacheLabsWrapper {
|
||||
pub items: Vec<CacheLaboratory>,
|
||||
}
|
||||
|
||||
/// Full login cache, corresponding to the `<remote>.json` file written by `login`.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Cache {
|
||||
pub user: Option<CacheUser>,
|
||||
pub token: CacheToken,
|
||||
pub laboratories: CacheLabsWrapper,
|
||||
#[serde(default)]
|
||||
pub digest: String,
|
||||
}
|
||||
|
||||
+309
-70
@@ -1,12 +1,13 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{
|
||||
find_file_by_name, find_folder, find_lab_in_cache, find_subfolder_by_name, parse_remote_path,
|
||||
find_file_by_name, find_folder_limited, find_lab_in_cache, find_subfolder_by_name,
|
||||
parse_remote_path,
|
||||
};
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
use anyhow::{anyhow, bail};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub async fn download(
|
||||
remote_path: &str,
|
||||
@@ -19,6 +20,7 @@ pub async fn download(
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
|
||||
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
|
||||
// Validate that local_path is an existing directory (matching Python's behaviour).
|
||||
@@ -40,8 +42,11 @@ pub async fn download(
|
||||
None => ("/".to_string(), r_path_clean.to_string()),
|
||||
};
|
||||
|
||||
let parent_folder = find_folder(&conn, lab.id, &parent_path, password).await?;
|
||||
let files = conn.list_all_files(&parent_folder.id).await?;
|
||||
let parent_folder =
|
||||
find_folder_limited(&conn, &limiter, lab.id, &parent_path, password).await?;
|
||||
let files = conn
|
||||
.list_all_files_limited(&parent_folder.id, &limiter)
|
||||
.await?;
|
||||
|
||||
// Case 1: basename matches a file in the parent folder.
|
||||
if let Some(file) = find_file_by_name(&files, &basename) {
|
||||
@@ -61,7 +66,8 @@ pub async fn download(
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &file.download_url);
|
||||
conn.download_file(&url, &dest.to_string_lossy()).await?;
|
||||
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}", dest.display());
|
||||
return Ok(());
|
||||
}
|
||||
@@ -76,73 +82,41 @@ pub async fn download(
|
||||
// We create that subdirectory first, then recurse into it.
|
||||
let top_local = local_real.join(&sub.name);
|
||||
|
||||
// Iterative DFS: each entry is (remote_folder_id, local_dir)
|
||||
let mut stack: Vec<(String, PathBuf)> = vec![(sub.id.clone(), top_local)];
|
||||
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
let excludes = Arc::new(excludes);
|
||||
let lab_name = Arc::new(lab.name.clone());
|
||||
let password = password.map(str::to_string);
|
||||
|
||||
while let Some((folder_id, local_dir)) = stack.pop() {
|
||||
let folder = conn.retrieve_folder(&folder_id).await?;
|
||||
spawn_download_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
sub.id.clone(),
|
||||
top_local,
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
if is_excluded(&excludes, &lab.name, &folder.path, None) {
|
||||
continue;
|
||||
}
|
||||
drive_download_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut download_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
password,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
tokio::fs::create_dir_all(&local_dir).await?;
|
||||
println!("{}", local_dir.display());
|
||||
|
||||
let dir_files = conn.list_all_files(&folder_id).await?;
|
||||
|
||||
// Download files in this folder (up to 10 concurrent).
|
||||
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
|
||||
for f in &dir_files {
|
||||
if is_excluded(&excludes, &lab.name, &folder.path, Some(&f.name)) {
|
||||
continue;
|
||||
}
|
||||
let dest_path = local_dir.join(&f.name);
|
||||
if skip_if_exists {
|
||||
if dest_path.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest_path) {
|
||||
if meta.len() == f.size {
|
||||
println!("{}", dest_path.display());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &f.download_url);
|
||||
let conn = conn.clone();
|
||||
futs.push(tokio::spawn(async move {
|
||||
let dest_str = dest_path.to_string_lossy().to_string();
|
||||
match conn.download_file(&url, &dest_str).await {
|
||||
Ok(_) => println!("{}", dest_path.display()),
|
||||
Err(_) => {
|
||||
eprintln!("Failed: {}", dest_path.display());
|
||||
if dest_path.is_file() {
|
||||
let _ = std::fs::remove_file(&dest_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
if futs.len() >= crate::settings::SETTINGS.concurrent {
|
||||
let _ = futs.next().await;
|
||||
}
|
||||
}
|
||||
while futs.next().await.is_some() {}
|
||||
|
||||
// Push sub-folders onto the stack for recursive processing.
|
||||
for sf in &folder.sub_folders {
|
||||
if sf.lock {
|
||||
match password {
|
||||
Some(pw) => {
|
||||
if conn.folder_auth(&sf.id, pw).await.is_err() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
let sub_local = local_dir.join(&sf.name);
|
||||
stack.push((sf.id.clone(), sub_local));
|
||||
}
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
@@ -181,3 +155,268 @@ fn make_absolute_url(conn: &MDRSConnection, url: &str) -> String {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct DownloadFolderTaskResult {
|
||||
child_folders: Vec<(String, PathBuf)>,
|
||||
download_jobs: Vec<DownloadJob>,
|
||||
}
|
||||
|
||||
struct DownloadJob {
|
||||
url: String,
|
||||
dest_path: PathBuf,
|
||||
}
|
||||
|
||||
fn spawn_download_folder_task(
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
folder_id: String,
|
||||
local_dir: PathBuf,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
folder_tasks.spawn(async move {
|
||||
process_download_folder(
|
||||
conn,
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
folder_id,
|
||||
local_dir,
|
||||
password,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_download_task(
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
job: DownloadJob,
|
||||
) {
|
||||
download_tasks.spawn(async move {
|
||||
let dest_str = job.dest_path.to_string_lossy().to_string();
|
||||
match conn
|
||||
.download_file_limited(&job.url, &dest_str, &limiter)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
println!("{}", job.dest_path.display());
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
if job.dest_path.is_file() {
|
||||
let _ = std::fs::remove_file(&job.dest_path);
|
||||
}
|
||||
Err(anyhow!(
|
||||
"Failed to download {}: {}",
|
||||
job.dest_path.display(),
|
||||
err
|
||||
))
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_download_folder(
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
folder_id: String,
|
||||
local_dir: PathBuf,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) -> Result<DownloadFolderTaskResult, anyhow::Error> {
|
||||
let folder = conn.retrieve_folder_limited(&folder_id, &limiter).await?;
|
||||
|
||||
if is_excluded(excludes.as_slice(), lab_name.as_str(), &folder.path, None) {
|
||||
return Ok(DownloadFolderTaskResult {
|
||||
child_folders: Vec::new(),
|
||||
download_jobs: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all(&local_dir).await?;
|
||||
println!("{}", local_dir.display());
|
||||
|
||||
let dir_files = conn.list_all_files_limited(&folder_id, &limiter).await?;
|
||||
let mut download_jobs = Vec::new();
|
||||
for file in &dir_files {
|
||||
if is_excluded(
|
||||
excludes.as_slice(),
|
||||
lab_name.as_str(),
|
||||
&folder.path,
|
||||
Some(&file.name),
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
let dest_path = local_dir.join(&file.name);
|
||||
if skip_if_exists && dest_path.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest_path) {
|
||||
if meta.len() == file.size {
|
||||
println!("{}", dest_path.display());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
download_jobs.push(DownloadJob {
|
||||
url: make_absolute_url(&conn, &file.download_url),
|
||||
dest_path,
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folder_tasks: JoinSet<Result<Option<(String, PathBuf)>, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
for sub_folder in folder.sub_folders {
|
||||
let conn = conn.clone();
|
||||
let limiter = limiter.clone();
|
||||
let password = password.clone();
|
||||
let sub_local = local_dir.join(&sub_folder.name);
|
||||
child_folder_tasks.spawn(async move {
|
||||
if sub_folder.lock {
|
||||
match password.as_deref() {
|
||||
Some(pw) => {
|
||||
if conn
|
||||
.folder_auth_limited(&sub_folder.id, pw, &limiter)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
None => return Ok(None),
|
||||
}
|
||||
}
|
||||
Ok(Some((sub_folder.id, sub_local)))
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folders = Vec::new();
|
||||
while let Some(result) = child_folder_tasks.join_next().await {
|
||||
if let Some(child) = flatten_join_result(result)? {
|
||||
child_folders.push(child);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DownloadFolderTaskResult {
|
||||
child_folders,
|
||||
download_jobs,
|
||||
})
|
||||
}
|
||||
|
||||
async fn drive_download_tasks(
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
loop {
|
||||
match (folder_tasks.is_empty(), download_tasks.is_empty()) {
|
||||
(true, true) => break,
|
||||
(false, true) => {
|
||||
if let Some(result) = folder_tasks.join_next().await {
|
||||
handle_download_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
download_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
(true, false) => {
|
||||
if let Some(result) = download_tasks.join_next().await {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
(false, false) => {
|
||||
tokio::select! {
|
||||
result = folder_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
handle_download_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
download_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
result = download_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_download_folder_result(
|
||||
result: Result<Result<DownloadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
match flatten_join_result(result) {
|
||||
Ok(task_result) => {
|
||||
for (folder_id, local_dir) in task_result.child_folders {
|
||||
spawn_download_folder_task(
|
||||
folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
folder_id,
|
||||
local_dir,
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
for job in task_result.download_jobs {
|
||||
spawn_download_task(download_tasks, conn.clone(), limiter.clone(), job);
|
||||
}
|
||||
}
|
||||
Err(err) => errors.push(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn flatten_join_result<T>(
|
||||
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
|
||||
) -> Result<T, anyhow::Error> {
|
||||
match result {
|
||||
Ok(inner) => inner,
|
||||
Err(err) => Err(anyhow!("Task join failed: {}", err)),
|
||||
}
|
||||
}
|
||||
|
||||
+17
-64
@@ -1,14 +1,10 @@
|
||||
use crate::cache::{CacheLabsWrapper, CacheUser, compute_digest};
|
||||
use crate::cache::{Cache, CacheLabsWrapper, CacheToken, CacheUser, compute_digest, persist_cache};
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::laboratory::Laboratories;
|
||||
use crate::models::user::User;
|
||||
use anyhow::{anyhow, bail};
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use serde_json::{Value, json};
|
||||
use std::fs;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
/// Prompt for credentials if not supplied and then perform login.
|
||||
/// This is the entry point called from `main`.
|
||||
@@ -72,66 +68,23 @@ pub async fn login(username: &str, password: &str, remote: &str) -> Result<(), a
|
||||
let cache_labs: CacheLabsWrapper = (&labs).into();
|
||||
|
||||
// compute Python-compatible digest
|
||||
let digest = compute_digest(
|
||||
cache_user_opt.as_ref(),
|
||||
&token.access,
|
||||
&token.refresh,
|
||||
&cache_labs,
|
||||
);
|
||||
|
||||
// build the cache JSON — field order matches Python's dataclass layout:
|
||||
// user (id, username, laboratory_ids, is_reviewer)
|
||||
// token (access, refresh)
|
||||
// laboratories (items)
|
||||
// digest
|
||||
let user_val: Value = match &cache_user_opt {
|
||||
Some(u) => json!({
|
||||
"id": u.id,
|
||||
"username": u.username,
|
||||
"laboratory_ids": u.laboratory_ids,
|
||||
"is_reviewer": u.is_reviewer
|
||||
}),
|
||||
None => Value::Null,
|
||||
let cache = Cache {
|
||||
user: cache_user_opt,
|
||||
token: CacheToken {
|
||||
access: token.access,
|
||||
refresh: token.refresh,
|
||||
},
|
||||
laboratories: cache_labs,
|
||||
digest: String::new(),
|
||||
};
|
||||
let labs_items: Vec<Value> = cache_labs
|
||||
.items
|
||||
.iter()
|
||||
.map(|l| {
|
||||
json!({
|
||||
"id": l.id,
|
||||
"name": l.name,
|
||||
"pi_name": l.pi_name,
|
||||
"full_name": l.full_name
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
let obj = json!({
|
||||
"user": user_val,
|
||||
"token": {"access": token.access, "refresh": token.refresh},
|
||||
"laboratories": {"items": labs_items},
|
||||
"digest": digest
|
||||
});
|
||||
|
||||
// write cache file: {config_dirname}/cache/<remote>.json
|
||||
let cache_dir = crate::settings::SETTINGS.config_dirname.join("cache");
|
||||
fs::create_dir_all(&cache_dir)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&cache_dir)?.permissions();
|
||||
perms.set_mode(0o700);
|
||||
fs::set_permissions(&cache_dir, perms)?;
|
||||
}
|
||||
let cache_file = cache_dir.join(format!("{}.json", remote));
|
||||
let tmp = cache_file.with_extension("tmp");
|
||||
|
||||
fs::write(&tmp, serde_json::to_vec_pretty(&obj)?)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&tmp)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp, perms)?;
|
||||
}
|
||||
fs::rename(&tmp, &cache_file)?;
|
||||
let mut cache = cache;
|
||||
cache.digest = compute_digest(
|
||||
cache.user.as_ref(),
|
||||
&cache.token.access,
|
||||
&cache.token.refresh,
|
||||
&cache.laboratories,
|
||||
);
|
||||
persist_cache(remote, &cache)?;
|
||||
|
||||
println!("Login Successful");
|
||||
Ok(())
|
||||
|
||||
@@ -1,10 +1,3 @@
|
||||
pub fn logout(remote: &str) -> Result<(), anyhow::Error> {
|
||||
let cache_path = crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote));
|
||||
if cache_path.exists() {
|
||||
std::fs::remove_file(&cache_path)?;
|
||||
}
|
||||
Ok(())
|
||||
crate::cache::remove_cache(remote)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::cache::{Cache, CacheLaboratory};
|
||||
use crate::connection::ApiRequestLimiter;
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::file::File;
|
||||
use crate::models::folder::{FolderDetail, FolderSimple};
|
||||
@@ -95,6 +96,49 @@ pub async fn find_folder(
|
||||
Ok(folder)
|
||||
}
|
||||
|
||||
/// Resolve a folder by path while consuming the shared API concurrency budget.
|
||||
pub async fn find_folder_limited(
|
||||
conn: &MDRSConnection,
|
||||
limiter: &ApiRequestLimiter,
|
||||
lab_id: u32,
|
||||
path: &str,
|
||||
password: Option<&str>,
|
||||
) -> Result<FolderDetail, anyhow::Error> {
|
||||
let normalized_path = nfc(path);
|
||||
let folders = conn
|
||||
.list_folders_by_path_limited(lab_id, &normalized_path, limiter)
|
||||
.await?;
|
||||
if folders.is_empty() {
|
||||
bail!("Folder `{}` not found.", path);
|
||||
}
|
||||
if folders.len() != 1 {
|
||||
bail!(
|
||||
"Ambiguous path `{}`: {} folders matched.",
|
||||
path,
|
||||
folders.len()
|
||||
);
|
||||
}
|
||||
let folder_simple = &folders[0];
|
||||
if folder_simple.lock {
|
||||
match password {
|
||||
None => {
|
||||
bail!(
|
||||
"Folder `{}` is locked. Use -p/--password to provide a password.",
|
||||
path
|
||||
);
|
||||
}
|
||||
Some(pw) => {
|
||||
conn.folder_auth_limited(&folder_simple.id, pw, limiter)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
let folder = conn
|
||||
.retrieve_folder_limited(&folder_simple.id, limiter)
|
||||
.await?;
|
||||
Ok(folder)
|
||||
}
|
||||
|
||||
/// Find a file by name (NFC-normalized, case-insensitive) in a file list.
|
||||
pub fn find_file_by_name<'a>(files: &'a [File], name: &str) -> Option<&'a File> {
|
||||
let name_lower = nfc(name).to_lowercase();
|
||||
|
||||
+261
-75
@@ -1,13 +1,14 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{
|
||||
find_file_by_name, find_folder, find_lab_in_cache, nfc, parse_remote_path,
|
||||
find_file_by_name, find_folder_limited, find_lab_in_cache, nfc, parse_remote_path,
|
||||
};
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
use crate::models::folder::FolderSimple;
|
||||
use anyhow::{anyhow, bail};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub async fn upload(
|
||||
local_path: &str,
|
||||
@@ -18,8 +19,9 @@ pub async fn upload(
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
|
||||
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
let dest_folder = find_folder(&conn, lab.id, &r_path, None).await?;
|
||||
let dest_folder = find_folder_limited(&conn, &limiter, lab.id, &r_path, None).await?;
|
||||
|
||||
// Normalize local_path: resolve to an absolute canonical path so that
|
||||
// trailing slashes and "./" prefixes are handled consistently (matching
|
||||
@@ -30,7 +32,9 @@ pub async fn upload(
|
||||
|
||||
if local.is_file() {
|
||||
let filename = local.file_name().unwrap().to_string_lossy().to_string();
|
||||
let remote_files = conn.list_all_files(&dest_folder.id).await?;
|
||||
let remote_files = conn
|
||||
.list_all_files_limited(&dest_folder.id, &limiter)
|
||||
.await?;
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if rf.size == std::fs::metadata(local)?.len() {
|
||||
@@ -39,7 +43,7 @@ pub async fn upload(
|
||||
}
|
||||
}
|
||||
}
|
||||
conn.upload_file(&dest_folder.id, &local.to_string_lossy())
|
||||
conn.upload_file_limited(&dest_folder.id, &local.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}{}", dest_folder.path, filename);
|
||||
} else if local.is_dir() {
|
||||
@@ -52,75 +56,43 @@ pub async fn upload(
|
||||
let local_basename = local.file_name().unwrap().to_string_lossy().to_string();
|
||||
let top_remote_id = find_or_create_folder(
|
||||
&conn,
|
||||
&limiter,
|
||||
&dest_folder.id,
|
||||
&dest_folder.sub_folders,
|
||||
&local_basename,
|
||||
)
|
||||
.await?;
|
||||
let top_folder = conn.retrieve_folder(&top_remote_id).await?;
|
||||
println!("{}", top_folder.path.trim_end_matches('/'));
|
||||
println!(
|
||||
"{}",
|
||||
format!("{}{}", dest_folder.path, local_basename).trim_end_matches('/')
|
||||
);
|
||||
|
||||
// Iterative depth-first walk: each entry is (local_dir, remote_folder_id)
|
||||
let mut stack: Vec<(PathBuf, String)> = vec![(local.to_path_buf(), top_remote_id)];
|
||||
let mut folder_tasks: JoinSet<Result<UploadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut upload_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
|
||||
while let Some((local_dir, remote_id)) = stack.pop() {
|
||||
let folder_detail = conn.retrieve_folder(&remote_id).await?;
|
||||
let remote_files = conn.list_all_files(&remote_id).await?;
|
||||
spawn_upload_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
local.to_path_buf(),
|
||||
top_remote_id,
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
let mut entries = fs::read_dir(&local_dir).await?;
|
||||
let mut subdirs: Vec<PathBuf> = Vec::new();
|
||||
let mut files: Vec<PathBuf> = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let p = entry.path();
|
||||
if p.is_dir() {
|
||||
subdirs.push(p);
|
||||
} else {
|
||||
files.push(p);
|
||||
}
|
||||
}
|
||||
drive_upload_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut upload_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Ensure each local sub-directory exists on the remote side
|
||||
for subdir in subdirs {
|
||||
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
|
||||
let sub_remote_id =
|
||||
find_or_create_folder(&conn, &remote_id, &folder_detail.sub_folders, &dirname)
|
||||
.await?;
|
||||
let sub_folder = conn.retrieve_folder(&sub_remote_id).await?;
|
||||
println!("{}", sub_folder.path.trim_end_matches('/'));
|
||||
stack.push((subdir, sub_remote_id));
|
||||
}
|
||||
|
||||
// Upload files in this directory (up to 10 concurrent)
|
||||
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
|
||||
for file_path in files {
|
||||
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
|
||||
let file_path_str = file_path.to_string_lossy().to_string();
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if let Ok(meta) = std::fs::metadata(&file_path) {
|
||||
if rf.size == meta.len() {
|
||||
let remote_path_prefix = folder_detail.path.clone();
|
||||
println!("{}{}", remote_path_prefix, filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let conn = conn.clone();
|
||||
let folder_id = remote_id.clone();
|
||||
let remote_path_prefix = folder_detail.path.clone();
|
||||
let fname = filename.clone();
|
||||
futs.push(tokio::spawn(async move {
|
||||
match conn.upload_file(&folder_id, &file_path_str).await {
|
||||
Ok(_) => println!("{}{}", remote_path_prefix, fname),
|
||||
Err(e) => eprintln!("Error: {}", e),
|
||||
}
|
||||
}));
|
||||
if futs.len() >= crate::settings::SETTINGS.concurrent {
|
||||
let _ = futs.next().await;
|
||||
}
|
||||
}
|
||||
while futs.next().await.is_some() {}
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
} else {
|
||||
bail!("File or directory `{}` not found.", local_path);
|
||||
@@ -131,7 +103,8 @@ pub async fn upload(
|
||||
|
||||
/// Find an existing sub-folder by name or create it, returning its ID.
|
||||
async fn find_or_create_folder(
|
||||
conn: &crate::connection::MDRSConnection,
|
||||
conn: &MDRSConnection,
|
||||
limiter: &ApiRequestLimiter,
|
||||
parent_id: &str,
|
||||
existing: &[FolderSimple],
|
||||
name: &str,
|
||||
@@ -142,13 +115,226 @@ async fn find_or_create_folder(
|
||||
{
|
||||
return Ok(sf.id.clone());
|
||||
}
|
||||
let resp = conn.create_folder(parent_id, &nfc(name)).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Failed to create remote folder: {}", name);
|
||||
}
|
||||
let json: serde_json::Value = resp.json().await?;
|
||||
json["id"]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow!("No id in create_folder response for {}", name))
|
||||
.map(|s| s.to_string())
|
||||
conn.create_folder_id_limited(parent_id, &nfc(name), limiter)
|
||||
.await
|
||||
}
|
||||
|
||||
struct UploadFolderTaskResult {
|
||||
child_folders: Vec<(PathBuf, String)>,
|
||||
upload_jobs: Vec<UploadJob>,
|
||||
}
|
||||
|
||||
struct UploadJob {
|
||||
folder_id: String,
|
||||
file_path: String,
|
||||
remote_path: String,
|
||||
}
|
||||
|
||||
fn spawn_upload_folder_task(
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
local_dir: PathBuf,
|
||||
remote_id: String,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
folder_tasks.spawn(async move {
|
||||
process_upload_folder(conn, limiter, local_dir, remote_id, skip_if_exists).await
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_upload_task(
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
job: UploadJob,
|
||||
) {
|
||||
upload_tasks.spawn(async move {
|
||||
conn.upload_file_limited(&job.folder_id, &job.file_path, &limiter)
|
||||
.await?;
|
||||
println!("{}", job.remote_path);
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_upload_folder(
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
local_dir: PathBuf,
|
||||
remote_id: String,
|
||||
skip_if_exists: bool,
|
||||
) -> Result<UploadFolderTaskResult, anyhow::Error> {
|
||||
let (folder_detail, remote_files) = tokio::try_join!(
|
||||
conn.retrieve_folder_limited(&remote_id, &limiter),
|
||||
conn.list_all_files_limited(&remote_id, &limiter),
|
||||
)?;
|
||||
|
||||
let mut entries = fs::read_dir(&local_dir).await?;
|
||||
let mut subdirs: Vec<PathBuf> = Vec::new();
|
||||
let mut files: Vec<PathBuf> = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
subdirs.push(path);
|
||||
} else {
|
||||
files.push(path);
|
||||
}
|
||||
}
|
||||
|
||||
let mut subdir_tasks: JoinSet<Result<(PathBuf, String, String), anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let existing_subfolders = Arc::new(folder_detail.sub_folders.clone());
|
||||
let folder_path_prefix = folder_detail.path.clone();
|
||||
for subdir in subdirs {
|
||||
let conn = conn.clone();
|
||||
let limiter = limiter.clone();
|
||||
let remote_id = remote_id.clone();
|
||||
let existing_subfolders = existing_subfolders.clone();
|
||||
let folder_path_prefix = folder_path_prefix.clone();
|
||||
subdir_tasks.spawn(async move {
|
||||
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
|
||||
let sub_remote_id = find_or_create_folder(
|
||||
&conn,
|
||||
&limiter,
|
||||
&remote_id,
|
||||
existing_subfolders.as_slice(),
|
||||
&dirname,
|
||||
)
|
||||
.await?;
|
||||
Ok((
|
||||
subdir,
|
||||
sub_remote_id,
|
||||
format!("{}{}", folder_path_prefix, dirname),
|
||||
))
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folders = Vec::new();
|
||||
while let Some(result) = subdir_tasks.join_next().await {
|
||||
let (subdir, sub_remote_id, remote_path) = flatten_join_result(result)?;
|
||||
println!("{}", remote_path.trim_end_matches('/'));
|
||||
child_folders.push((subdir, sub_remote_id));
|
||||
}
|
||||
|
||||
let mut upload_jobs = Vec::new();
|
||||
for file_path in files {
|
||||
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if let Ok(meta) = std::fs::metadata(&file_path) {
|
||||
if rf.size == meta.len() {
|
||||
println!("{}{}", folder_detail.path, filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
upload_jobs.push(UploadJob {
|
||||
folder_id: remote_id.clone(),
|
||||
file_path: file_path.to_string_lossy().to_string(),
|
||||
remote_path: format!("{}{}", folder_detail.path, filename),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(UploadFolderTaskResult {
|
||||
child_folders,
|
||||
upload_jobs,
|
||||
})
|
||||
}
|
||||
|
||||
async fn drive_upload_tasks(
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
loop {
|
||||
match (folder_tasks.is_empty(), upload_tasks.is_empty()) {
|
||||
(true, true) => break,
|
||||
(false, true) => {
|
||||
if let Some(result) = folder_tasks.join_next().await {
|
||||
handle_upload_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
upload_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
(true, false) => {
|
||||
if let Some(result) = upload_tasks.join_next().await {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
(false, false) => {
|
||||
tokio::select! {
|
||||
result = folder_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
handle_upload_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
upload_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
result = upload_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_upload_folder_result(
|
||||
result: Result<Result<UploadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
match flatten_join_result(result) {
|
||||
Ok(task_result) => {
|
||||
for (local_dir, remote_id) in task_result.child_folders {
|
||||
spawn_upload_folder_task(
|
||||
folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
local_dir,
|
||||
remote_id,
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
for job in task_result.upload_jobs {
|
||||
spawn_upload_task(upload_tasks, conn.clone(), limiter.clone(), job);
|
||||
}
|
||||
}
|
||||
Err(err) => errors.push(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn flatten_join_result<T>(
|
||||
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
|
||||
) -> Result<T, anyhow::Error> {
|
||||
match result {
|
||||
Ok(inner) => inner,
|
||||
Err(err) => Err(anyhow!("Task join failed: {}", err)),
|
||||
}
|
||||
}
|
||||
|
||||
+1
-23
@@ -1,27 +1,5 @@
|
||||
use serde::Deserialize;
|
||||
use std::fs;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CacheUser {
|
||||
username: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct WhoamiCache {
|
||||
user: Option<CacheUser>,
|
||||
}
|
||||
|
||||
pub async fn whoami(remote: &str) -> Result<(), anyhow::Error> {
|
||||
let cache_path = crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote));
|
||||
if !cache_path.exists() {
|
||||
println!("(Anonymous)");
|
||||
return Ok(());
|
||||
}
|
||||
let data = fs::read_to_string(&cache_path)?;
|
||||
match serde_json::from_str::<WhoamiCache>(&data) {
|
||||
match crate::cache::load_cache(remote) {
|
||||
Ok(cache) => match cache.user {
|
||||
Some(user) => println!("{}", user.username),
|
||||
None => println!("(Anonymous)"),
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
|
||||
use reqwest::{Client, Response};
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
fn build_user_agent() -> String {
|
||||
let info = os_info::get();
|
||||
@@ -36,6 +38,27 @@ pub struct MDRSConnection {
|
||||
pub token: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiRequestLimiter {
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl ApiRequestLimiter {
|
||||
pub fn new(limit: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(Semaphore::new(limit.max(1))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, anyhow::Error> {
|
||||
self.semaphore
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("API request limiter was closed."))
|
||||
}
|
||||
}
|
||||
|
||||
impl MDRSConnection {
|
||||
pub fn new(url: &str) -> Self {
|
||||
MDRSConnection {
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ pub struct Settings {
|
||||
/// Base directory for config and cache files.
|
||||
/// Controlled by `MDRS_CLIENT_CONFIG_DIRNAME` env var (default: `~/.mdrs-client`).
|
||||
pub config_dirname: std::path::PathBuf,
|
||||
/// Maximum number of concurrent upload/download workers.
|
||||
/// Maximum number of concurrent MDRS API requests used by upload/download.
|
||||
/// Controlled by `MDRS_CLIENT_CONCURRENT` env var (default: 10).
|
||||
pub concurrent: usize,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user