9 Commits

Author SHA1 Message Date
orrisroot afd08f2499 fix(cache): format sha256 digest manually to resolve trait bound error
Release / build-linux-x86_64 (push) Successful in 2m14s
Release / build-linux-aarch64 (push) Successful in 2m6s
Update digest formatting to manually convert the SHA-256 result bytes
to a hexadecimal string. This resolves a compilation error caused by
upgrading the `sha2` crate to v0.11, where `LowerHex` is no longer
implemented for the return type of `finalize()`.
2026-06-12 10:19:10 +09:00
orrisroot 777c5f6533 feat(doi): add DOI-based path access for commands
Support accessing repositories using DOI strings with optional subpaths
across ls, download, metadata, and file-metadata commands.

- Implement GET v3/doi/{id}/ API model and client calls
- Parse and resolve DOI paths into respective folder and files
- Extract common folder and file resolution logic to shared helpers
- Update README with example DOI-based shell commands
2026-06-12 01:28:36 +09:00
orrisroot 80b6560030 feat(download): allow anonymous download of public data 2026-06-11 21:04:25 +09:00
orrisroot beee9b4f41 fix(auth): allow anonymous read-only API access
Match the Python client for read-only commands by falling back to
anonymous API requests when no valid login cache is available.

Keep mutating commands login-only while letting ls, labs, metadata,
and file-metadata resolve laboratories and folders without a cached
token.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:02 +09:00
orrisroot 7db5c4d53f docs(readme): document selfupdate command
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:02 +09:00
orrisroot e3cd864a0c chore(release): sync lockfile version
Record the Rust package version bump in Cargo.lock so the
repository stays consistent after updating the crate version
to 2.0.0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:02 +09:00
orrisroot 32149109b4 chore(release): bump version to 2.0.0
Update the Rust package manifest to 2.0.0 so CLI version
reporting and release-related flows pick up the new version
from Cargo metadata.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:02 +09:00
orrisroot 3f2ca938bd perf(transfer): parallelize folder traversal API calls
Use a shared API request limiter across recursive upload and
download traversal so folder detail fetches, file listings,
folder auth, and transfers can run concurrently under one
budget.

Refactor the traversal loops into task-driven pipelines while
preserving skip-if-exists, excludes, cleanup, and current
output behavior.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:02 +09:00
orrisroot 4e73766732 perf(cache): reuse auth state in memory
Cache parsed auth state per remote and validate it with on-disk
file metadata so repeated authenticated API calls can skip
redundant open/read/JSON parse work within one process.

Centralize cache load, persist, and removal helpers in the cache
module, reuse them from login, logout, and whoami, and update
the refresh path to persist structured cache data directly.

Add targeted cache tests for memory reuse, invalidation after
external writes, persist updates, and cache removal.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-07 15:49:01 +09:00
24 changed files with 2223 additions and 773 deletions
Generated
+236 -397
View File
File diff suppressed because it is too large Load Diff
+10 -10
View File
@@ -1,6 +1,6 @@
[package]
name = "mdrs-client-rust"
version = "0.1.1"
version = "2.0.1"
edition = "2024"
license = "MIT"
authors = ["Neuroinformatics Unit, RIKEN CBS"]
@@ -14,23 +14,23 @@ path = "src/main.rs"
clap = { version = "4.5", features = ["derive"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.37", features = ["full"] }
serde_json = "1.0.150"
tokio = { version = "1.52.3", features = ["full"] }
futures = "0.3"
dirs = "5.0"
dirs = "6.0.0"
anyhow = "1.0.102"
configparser = "3.1.0"
configparser = "3.2.0"
validators = "0.25.3"
sha2 = "0.10"
rpassword = "7.0"
sha2 = "0.11.0"
rpassword = "7.5.4"
base64 = "0.22"
fs2 = "0.4"
ctrlc = "3"
os_info = "3"
os_info = "3.15.0"
dotenvy = "0.15"
unicode-normalization = "0.1"
self-replace = "1"
tar = "0.4"
tar = "0.4.46"
flate2 = "1"
zip = "2"
zip = "8.6.0"
tempfile = "3"
+28 -5
View File
@@ -103,7 +103,7 @@ mdrs labs neurodata:
### ls
List the contents of a remote folder.
List the contents of a remote folder. You can also specify a DOI path in the form `remote:10.xxxx/yyy.ID[/optional/subpath]`.
```shell
mdrs ls neurodata:/NIU/Repository/
@@ -111,6 +111,10 @@ mdrs ls -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/
mdrs ls -r neurodata:/NIU/Repository/Dataset1/
mdrs ls -J -r neurodata:/NIU/Repository/Dataset1/
mdrs ls -q neurodata:/NIU/Repository/
# DOI access examples:
mdrs ls neurodata:10.60178/cbs.20260429-001
mdrs ls "neurodata:10.60178/cbs.20260429-001/Figure 1"
```
### mkdir
@@ -133,7 +137,7 @@ mdrs upload -r --skip-if-exists ./dataset neurodata:/NIU/Repository/TEST/
### download
Download a file or folder from a remote path.
Download a file or folder from a remote path. You can also specify a DOI path.
```shell
mdrs download neurodata:/NIU/Repository/TEST/sample.dat ./
@@ -141,6 +145,9 @@ mdrs download -r neurodata:/NIU/Repository/TEST/dataset/ ./
mdrs download -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/Readme.dat ./
mdrs download -r --exclude /NIU/Repository/TEST/dataset/skip neurodata:/NIU/Repository/TEST/dataset/ ./
mdrs download -r --skip-if-exists neurodata:/NIU/Repository/TEST/dataset/ ./
# DOI access examples:
mdrs download -r neurodata:10.60178/cbs.20260429-001 ./
```
### mv
@@ -184,20 +191,26 @@ Available access levels: `private`, `public`, `pw_open`, `cbs_open`, `5kikan_ope
### metadata
Show metadata for a remote folder.
Show metadata for a remote folder. You can also specify a DOI path.
```shell
mdrs metadata neurodata:/NIU/Repository/TEST/
mdrs metadata neurodata:/NIU/Repository/Private/
mdrs metadata -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/
# DOI access examples:
mdrs metadata neurodata:10.60178/cbs.20260429-001
```
### file-metadata
Show metadata for a remote file.
Show metadata for a remote file. You can also specify a DOI path.
```shell
mdrs file-metadata neurodata:/NIU/Repository/TEST/dataset/sample.dat
mdrs file-metadata -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/Readme.txt
# DOI access examples:
mdrs file-metadata "neurodata:10.60178/cbs.20260429-001/Figure 1/Figure1v3.pdf"
```
### version
@@ -208,6 +221,16 @@ Show the tool name and version number.
mdrs version
```
### selfupdate
Update the current `mdrs` binary to the latest published release for
the same build target.
```shell
mdrs selfupdate
mdrs selfupdate -y
```
### help
Show help for a command.
+17
View File
@@ -0,0 +1,17 @@
use crate::connection::MDRSConnection;
use crate::models::doi::DoiResponse;
use anyhow::bail;
impl MDRSConnection {
/// Retrieve the folder associated with a DOI suffix ID (GET v3/doi/{id}/).
///
/// The MDRS DOI format is `10.xxxx/prefix.{id}` where the suffix after the
/// last `.` is the internal system ID passed to this endpoint.
pub async fn retrieve_doi(&self, id: &str) -> Result<DoiResponse, anyhow::Error> {
let resp = self.get(&format!("v3/doi/{}/", id)).await?;
if !resp.status().is_success() {
bail!("DOI lookup failed: {}", resp.status());
}
Ok(resp.json::<DoiResponse>().await?)
}
}
+48 -5
View File
@@ -1,4 +1,4 @@
use crate::connection::MDRSConnection;
use crate::connection::{ApiRequestLimiter, MDRSConnection};
pub use crate::models::file::File;
use anyhow::bail;
use unicode_normalization::UnicodeNormalization;
@@ -34,8 +34,43 @@ impl MDRSConnection {
Ok(all_files)
}
/// Upload a local file into the given remote folder.
pub async fn upload_file(&self, folder_id: &str, file_path: &str) -> Result<(), anyhow::Error> {
/// List all files in a folder while consuming the shared API concurrency budget.
pub async fn list_all_files_limited(
&self,
folder_id: &str,
limiter: &ApiRequestLimiter,
) -> Result<Vec<File>, anyhow::Error> {
let mut all_files = Vec::new();
let mut page: u32 = 1;
loop {
let params = [
("folder_id", folder_id.to_string()),
("page", page.to_string()),
];
let _permit = limiter.acquire().await?;
let resp = self.get_with_query("v3/files/", &params).await?;
if !resp.status().is_success() {
anyhow::bail!("List files failed: {}", resp.status());
}
let list: FileListResponse = resp.json().await?;
let has_next = list.next.is_some();
all_files.extend(list.results);
if !has_next {
break;
}
page += 1;
}
Ok(all_files)
}
/// Upload a local file into the given remote folder while consuming the
/// shared API concurrency budget.
pub async fn upload_file_limited(
&self,
folder_id: &str,
file_path: &str,
limiter: &ApiRequestLimiter,
) -> Result<(), anyhow::Error> {
use anyhow::{anyhow, bail};
use reqwest::multipart;
let file_name: String = std::path::Path::new(file_path)
@@ -49,6 +84,7 @@ impl MDRSConnection {
let form = multipart::Form::new()
.text("folder_id", folder_id.to_string())
.part("file", part);
let _permit = limiter.acquire().await?;
let resp = self.post_multipart("v3/files/", form).await?;
if !resp.status().is_success() {
bail!("Upload failed: {}", resp.status());
@@ -56,13 +92,20 @@ impl MDRSConnection {
Ok(())
}
/// Download a file from `url` and write it to `dest`.
pub async fn download_file(&self, url: &str, dest: &str) -> Result<(), anyhow::Error> {
/// Download a file while consuming the shared API concurrency budget.
pub async fn download_file_limited(
&self,
url: &str,
dest: &str,
limiter: &ApiRequestLimiter,
) -> Result<(), anyhow::Error> {
let _permit = limiter.acquire().await?;
let resp = self.get_url(url).await?;
if !resp.status().is_success() {
bail!("Download failed: {}", resp.status());
}
let bytes = resp.bytes().await?;
drop(_permit);
tokio::fs::write(dest, &bytes).await?;
Ok(())
}
+85 -2
View File
@@ -1,6 +1,6 @@
use crate::connection::MDRSConnection;
use crate::connection::{ApiRequestLimiter, MDRSConnection};
pub use crate::models::folder::{FolderDetail, FolderSimple};
use anyhow::bail;
use anyhow::{anyhow, bail};
impl MDRSConnection {
/// List folders matching the given path under a laboratory (GET v3/folders/?path=...&laboratory_id=...)
@@ -20,6 +20,25 @@ impl MDRSConnection {
Ok(resp.json::<Vec<FolderSimple>>().await?)
}
/// List folders by path while consuming the shared API concurrency budget.
pub async fn list_folders_by_path_limited(
&self,
lab_id: u32,
path: &str,
limiter: &ApiRequestLimiter,
) -> Result<Vec<FolderSimple>, anyhow::Error> {
let params = [
("laboratory_id", lab_id.to_string()),
("path", path.to_string()),
];
let _permit = limiter.acquire().await?;
let resp = self.get_with_query("v3/folders/", &params).await?;
if !resp.status().is_success() {
bail!("List folders failed: {}", resp.status());
}
Ok(resp.json::<Vec<FolderSimple>>().await?)
}
/// Retrieve full folder details including sub_folders (GET v3/folders/{id}/)
pub async fn retrieve_folder(&self, id: &str) -> Result<FolderDetail, anyhow::Error> {
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
@@ -29,6 +48,20 @@ impl MDRSConnection {
Ok(resp.json::<FolderDetail>().await?)
}
/// Retrieve folder details while consuming the shared API concurrency budget.
pub async fn retrieve_folder_limited(
&self,
id: &str,
limiter: &ApiRequestLimiter,
) -> Result<FolderDetail, anyhow::Error> {
let _permit = limiter.acquire().await?;
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
if !resp.status().is_success() {
bail!("Retrieve folder failed: {}", resp.status());
}
Ok(resp.json::<FolderDetail>().await?)
}
/// Create a new folder under `parent_id` (POST v3/folders/).
pub async fn create_folder(
&self,
@@ -44,6 +77,32 @@ impl MDRSConnection {
self.post_json("v3/folders/", &body).await
}
/// Create a new folder under `parent_id` and return its ID while consuming
/// the shared API concurrency budget.
pub async fn create_folder_id_limited(
&self,
parent_id: &str,
folder_name: &str,
limiter: &ApiRequestLimiter,
) -> Result<String, anyhow::Error> {
let body = serde_json::json!({
"name": folder_name,
"parent_id": parent_id,
"description": "",
"template_id": -1,
});
let _permit = limiter.acquire().await?;
let resp = self.post_json("v3/folders/", &body).await?;
if !resp.status().is_success() {
bail!("Failed to create remote folder: {}", folder_name);
}
let json: serde_json::Value = resp.json().await?;
json["id"]
.as_str()
.ok_or_else(|| anyhow!("No id in create_folder response for {}", folder_name))
.map(|s| s.to_string())
}
/// Authenticate against a password-locked folder (POST v3/folders/{id}/auth/).
/// Returns `Err` if the password is incorrect or the request fails.
pub async fn folder_auth(&self, folder_id: &str, password: &str) -> Result<(), anyhow::Error> {
@@ -61,4 +120,28 @@ impl MDRSConnection {
}
Ok(())
}
/// Authenticate against a locked folder while consuming the shared API
/// concurrency budget.
pub async fn folder_auth_limited(
&self,
folder_id: &str,
password: &str,
limiter: &ApiRequestLimiter,
) -> Result<(), anyhow::Error> {
let _permit = limiter.acquire().await?;
let resp = self
.post_json(
&format!("v3/folders/{}/auth/", folder_id),
&serde_json::json!({"password": password}),
)
.await?;
if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
bail!("Password is incorrect.");
}
if !resp.status().is_success() {
bail!("Folder auth failed: {}", resp.status());
}
Ok(())
}
}
+1
View File
@@ -1,5 +1,6 @@
// API module (add users, files, folders, laboratories, etc. here)
pub mod doi;
pub mod files;
pub mod folders;
pub mod laboratories;
+2 -1
View File
@@ -119,5 +119,6 @@ pub fn compute_digest(
let json_str = python_digest_json(user, access, refresh, labs);
let mut hasher = Sha256::new();
hasher.update(json_str.as_bytes());
format!("{:x}", hasher.finalize())
let result = hasher.finalize();
result.iter().map(|b| format!("{:02x}", b)).collect()
}
+494 -62
View File
@@ -2,13 +2,17 @@ pub mod digest;
pub mod types;
pub use digest::compute_digest;
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheUser};
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheToken, CacheUser};
use crate::connection::MDRSConnection;
use anyhow::{anyhow, bail};
use std::collections::HashMap;
use std::fs;
#[cfg(unix)]
use std::os::unix::fs::{MetadataExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::UNIX_EPOCH;
// ---------------------------------------------------------------------------
// Per-remote async mutex map (in-process serialization)
@@ -17,6 +21,29 @@ use std::sync::{Arc, LazyLock, Mutex};
static REMOTE_LOCKS: LazyLock<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct CacheStoreKey {
config_dir: PathBuf,
remote: String,
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct CacheFileSnapshot {
len: u64,
modified_nanos: u128,
#[cfg(unix)]
inode: u64,
}
#[derive(Clone)]
struct MemoryCacheEntry {
snapshot: CacheFileSnapshot,
cache: Cache,
}
static MEMORY_CACHE: LazyLock<Mutex<HashMap<CacheStoreKey, MemoryCacheEntry>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
let mut map = REMOTE_LOCKS.lock().unwrap();
map.entry(remote.to_string())
@@ -28,11 +55,198 @@ fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
// Cache file path helpers
// ---------------------------------------------------------------------------
fn cache_file_path(remote: &str) -> std::path::PathBuf {
crate::settings::SETTINGS
.config_dirname
.join("cache")
.join(format!("{}.json", remote))
fn cache_store_key(config_dir: &Path, remote: &str) -> CacheStoreKey {
CacheStoreKey {
config_dir: config_dir.to_path_buf(),
remote: remote.to_string(),
}
}
fn cache_dir_path(config_dir: &Path) -> PathBuf {
config_dir.join("cache")
}
fn cache_file_path_in(config_dir: &Path, remote: &str) -> PathBuf {
cache_dir_path(config_dir).join(format!("{}.json", remote))
}
fn cache_file_path(remote: &str) -> PathBuf {
cache_file_path_in(&crate::settings::SETTINGS.config_dirname, remote)
}
fn cache_snapshot(metadata: &fs::Metadata) -> CacheFileSnapshot {
let modified_nanos = metadata
.modified()
.ok()
.and_then(|time| time.duration_since(UNIX_EPOCH).ok())
.map(|duration| duration.as_nanos())
.unwrap_or_default();
CacheFileSnapshot {
len: metadata.len(),
modified_nanos,
#[cfg(unix)]
inode: metadata.ino(),
}
}
fn read_cache_snapshot(cache_path: &Path) -> Result<CacheFileSnapshot, std::io::Error> {
fs::metadata(cache_path).map(|metadata| cache_snapshot(&metadata))
}
fn cached_entry(config_dir: &Path, remote: &str, snapshot: &CacheFileSnapshot) -> Option<Cache> {
let key = cache_store_key(config_dir, remote);
let map = MEMORY_CACHE.lock().unwrap();
map.get(&key)
.filter(|entry| entry.snapshot == *snapshot)
.map(|entry| entry.cache.clone())
}
fn update_cached_entry(config_dir: &Path, remote: &str, snapshot: CacheFileSnapshot, cache: Cache) {
let key = cache_store_key(config_dir, remote);
let mut map = MEMORY_CACHE.lock().unwrap();
map.insert(key, MemoryCacheEntry { snapshot, cache });
}
fn invalidate_cached_entry(config_dir: &Path, remote: &str) {
let key = cache_store_key(config_dir, remote);
let mut map = MEMORY_CACHE.lock().unwrap();
map.remove(&key);
}
fn ensure_cache_dir(cache_dir: &Path) -> Result<(), anyhow::Error> {
fs::create_dir_all(cache_dir)?;
#[cfg(unix)]
{
let mut perms = fs::metadata(cache_dir)?.permissions();
perms.set_mode(0o700);
fs::set_permissions(cache_dir, perms)?;
}
Ok(())
}
fn write_cache_file(cache_path: &Path, cache: &Cache) -> Result<(), anyhow::Error> {
let tmp_path = cache_path.with_extension("tmp");
fs::write(&tmp_path, serde_json::to_vec_pretty(cache)?)?;
#[cfg(unix)]
{
let mut perms = fs::metadata(&tmp_path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(&tmp_path, perms)?;
}
fs::rename(&tmp_path, cache_path)?;
Ok(())
}
fn parse_cache(remote: &str, data: &str) -> Result<Cache, anyhow::Error> {
serde_json::from_str::<Cache>(data).map_err(|e| {
anyhow!(
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
remote,
e,
remote
)
})
}
fn load_cache_from_dir(remote: &str, config_dir: &Path) -> Result<Cache, anyhow::Error> {
let cache_path = cache_file_path_in(config_dir, remote);
let snapshot = match read_cache_snapshot(&cache_path) {
Ok(snapshot) => snapshot,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
invalidate_cached_entry(config_dir, remote);
bail!(
"Not logged in to `{}`. Run `mdrs login {}` first.",
remote,
remote
);
}
Err(e) => return Err(e.into()),
};
if let Some(cache) = cached_entry(config_dir, remote, &snapshot) {
return Ok(cache);
}
let data = match fs::read_to_string(&cache_path) {
Ok(data) => data,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
invalidate_cached_entry(config_dir, remote);
bail!(
"Not logged in to `{}`. Run `mdrs login {}` first.",
remote,
remote
);
}
Err(e) => return Err(e.into()),
};
let cache = parse_cache(remote, &data)?;
update_cached_entry(config_dir, remote, snapshot, cache.clone());
Ok(cache)
}
fn load_cache_if_present_from_dir(
remote: &str,
config_dir: &Path,
) -> Result<Option<Cache>, anyhow::Error> {
let cache_path = cache_file_path_in(config_dir, remote);
let snapshot = match read_cache_snapshot(&cache_path) {
Ok(snapshot) => snapshot,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
invalidate_cached_entry(config_dir, remote);
return Ok(None);
}
Err(e) => return Err(e.into()),
};
if let Some(cache) = cached_entry(config_dir, remote, &snapshot) {
return Ok(Some(cache));
}
let data = match fs::read_to_string(&cache_path) {
Ok(data) => data,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
invalidate_cached_entry(config_dir, remote);
return Ok(None);
}
Err(e) => return Err(e.into()),
};
let cache = match parse_cache(remote, &data) {
Ok(cache) => cache,
Err(_) => {
remove_cache_in_dir(remote, config_dir)?;
return Ok(None);
}
};
update_cached_entry(config_dir, remote, snapshot, cache.clone());
Ok(Some(cache))
}
fn persist_cache_in_dir(
remote: &str,
config_dir: &Path,
cache: &Cache,
) -> Result<(), anyhow::Error> {
let cache_dir = cache_dir_path(config_dir);
ensure_cache_dir(&cache_dir)?;
let cache_path = cache_file_path_in(config_dir, remote);
write_cache_file(&cache_path, cache)?;
let snapshot = read_cache_snapshot(&cache_path)?;
update_cached_entry(config_dir, remote, snapshot, cache.clone());
Ok(())
}
fn remove_cache_in_dir(remote: &str, config_dir: &Path) -> Result<(), anyhow::Error> {
let cache_path = cache_file_path_in(config_dir, remote);
match fs::remove_file(&cache_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
}
invalidate_cached_entry(config_dir, remote);
Ok(())
}
// ---------------------------------------------------------------------------
@@ -41,23 +255,17 @@ fn cache_file_path(remote: &str) -> std::path::PathBuf {
/// Load token and laboratories from the login cache file (no token refresh check).
pub fn load_cache(remote: &str) -> Result<Cache, anyhow::Error> {
let cache_path = cache_file_path(remote);
if !cache_path.exists() {
bail!(
"Not logged in to `{}`. Run `mdrs login {}` first.",
remote,
remote
);
}
let data = fs::read_to_string(&cache_path)?;
serde_json::from_str::<Cache>(&data).map_err(|e| {
anyhow!(
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
remote,
e,
remote
)
})
load_cache_from_dir(remote, &crate::settings::SETTINGS.config_dirname)
}
/// Persist a cache entry and refresh the in-memory fast path.
pub fn persist_cache(remote: &str, cache: &Cache) -> Result<(), anyhow::Error> {
persist_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname, cache)
}
/// Remove a cache entry from disk and memory.
pub fn remove_cache(remote: &str) -> Result<(), anyhow::Error> {
remove_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname)
}
// ---------------------------------------------------------------------------
@@ -76,6 +284,7 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
let lock = get_remote_lock(remote);
let _guard = lock.lock().await;
ensure_cache_dir(&cache_dir_path(&crate::settings::SETTINGS.config_dirname))?;
let lock_path = cache_file_path(remote).with_extension("lock");
use fs2::FileExt;
let lock_file = fs::OpenOptions::new()
@@ -98,8 +307,7 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
}
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
let new_access = refresh_and_persist(remote, &cache).await?;
cache.token.access = new_access;
cache = refresh_and_persist(remote, &cache).await?;
}
Ok(cache)
@@ -110,51 +318,89 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
result
}
async fn load_cache_with_token_refresh_optional_from_dir(
remote: &str,
config_dir: &Path,
) -> Result<Option<Cache>, anyhow::Error> {
let lock = get_remote_lock(remote);
let _guard = lock.lock().await;
ensure_cache_dir(&cache_dir_path(config_dir))?;
let lock_path = cache_file_path_in(config_dir, remote).with_extension("lock");
use fs2::FileExt;
let lock_file = fs::OpenOptions::new()
.write(true)
.create(true)
.open(&lock_path)?;
lock_file.lock_exclusive()?;
let result: Result<Option<Cache>, anyhow::Error> = async {
let Some(mut cache) = load_cache_if_present_from_dir(remote, config_dir)? else {
return Ok(None);
};
if crate::token::is_expired(&cache.token.refresh) {
remove_cache_in_dir(remote, config_dir)?;
return Ok(None);
}
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
cache = refresh_and_persist_in_dir(remote, config_dir, &cache).await?;
}
Ok(Some(cache))
}
.await;
lock_file.unlock()?;
result
}
/// Load cache when present and refresh its token if needed.
///
/// Unlike `load_cache_with_token_refresh`, this returns `Ok(None)` when the user
/// is effectively anonymous: no cache file exists, the cache is invalid, or the
/// refresh token has already expired. This mirrors the Python client behavior
/// used by read-only commands.
pub async fn load_cache_with_token_refresh_optional(
remote: &str,
) -> Result<Option<Cache>, anyhow::Error> {
load_cache_with_token_refresh_optional_from_dir(
remote,
&crate::settings::SETTINGS.config_dirname,
)
.await
}
/// Call the token-refresh endpoint and write the new access token back to the
/// cache file. The caller must already hold the per-remote async mutex.
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<String, anyhow::Error> {
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<Cache, anyhow::Error> {
refresh_and_persist_in_dir(remote, &crate::settings::SETTINGS.config_dirname, cache).await
}
async fn refresh_and_persist_in_dir(
remote: &str,
config_dir: &Path,
cache: &Cache,
) -> Result<Cache, anyhow::Error> {
let url = crate::commands::config::get_remote_url(remote)?
.ok_or_else(|| anyhow!("Remote `{}` is not configured.", remote))?;
let conn = MDRSConnection::new(&url);
let new_access = conn.token_refresh(&cache.token.refresh).await?;
let new_digest = compute_digest(
cache.user.as_ref(),
&new_access,
&cache.token.refresh,
&cache.laboratories,
let mut updated_cache = cache.clone();
updated_cache.token.access = new_access;
updated_cache.digest = compute_digest(
updated_cache.user.as_ref(),
&updated_cache.token.access,
&updated_cache.token.refresh,
&updated_cache.laboratories,
);
let cache_path = cache_file_path(remote);
let raw = fs::read_to_string(&cache_path)?;
let mut obj: serde_json::Value = serde_json::from_str(&raw)?;
persist_cache_in_dir(remote, config_dir, &updated_cache)?;
obj["token"]["access"] = serde_json::Value::String(new_access.clone());
obj["digest"] = serde_json::Value::String(new_digest);
// Write atomically: write to .tmp then rename.
let tmp_path = cache_path.with_extension("tmp");
{
use std::io::Write;
let mut tmp_file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp_path)?;
tmp_file.write_all(serde_json::to_string(&obj)?.as_bytes())?;
tmp_file.flush()?;
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&tmp_path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(&tmp_path, perms)?;
}
fs::rename(&tmp_path, &cache_path)?;
Ok(new_access)
Ok(updated_cache)
}
// ---------------------------------------------------------------------------
@@ -166,9 +412,195 @@ pub fn create_authenticated_conn(
remote: &str,
cache: &Cache,
) -> Result<MDRSConnection, anyhow::Error> {
Ok(create_remote_conn(remote)?.with_token(cache.token.access.clone()))
}
/// Create an unauthenticated `MDRSConnection` for the given remote label.
pub fn create_remote_conn(remote: &str) -> Result<MDRSConnection, anyhow::Error> {
let url = crate::commands::config::get_remote_url(remote)?
.ok_or_else(|| anyhow!("Remote `{}` is not configured.", remote))?;
Ok(MDRSConnection::new(&url)
.with_remote(remote)
.with_token(cache.token.access.clone()))
Ok(MDRSConnection::new(&url).with_remote(remote))
}
/// Create a connection for read-only commands, attaching a bearer token only
/// when a valid login cache is available.
pub async fn create_readonly_conn(
remote: &str,
) -> Result<(MDRSConnection, Option<Cache>), anyhow::Error> {
let conn = create_remote_conn(remote)?;
match load_cache_with_token_refresh_optional(remote).await? {
Some(cache) => Ok((conn.with_token(cache.token.access.clone()), Some(cache))),
None => Ok((conn, None)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn sample_cache(username: &str) -> Cache {
Cache {
user: Some(CacheUser {
id: 1,
username: username.to_string(),
laboratory_ids: vec![10, 20],
is_reviewer: false,
}),
token: types::CacheToken {
access: format!("access-{username}"),
refresh: format!("refresh-{username}"),
},
laboratories: CacheLabsWrapper {
items: vec![CacheLaboratory {
id: 10,
name: "lab".to_string(),
pi_name: String::new(),
full_name: "Laboratory".to_string(),
}],
},
digest: format!("digest-{username}"),
}
}
fn remote_name(prefix: &str, config_dir: &Path) -> String {
format!(
"{prefix}-{}",
config_dir
.file_name()
.unwrap_or_default()
.to_string_lossy()
.replace('.', "_")
)
}
#[cfg(unix)]
#[test]
fn load_cache_uses_memory_fast_path_when_snapshot_matches() {
let dir = tempdir().unwrap();
let remote = remote_name("fast-path", dir.path());
let cache = sample_cache("alice");
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
assert_eq!(first.user.unwrap().username, "alice");
let cache_path = cache_file_path_in(dir.path(), &remote);
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
perms.set_mode(0o000);
fs::set_permissions(&cache_path, perms).unwrap();
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
assert_eq!(second.user.unwrap().username, "alice");
}
#[test]
fn load_cache_reloads_when_external_writer_changes_file() {
let dir = tempdir().unwrap();
let remote = remote_name("reload", dir.path());
let original = sample_cache("alice");
let updated = sample_cache("bob");
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
assert_eq!(first.user.unwrap().username, "alice");
let cache_dir = cache_dir_path(dir.path());
ensure_cache_dir(&cache_dir).unwrap();
let cache_path = cache_file_path_in(dir.path(), &remote);
write_cache_file(&cache_path, &updated).unwrap();
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
assert_eq!(second.user.unwrap().username, "bob");
}
#[cfg(unix)]
#[test]
fn persist_cache_refreshes_memory_entry() {
let dir = tempdir().unwrap();
let remote = remote_name("persist", dir.path());
let original = sample_cache("alice");
let updated = sample_cache("bob");
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
persist_cache_in_dir(&remote, dir.path(), &updated).unwrap();
let cache_path = cache_file_path_in(dir.path(), &remote);
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
perms.set_mode(0o000);
fs::set_permissions(&cache_path, perms).unwrap();
let loaded = load_cache_from_dir(&remote, dir.path()).unwrap();
assert_eq!(loaded.user.unwrap().username, "bob");
}
#[test]
fn remove_cache_invalidates_memory_entry() {
let dir = tempdir().unwrap();
let remote = remote_name("remove", dir.path());
let cache = sample_cache("alice");
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
remove_cache_in_dir(&remote, dir.path()).unwrap();
let err = load_cache_from_dir(&remote, dir.path()).unwrap_err();
assert!(
err.to_string()
.contains(&format!("Not logged in to `{remote}`"))
);
}
fn make_jwt_with_exp(exp: i64) -> String {
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
let payload = URL_SAFE_NO_PAD.encode(format!(r#"{{"exp":{exp}}}"#));
format!("{header}.{payload}.")
}
#[test]
fn load_cache_if_present_returns_none_when_cache_missing() {
let dir = tempdir().unwrap();
let remote = remote_name("missing", dir.path());
let loaded = load_cache_if_present_from_dir(&remote, dir.path()).unwrap();
assert!(loaded.is_none());
}
#[test]
fn load_cache_if_present_clears_invalid_cache() {
let dir = tempdir().unwrap();
let remote = remote_name("invalid", dir.path());
let cache_dir = cache_dir_path(dir.path());
ensure_cache_dir(&cache_dir).unwrap();
let cache_path = cache_file_path_in(dir.path(), &remote);
fs::write(&cache_path, b"{invalid json").unwrap();
let loaded = load_cache_if_present_from_dir(&remote, dir.path()).unwrap();
assert!(loaded.is_none());
assert!(!cache_path.exists());
}
#[tokio::test]
async fn optional_cache_load_treats_expired_session_as_anonymous() {
let dir = tempdir().unwrap();
let remote = remote_name("expired", dir.path());
let mut cache = sample_cache("alice");
cache.token.access = make_jwt_with_exp(0);
cache.token.refresh = make_jwt_with_exp(0);
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
let loaded = load_cache_with_token_refresh_optional_from_dir(&remote, dir.path())
.await
.unwrap();
assert!(loaded.is_none());
assert!(!cache_file_path_in(dir.path(), &remote).exists());
}
}
+8 -6
View File
@@ -1,14 +1,14 @@
use serde::Deserialize;
use serde::{Deserialize, Serialize};
/// Access and refresh token pair stored in the login cache file.
#[derive(Deserialize, Clone)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct CacheToken {
pub access: String,
pub refresh: String,
}
/// Minimal user fields stored in the cache, matching Python's `User` dataclass.
#[derive(Deserialize, Clone)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct CacheUser {
pub id: u32,
pub username: String,
@@ -17,7 +17,7 @@ pub struct CacheUser {
}
/// All four laboratory fields needed for digest computation.
#[derive(Deserialize, Clone)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct CacheLaboratory {
pub id: u32,
pub name: String,
@@ -28,15 +28,17 @@ pub struct CacheLaboratory {
}
/// Wrapper matching Python's `Laboratories` serialization: `{"items": [...]}`.
#[derive(Deserialize, Clone, Default)]
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct CacheLabsWrapper {
pub items: Vec<CacheLaboratory>,
}
/// Full login cache, corresponding to the `<remote>.json` file written by `login`.
#[derive(Deserialize, Clone)]
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct Cache {
pub user: Option<CacheUser>,
pub token: CacheToken,
pub laboratories: CacheLabsWrapper,
#[serde(default)]
pub digest: String,
}
+501 -75
View File
@@ -1,12 +1,13 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::cache::create_readonly_conn;
use crate::commands::shared::{
find_file_by_name, find_folder, find_lab_in_cache, find_subfolder_by_name, parse_remote_path,
find_file_by_name, find_folder_by_doi, find_folder_limited, find_laboratory,
find_subfolder_by_name, is_doi, parse_doi_remote_path, parse_remote_path,
};
use crate::connection::MDRSConnection;
use crate::connection::{ApiRequestLimiter, MDRSConnection};
use anyhow::{anyhow, bail};
use futures::stream::{FuturesUnordered, StreamExt};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinSet;
pub async fn download(
remote_path: &str,
@@ -16,10 +17,7 @@ pub async fn download(
password: Option<&str>,
excludes: Vec<String>,
) -> Result<(), anyhow::Error> {
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
let cache = load_cache_with_token_refresh(&remote).await?;
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
let lab = find_lab_in_cache(&cache, &labname)?;
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
// Validate that local_path is an existing directory (matching Python's behaviour).
let local_real = std::fs::canonicalize(local_path)
@@ -28,6 +26,197 @@ pub async fn download(
bail!("Local directory `{}` not found.", local_path);
}
// Detect DOI path: "remote:10.xxxx/prefix.ID[/optional/sub/path]"
if is_doi(remote_path.splitn(2, ':').nth(1).unwrap_or("")) {
let (remote, doi, subpath) = parse_doi_remote_path(remote_path)?;
let (raw_conn, _cache) = create_readonly_conn(&remote).await?;
let (doi_folder, lab) = find_folder_by_doi(&raw_conn, &doi, password).await?;
let abs_path = format!("{}{}", doi_folder.path.trim_end_matches('/'), subpath);
let abs_path_clean = abs_path.trim_end_matches('/');
// Check if the target is a folder.
// If it is, we download the directory. If it is not, we download the file or subfolder.
let (folder, is_folder) = match find_folder_limited(
&raw_conn,
&limiter,
lab.id,
abs_path_clean,
password,
)
.await
{
Ok(f) => (f, true),
Err(_) => {
let (parent_path, _) = match abs_path_clean.rfind('/') {
Some(0) => ("/".to_string(), abs_path_clean[1..].to_string()),
Some(pos) => (
abs_path_clean[..pos].to_string(),
abs_path_clean[pos + 1..].to_string(),
),
None => ("/".to_string(), abs_path_clean.to_string()),
};
let parent =
find_folder_limited(&raw_conn, &limiter, lab.id, &parent_path, password)
.await?;
(parent, false)
}
};
let conn = Arc::new(raw_conn);
let excludes = Arc::new(excludes);
if is_folder {
let path = folder.path.clone();
let lab_name = Arc::new(lab.name);
let top_local = local_real.join(&folder.name);
let password_owned = password.map(str::to_string);
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
JoinSet::new();
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
let mut errors = Vec::new();
if !recursive {
// Single-folder, non-recursive: download the files inside the DOI folder.
let files = conn.list_all_files_limited(&folder.id, &limiter).await?;
for file in &files {
if is_excluded(&excludes, lab_name.as_str(), &path, Some(&file.name)) {
continue;
}
let dest = local_real.join(&file.name);
if skip_if_exists {
if dest.exists() {
if let Ok(meta) = std::fs::metadata(&dest) {
if meta.len() == file.size {
println!("{}", dest.display());
continue;
}
}
}
}
let url = make_absolute_url(&conn, &file.download_url);
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
.await?;
println!("{}", dest.display());
}
return Ok(());
}
spawn_download_folder_task(
&mut folder_tasks,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
folder.id.clone(),
top_local,
password_owned.clone(),
skip_if_exists,
);
drive_download_tasks(
&mut folder_tasks,
&mut download_tasks,
&mut errors,
conn.clone(),
limiter,
lab_name,
excludes,
password_owned,
skip_if_exists,
)
.await;
if !errors.is_empty() {
bail!(errors.join("\n"));
}
return Ok(());
}
// Case: target is a file or subfolder inside the resolved folder.
let basename = match abs_path_clean.rfind('/') {
Some(pos) => abs_path_clean[pos + 1..].to_string(),
None => abs_path_clean.to_string(),
};
let files = conn.list_all_files_limited(&folder.id, &limiter).await?;
// Case 1: basename matches a file in the folder.
if let Some(file) = find_file_by_name(&files, &basename) {
if is_excluded(&excludes, &lab.name, &folder.path, Some(&file.name)) {
return Ok(());
}
let dest = local_real.join(&file.name);
if skip_if_exists {
if dest.exists() {
if let Ok(meta) = std::fs::metadata(&dest) {
if meta.len() == file.size {
println!("{}", dest.display());
return Ok(());
}
}
}
}
let url = make_absolute_url(&conn, &file.download_url);
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
.await?;
println!("{}", dest.display());
return Ok(());
}
// Case 2: basename matches a sub-folder.
let subfolder = find_subfolder_by_name(&folder.sub_folders, &basename);
if let Some(sub) = subfolder {
if !recursive {
bail!("Cannot download `{}`: Is a folder.", abs_path_clean);
}
let top_local = local_real.join(&sub.name);
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
JoinSet::new();
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
let mut errors = Vec::new();
let lab_name = Arc::new(lab.name.clone());
let password_owned = password.map(str::to_string);
spawn_download_folder_task(
&mut folder_tasks,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
sub.id.clone(),
top_local,
password_owned.clone(),
skip_if_exists,
);
drive_download_tasks(
&mut folder_tasks,
&mut download_tasks,
&mut errors,
conn.clone(),
limiter,
lab_name,
excludes,
password_owned,
skip_if_exists,
)
.await;
if !errors.is_empty() {
bail!(errors.join("\n"));
}
return Ok(());
}
bail!("File or folder `{}` not found.", abs_path_clean);
}
// Normal path: "remote:/labname/path/..."
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
let (raw_conn, cache) = create_readonly_conn(&remote).await?;
let conn = Arc::new(raw_conn);
let lab = find_laboratory(&conn, cache.as_ref(), &labname).await?;
// Split r_path into the parent directory path and the target basename.
// Trailing slashes are already stripped by parse_remote_path, so this is safe.
let r_path_clean = r_path.trim_end_matches('/');
@@ -40,8 +229,11 @@ pub async fn download(
None => ("/".to_string(), r_path_clean.to_string()),
};
let parent_folder = find_folder(&conn, lab.id, &parent_path, password).await?;
let files = conn.list_all_files(&parent_folder.id).await?;
let parent_folder =
find_folder_limited(&conn, &limiter, lab.id, &parent_path, password).await?;
let files = conn
.list_all_files_limited(&parent_folder.id, &limiter)
.await?;
// Case 1: basename matches a file in the parent folder.
if let Some(file) = find_file_by_name(&files, &basename) {
@@ -61,7 +253,8 @@ pub async fn download(
}
}
let url = make_absolute_url(&conn, &file.download_url);
conn.download_file(&url, &dest.to_string_lossy()).await?;
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
.await?;
println!("{}", dest.display());
return Ok(());
}
@@ -76,73 +269,41 @@ pub async fn download(
// We create that subdirectory first, then recurse into it.
let top_local = local_real.join(&sub.name);
// Iterative DFS: each entry is (remote_folder_id, local_dir)
let mut stack: Vec<(String, PathBuf)> = vec![(sub.id.clone(), top_local)];
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
JoinSet::new();
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
let mut errors = Vec::new();
let excludes = Arc::new(excludes);
let lab_name = Arc::new(lab.name.clone());
let password = password.map(str::to_string);
while let Some((folder_id, local_dir)) = stack.pop() {
let folder = conn.retrieve_folder(&folder_id).await?;
spawn_download_folder_task(
&mut folder_tasks,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
sub.id.clone(),
top_local,
password.clone(),
skip_if_exists,
);
if is_excluded(&excludes, &lab.name, &folder.path, None) {
continue;
}
drive_download_tasks(
&mut folder_tasks,
&mut download_tasks,
&mut errors,
conn.clone(),
limiter,
lab_name,
excludes,
password,
skip_if_exists,
)
.await;
tokio::fs::create_dir_all(&local_dir).await?;
println!("{}", local_dir.display());
let dir_files = conn.list_all_files(&folder_id).await?;
// Download files in this folder (up to 10 concurrent).
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
for f in &dir_files {
if is_excluded(&excludes, &lab.name, &folder.path, Some(&f.name)) {
continue;
}
let dest_path = local_dir.join(&f.name);
if skip_if_exists {
if dest_path.exists() {
if let Ok(meta) = std::fs::metadata(&dest_path) {
if meta.len() == f.size {
println!("{}", dest_path.display());
continue;
}
}
}
}
let url = make_absolute_url(&conn, &f.download_url);
let conn = conn.clone();
futs.push(tokio::spawn(async move {
let dest_str = dest_path.to_string_lossy().to_string();
match conn.download_file(&url, &dest_str).await {
Ok(_) => println!("{}", dest_path.display()),
Err(_) => {
eprintln!("Failed: {}", dest_path.display());
if dest_path.is_file() {
let _ = std::fs::remove_file(&dest_path);
}
}
}
}));
if futs.len() >= crate::settings::SETTINGS.concurrent {
let _ = futs.next().await;
}
}
while futs.next().await.is_some() {}
// Push sub-folders onto the stack for recursive processing.
for sf in &folder.sub_folders {
if sf.lock {
match password {
Some(pw) => {
if conn.folder_auth(&sf.id, pw).await.is_err() {
continue;
}
}
None => continue,
}
}
let sub_local = local_dir.join(&sf.name);
stack.push((sf.id.clone(), sub_local));
}
if !errors.is_empty() {
bail!(errors.join("\n"));
}
return Ok(());
}
@@ -181,3 +342,268 @@ fn make_absolute_url(conn: &MDRSConnection, url: &str) -> String {
)
}
}
struct DownloadFolderTaskResult {
child_folders: Vec<(String, PathBuf)>,
download_jobs: Vec<DownloadJob>,
}
struct DownloadJob {
url: String,
dest_path: PathBuf,
}
fn spawn_download_folder_task(
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
lab_name: Arc<String>,
excludes: Arc<Vec<String>>,
folder_id: String,
local_dir: PathBuf,
password: Option<String>,
skip_if_exists: bool,
) {
folder_tasks.spawn(async move {
process_download_folder(
conn,
limiter,
lab_name,
excludes,
folder_id,
local_dir,
password,
skip_if_exists,
)
.await
});
}
fn spawn_download_task(
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
job: DownloadJob,
) {
download_tasks.spawn(async move {
let dest_str = job.dest_path.to_string_lossy().to_string();
match conn
.download_file_limited(&job.url, &dest_str, &limiter)
.await
{
Ok(()) => {
println!("{}", job.dest_path.display());
Ok(())
}
Err(err) => {
if job.dest_path.is_file() {
let _ = std::fs::remove_file(&job.dest_path);
}
Err(anyhow!(
"Failed to download {}: {}",
job.dest_path.display(),
err
))
}
}
});
}
async fn process_download_folder(
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
lab_name: Arc<String>,
excludes: Arc<Vec<String>>,
folder_id: String,
local_dir: PathBuf,
password: Option<String>,
skip_if_exists: bool,
) -> Result<DownloadFolderTaskResult, anyhow::Error> {
let folder = conn.retrieve_folder_limited(&folder_id, &limiter).await?;
if is_excluded(excludes.as_slice(), lab_name.as_str(), &folder.path, None) {
return Ok(DownloadFolderTaskResult {
child_folders: Vec::new(),
download_jobs: Vec::new(),
});
}
tokio::fs::create_dir_all(&local_dir).await?;
println!("{}", local_dir.display());
let dir_files = conn.list_all_files_limited(&folder_id, &limiter).await?;
let mut download_jobs = Vec::new();
for file in &dir_files {
if is_excluded(
excludes.as_slice(),
lab_name.as_str(),
&folder.path,
Some(&file.name),
) {
continue;
}
let dest_path = local_dir.join(&file.name);
if skip_if_exists && dest_path.exists() {
if let Ok(meta) = std::fs::metadata(&dest_path) {
if meta.len() == file.size {
println!("{}", dest_path.display());
continue;
}
}
}
download_jobs.push(DownloadJob {
url: make_absolute_url(&conn, &file.download_url),
dest_path,
});
}
let mut child_folder_tasks: JoinSet<Result<Option<(String, PathBuf)>, anyhow::Error>> =
JoinSet::new();
for sub_folder in folder.sub_folders {
let conn = conn.clone();
let limiter = limiter.clone();
let password = password.clone();
let sub_local = local_dir.join(&sub_folder.name);
child_folder_tasks.spawn(async move {
if sub_folder.lock {
match password.as_deref() {
Some(pw) => {
if conn
.folder_auth_limited(&sub_folder.id, pw, &limiter)
.await
.is_err()
{
return Ok(None);
}
}
None => return Ok(None),
}
}
Ok(Some((sub_folder.id, sub_local)))
});
}
let mut child_folders = Vec::new();
while let Some(result) = child_folder_tasks.join_next().await {
if let Some(child) = flatten_join_result(result)? {
child_folders.push(child);
}
}
Ok(DownloadFolderTaskResult {
child_folders,
download_jobs,
})
}
async fn drive_download_tasks(
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
errors: &mut Vec<String>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
lab_name: Arc<String>,
excludes: Arc<Vec<String>>,
password: Option<String>,
skip_if_exists: bool,
) {
loop {
match (folder_tasks.is_empty(), download_tasks.is_empty()) {
(true, true) => break,
(false, true) => {
if let Some(result) = folder_tasks.join_next().await {
handle_download_folder_result(
result,
folder_tasks,
download_tasks,
errors,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
password.clone(),
skip_if_exists,
);
}
}
(true, false) => {
if let Some(result) = download_tasks.join_next().await {
if let Err(err) = flatten_join_result(result) {
errors.push(err.to_string());
}
}
}
(false, false) => {
tokio::select! {
result = folder_tasks.join_next() => {
if let Some(result) = result {
handle_download_folder_result(
result,
folder_tasks,
download_tasks,
errors,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
password.clone(),
skip_if_exists,
);
}
}
result = download_tasks.join_next() => {
if let Some(result) = result {
if let Err(err) = flatten_join_result(result) {
errors.push(err.to_string());
}
}
}
}
}
}
}
}
fn handle_download_folder_result(
result: Result<Result<DownloadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
errors: &mut Vec<String>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
lab_name: Arc<String>,
excludes: Arc<Vec<String>>,
password: Option<String>,
skip_if_exists: bool,
) {
match flatten_join_result(result) {
Ok(task_result) => {
for (folder_id, local_dir) in task_result.child_folders {
spawn_download_folder_task(
folder_tasks,
conn.clone(),
limiter.clone(),
lab_name.clone(),
excludes.clone(),
folder_id,
local_dir,
password.clone(),
skip_if_exists,
);
}
for job in task_result.download_jobs {
spawn_download_task(download_tasks, conn.clone(), limiter.clone(), job);
}
}
Err(err) => errors.push(err.to_string()),
}
}
fn flatten_join_result<T>(
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
) -> Result<T, anyhow::Error> {
match result {
Ok(inner) => inner,
Err(err) => Err(anyhow!("Task join failed: {}", err)),
}
}
+9 -20
View File
@@ -1,27 +1,16 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::commands::shared::{
find_file_by_name, find_folder, find_lab_in_cache, parse_remote_path,
};
use crate::cache::create_readonly_conn;
use crate::commands::shared::{find_file_by_name, resolve_remote_file};
use anyhow::anyhow;
pub async fn file_metadata(remote_path: &str, password: Option<&str>) -> Result<(), anyhow::Error> {
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
let remote = remote_path
.splitn(2, ':')
.next()
.ok_or_else(|| anyhow!("Invalid remote path"))?;
let (conn, cache) = create_readonly_conn(remote).await?;
let (parent_folder, basename) =
resolve_remote_file(&conn, cache.as_ref(), remote_path, password).await?;
let cache = load_cache_with_token_refresh(&remote).await?;
let conn = create_authenticated_conn(&remote, &cache)?;
let lab = find_lab_in_cache(&cache, &labname)?;
let lab_id = lab.id;
// Split the file path into parent directory and filename
let path = r_path.trim_end_matches('/');
let (dirname, basename) = if let Some(pos) = path.rfind('/') {
let d = if pos == 0 { "/" } else { &path[..pos] };
(d.to_string(), path[pos + 1..].to_string())
} else {
("/".to_string(), path.to_string())
};
let parent_folder = find_folder(&conn, lab_id, &dirname, password).await?;
let files = conn.list_all_files(&parent_folder.id).await?;
let file = find_file_by_name(&files, &basename)
+2 -3
View File
@@ -1,8 +1,7 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::cache::create_readonly_conn;
pub async fn labs(remote: &str) -> Result<(), anyhow::Error> {
let cache = load_cache_with_token_refresh(remote).await?;
let conn = create_authenticated_conn(remote, &cache)?;
let (conn, _) = create_readonly_conn(remote).await?;
let labs = conn.list_laboratories().await?;
let header = ("Name", "PI", "Laboratory");
+17 -64
View File
@@ -1,14 +1,10 @@
use crate::cache::{CacheLabsWrapper, CacheUser, compute_digest};
use crate::cache::{Cache, CacheLabsWrapper, CacheToken, CacheUser, compute_digest, persist_cache};
use crate::connection::MDRSConnection;
use crate::models::laboratory::Laboratories;
use crate::models::user::User;
use anyhow::{anyhow, bail};
use reqwest::Client;
use serde::Deserialize;
use serde_json::{Value, json};
use std::fs;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
/// Prompt for credentials if not supplied and then perform login.
/// This is the entry point called from `main`.
@@ -72,66 +68,23 @@ pub async fn login(username: &str, password: &str, remote: &str) -> Result<(), a
let cache_labs: CacheLabsWrapper = (&labs).into();
// compute Python-compatible digest
let digest = compute_digest(
cache_user_opt.as_ref(),
&token.access,
&token.refresh,
&cache_labs,
);
// build the cache JSON — field order matches Python's dataclass layout:
// user (id, username, laboratory_ids, is_reviewer)
// token (access, refresh)
// laboratories (items)
// digest
let user_val: Value = match &cache_user_opt {
Some(u) => json!({
"id": u.id,
"username": u.username,
"laboratory_ids": u.laboratory_ids,
"is_reviewer": u.is_reviewer
}),
None => Value::Null,
let cache = Cache {
user: cache_user_opt,
token: CacheToken {
access: token.access,
refresh: token.refresh,
},
laboratories: cache_labs,
digest: String::new(),
};
let labs_items: Vec<Value> = cache_labs
.items
.iter()
.map(|l| {
json!({
"id": l.id,
"name": l.name,
"pi_name": l.pi_name,
"full_name": l.full_name
})
})
.collect();
let obj = json!({
"user": user_val,
"token": {"access": token.access, "refresh": token.refresh},
"laboratories": {"items": labs_items},
"digest": digest
});
// write cache file: {config_dirname}/cache/<remote>.json
let cache_dir = crate::settings::SETTINGS.config_dirname.join("cache");
fs::create_dir_all(&cache_dir)?;
#[cfg(unix)]
{
let mut perms = fs::metadata(&cache_dir)?.permissions();
perms.set_mode(0o700);
fs::set_permissions(&cache_dir, perms)?;
}
let cache_file = cache_dir.join(format!("{}.json", remote));
let tmp = cache_file.with_extension("tmp");
fs::write(&tmp, serde_json::to_vec_pretty(&obj)?)?;
#[cfg(unix)]
{
let mut perms = fs::metadata(&tmp)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(&tmp, perms)?;
}
fs::rename(&tmp, &cache_file)?;
let mut cache = cache;
cache.digest = compute_digest(
cache.user.as_ref(),
&cache.token.access,
&cache.token.refresh,
&cache.laboratories,
);
persist_cache(remote, &cache)?;
println!("Login Successful");
Ok(())
+1 -8
View File
@@ -1,10 +1,3 @@
pub fn logout(remote: &str) -> Result<(), anyhow::Error> {
let cache_path = crate::settings::SETTINGS
.config_dirname
.join("cache")
.join(format!("{}.json", remote));
if cache_path.exists() {
std::fs::remove_file(&cache_path)?;
}
Ok(())
crate::cache::remove_cache(remote)
}
+11 -9
View File
@@ -1,5 +1,5 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::commands::shared::{find_folder, find_lab_in_cache, fmt_datetime, parse_remote_path};
use crate::cache::create_readonly_conn;
use crate::commands::shared::{fmt_datetime, resolve_remote_folder};
use crate::connection::MDRSConnection;
use crate::models::file::File;
use crate::models::folder::{FolderDetail, FolderSimple};
@@ -14,12 +14,14 @@ pub async fn ls(
is_recursive: bool,
is_quiet: bool,
) -> Result<(), anyhow::Error> {
let (remote, labname, path) = parse_remote_path(remote_path)?;
let cache = load_cache_with_token_refresh(&remote).await?;
let conn = create_authenticated_conn(&remote, &cache)?;
let lab = find_lab_in_cache(&cache, &labname)?;
let folder = find_folder(&conn, lab.id, &path, password).await?;
let remote = remote_path
.splitn(2, ':')
.next()
.ok_or_else(|| anyhow::anyhow!("Invalid remote path"))?;
let (conn, cache) = create_readonly_conn(remote).await?;
let (folder, lab) =
resolve_remote_folder(&conn, cache.as_ref(), None, remote_path, password).await?;
let labname = lab.name;
if is_json {
let output = if is_recursive {
@@ -29,7 +31,7 @@ pub async fn ls(
};
println!("{}", serde_json::to_string(&output)?);
} else if is_recursive {
let prefix = format!("{}:/{}", remote, labname);
let prefix = format!("{}:/{}", conn.remote.as_deref().unwrap_or(""), labname);
ls_plain_recursive(&conn, folder, &labname, &prefix, password).await?;
} else {
let files = conn.list_all_files(&folder.id).await?;
+9 -7
View File
@@ -1,12 +1,14 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::commands::shared::{find_folder, find_lab_in_cache, parse_remote_path};
use crate::cache::create_readonly_conn;
use crate::commands::shared::resolve_remote_folder;
pub async fn metadata(remote_path: &str, password: Option<&str>) -> Result<(), anyhow::Error> {
let (remote, labname, folder_path) = parse_remote_path(remote_path)?;
let cache = load_cache_with_token_refresh(&remote).await?;
let conn = create_authenticated_conn(&remote, &cache)?;
let lab = find_lab_in_cache(&cache, &labname)?;
let folder = find_folder(&conn, lab.id, &folder_path, password).await?;
let remote = remote_path
.splitn(2, ':')
.next()
.ok_or_else(|| anyhow::anyhow!("Invalid remote path"))?;
let (conn, cache) = create_readonly_conn(remote).await?;
let (folder, _) =
resolve_remote_folder(&conn, cache.as_ref(), None, remote_path, password).await?;
let resp = conn
.get(&format!("v3/folders/{}/metadata/", folder.id))
+439
View File
@@ -1,10 +1,158 @@
use crate::cache::{Cache, CacheLaboratory};
use crate::connection::ApiRequestLimiter;
use crate::connection::MDRSConnection;
use crate::models::file::File;
use crate::models::folder::{FolderDetail, FolderSimple};
use crate::models::laboratory::Laboratory;
use anyhow::{anyhow, bail};
use unicode_normalization::UnicodeNormalization;
// ---------------------------------------------------------------------------
// DOI helpers
// ---------------------------------------------------------------------------
/// Return true if the path component (after the remote: prefix) looks like a
/// DOI string, i.e. starts with "10." and contains a "/".
pub fn is_doi(path: &str) -> bool {
path.starts_with("10.") && path.contains('/')
}
/// Extract the DOI suffix ID from a full DOI string.
///
/// MDRS uses the segment after the last `.` in the DOI as its internal
/// identifier, e.g. `10.xxxx/prefix.20230511-001` → `20230511-001`.
/// If there is no `.` after the `/`, the entire suffix after `/` is used.
pub fn doi_suffix_id(doi: &str) -> &str {
// Find the slash that separates DOI prefix from suffix.
if let Some(slash_pos) = doi.find('/') {
let suffix = &doi[slash_pos + 1..];
// Use the part after the last `.` within the suffix.
if let Some(dot_pos) = suffix.rfind('.') {
&suffix[dot_pos + 1..]
} else {
suffix
}
} else {
doi
}
}
/// Split a DOI-with-optional-path string into `(doi, subpath)`.
///
/// A DOI has the form `10.REGISTRANT/SUFFIX` where SUFFIX contains no `/`.
/// Anything after the first `/` following SUFFIX is treated as a subfolder path.
///
/// Examples:
/// - `"10.1234/prefix.ID"` → `("10.1234/prefix.ID", "")`
/// - `"10.1234/prefix.ID/"` → `("10.1234/prefix.ID", "")`
/// - `"10.1234/prefix.ID/sub"` → `("10.1234/prefix.ID", "/sub")`
/// - `"10.1234/prefix.ID/sub/deep"` → `("10.1234/prefix.ID", "/sub/deep")`
pub fn split_doi_and_subpath(doi_with_path: &str) -> (&str, &str) {
// Find the first `/` that separates registrant from suffix.
if let Some(first_slash) = doi_with_path.find('/') {
let after_suffix_start = first_slash + 1;
let after_first = &doi_with_path[after_suffix_start..];
// Find the next `/` inside the suffix portion — this starts the subpath.
if let Some(second_slash) = after_first.find('/') {
let doi_end = after_suffix_start + second_slash;
let doi = &doi_with_path[..doi_end];
let subpath = &doi_with_path[doi_end..]; // begins with "/"
// Treat a bare trailing slash as no subpath (root of DOI folder).
if subpath == "/" {
(doi, "")
} else {
(doi, subpath)
}
} else {
// No second slash — the whole string is the DOI, no subpath.
(doi_with_path, "")
}
} else {
(doi_with_path, "")
}
}
/// Parse `"remote:10.xxxx/prefix.ID[/optional/sub/path]"` into
/// `(remote, doi, subpath)`.
///
/// `subpath` is empty when the remote path points directly at the DOI root
/// folder. Otherwise it is an absolute path string like `"/subfolder/deep"`.
pub fn parse_doi_remote_path(remote_path: &str) -> Result<(String, String, String), anyhow::Error> {
let parts: Vec<&str> = remote_path.splitn(2, ':').collect();
if parts.len() != 2 {
bail!("remote_path must be in the form 'remote:10.xxxx/prefix.ID'");
}
let remote = parts[0].to_string();
let doi_with_path = parts[1];
if !is_doi(doi_with_path) {
bail!(
"Path `{}` does not look like a DOI (must start with '10.' and contain '/').",
doi_with_path
);
}
let (doi, subpath) = split_doi_and_subpath(doi_with_path);
Ok((remote, doi.to_string(), subpath.to_string()))
}
/// Resolve a DOI string to a (FolderDetail, Laboratory) pair.
///
/// Calls GET v3/doi/{id}/ to look up the folder ID, then fetches the full
/// folder detail (which carries `laboratory_id`) and resolves the laboratory.
pub async fn find_folder_by_doi(
conn: &MDRSConnection,
doi: &str,
password: Option<&str>,
) -> Result<(FolderDetail, Laboratory), anyhow::Error> {
// Strip any trailing slash from the DOI before extracting the suffix ID.
let doi_clean = doi.trim_end_matches('/');
let id = doi_suffix_id(doi_clean);
let doi_resp = conn.retrieve_doi(id).await?;
// Verify that the returned DOI matches the one supplied by the caller
// (case-insensitive, trimming trailing slashes).
let returned = doi_resp.doi.trim_end_matches('/');
if !returned.eq_ignore_ascii_case(doi_clean) {
bail!(
"DOI mismatch: requested `{}` but server returned `{}`.",
doi_clean,
returned
);
}
let folder_id = &doi_resp.folder.id;
// Fetch the full folder detail; laboratory_id is available here.
let folder = conn
.retrieve_folder(folder_id)
.await
.map_err(|e| anyhow!("Failed to retrieve folder for DOI `{}`: {}", doi_clean, e))?;
// Handle password-locked folder.
if folder.lock {
match password {
None => {
bail!(
"Folder for DOI `{}` is locked. Use -p/--password to provide a password.",
doi_clean
);
}
Some(pw) => conn.folder_auth(folder_id, pw).await?,
}
}
// Resolve laboratory using the laboratory_id from the folder detail.
let lab_id = folder.laboratory_id;
let lab = conn
.list_laboratories()
.await?
.items
.into_iter()
.find(|l| l.id == lab_id)
.ok_or_else(|| anyhow!("Laboratory with id {} not found.", lab_id))?;
Ok((folder, lab))
}
// ---------------------------------------------------------------------------
// Path helpers
// ---------------------------------------------------------------------------
@@ -47,6 +195,32 @@ pub fn find_lab_in_cache<'a>(
.ok_or_else(|| anyhow!("Laboratory `{}` not found.", labname))
}
/// Resolve a laboratory by name using cached laboratories when available, and
/// falling back to the API when the user is anonymous.
pub async fn find_laboratory(
conn: &MDRSConnection,
cache: Option<&Cache>,
labname: &str,
) -> Result<Laboratory, anyhow::Error> {
if let Some(cache) = cache {
if let Ok(lab) = find_lab_in_cache(cache, labname) {
return Ok(Laboratory {
id: lab.id,
name: lab.name.clone(),
pi_name: lab.pi_name.clone(),
full_name: lab.full_name.clone(),
});
}
}
conn.list_laboratories()
.await?
.items
.into_iter()
.find(|lab| lab.name == labname)
.ok_or_else(|| anyhow!("Laboratory `{}` not found.", labname))
}
// ---------------------------------------------------------------------------
// Unicode helpers
// ---------------------------------------------------------------------------
@@ -95,6 +269,49 @@ pub async fn find_folder(
Ok(folder)
}
/// Resolve a folder by path while consuming the shared API concurrency budget.
pub async fn find_folder_limited(
conn: &MDRSConnection,
limiter: &ApiRequestLimiter,
lab_id: u32,
path: &str,
password: Option<&str>,
) -> Result<FolderDetail, anyhow::Error> {
let normalized_path = nfc(path);
let folders = conn
.list_folders_by_path_limited(lab_id, &normalized_path, limiter)
.await?;
if folders.is_empty() {
bail!("Folder `{}` not found.", path);
}
if folders.len() != 1 {
bail!(
"Ambiguous path `{}`: {} folders matched.",
path,
folders.len()
);
}
let folder_simple = &folders[0];
if folder_simple.lock {
match password {
None => {
bail!(
"Folder `{}` is locked. Use -p/--password to provide a password.",
path
);
}
Some(pw) => {
conn.folder_auth_limited(&folder_simple.id, pw, limiter)
.await?
}
}
}
let folder = conn
.retrieve_folder_limited(&folder_simple.id, limiter)
.await?;
Ok(folder)
}
/// Find a file by name (NFC-normalized, case-insensitive) in a file list.
pub fn find_file_by_name<'a>(files: &'a [File], name: &str) -> Option<&'a File> {
let name_lower = nfc(name).to_lowercase();
@@ -134,3 +351,225 @@ pub fn fmt_datetime(iso: &str) -> String {
iso.to_string()
}
}
/// Resolve any remote path (normal or DOI-based) into a FolderDetail and Laboratory.
/// Takes an optional API limiter for download command compatibility.
pub async fn resolve_remote_folder(
conn: &MDRSConnection,
cache: Option<&Cache>,
limiter: Option<&ApiRequestLimiter>,
remote_path: &str,
password: Option<&str>,
) -> Result<(FolderDetail, Laboratory), anyhow::Error> {
let path_component = remote_path.splitn(2, ':').nth(1).unwrap_or("");
if is_doi(path_component) {
let (_, doi, subpath) = parse_doi_remote_path(remote_path)?;
let (doi_folder, lab) = find_folder_by_doi(conn, &doi, password).await?;
if subpath.is_empty() {
Ok((doi_folder, lab))
} else {
let abs_path = format!("{}{}", doi_folder.path.trim_end_matches('/'), subpath);
let folder = if let Some(l) = limiter {
find_folder_limited(conn, l, lab.id, &abs_path, password).await?
} else {
find_folder(conn, lab.id, &abs_path, password).await?
};
Ok((folder, lab))
}
} else {
let (_, labname, folder_path) = parse_remote_path(remote_path)?;
let lab = find_laboratory(conn, cache, &labname).await?;
let folder = if let Some(l) = limiter {
find_folder_limited(conn, l, lab.id, &folder_path, password).await?
} else {
find_folder(conn, lab.id, &folder_path, password).await?
};
Ok((folder, lab))
}
}
/// Resolves a remote path pointing to a file into the parent FolderDetail and the file's basename.
pub async fn resolve_remote_file(
conn: &MDRSConnection,
cache: Option<&Cache>,
remote_path: &str,
password: Option<&str>,
) -> Result<(FolderDetail, String), anyhow::Error> {
let path_component = remote_path.splitn(2, ':').nth(1).unwrap_or("");
if is_doi(path_component) {
let (_, doi, subpath) = parse_doi_remote_path(remote_path)?;
let (doi_folder, lab) = find_folder_by_doi(conn, &doi, password).await?;
let subpath_clean = subpath.trim_end_matches('/');
if subpath_clean.is_empty() {
bail!("DOI path must point to a file, not a folder.");
}
let (sub_dir, basename) = if let Some(pos) = subpath_clean.rfind('/') {
let d = if pos == 0 { "/" } else { &subpath_clean[..pos] };
(d.to_string(), subpath_clean[pos + 1..].to_string())
} else {
("/".to_string(), subpath_clean.to_string())
};
let abs_path = format!(
"{}{}",
doi_folder.path.trim_end_matches('/'),
if sub_dir.starts_with('/') {
sub_dir
} else {
format!("/{}", sub_dir)
}
);
let parent = find_folder(conn, lab.id, &abs_path, password).await?;
Ok((parent, basename))
} else {
let (_, labname, r_path) = parse_remote_path(remote_path)?;
let lab = find_laboratory(conn, cache, &labname).await?;
let path = r_path.trim_end_matches('/');
let (dirname, basename) = if let Some(pos) = path.rfind('/') {
let d = if pos == 0 { "/" } else { &path[..pos] };
(d.to_string(), path[pos + 1..].to_string())
} else {
("/".to_string(), path.to_string())
};
let parent = find_folder(conn, lab.id, &dirname, password).await?;
Ok((parent, basename))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::test]
async fn find_laboratory_falls_back_to_api_without_authorization() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 4096];
let n = stream.read(&mut buf).await.unwrap();
let req = String::from_utf8_lossy(&buf[..n]);
assert!(req.starts_with("GET /v3/laboratories/ HTTP/1.1"));
assert!(!req.contains("\r\nAuthorization: Bearer "));
let body =
r#"[{"id":1,"name":"public-lab","pi_name":"PI","full_name":"Public Laboratory"}]"#;
let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
body.len(),
body
);
stream.write_all(response.as_bytes()).await.unwrap();
});
let conn = MDRSConnection::new(&format!("http://{addr}"));
let lab = find_laboratory(&conn, None, "public-lab").await.unwrap();
assert_eq!(lab.id, 1);
assert_eq!(lab.name, "public-lab");
server.await.unwrap();
}
// ------------------------------------------------------------------
// DOI helper unit tests
// ------------------------------------------------------------------
#[test]
fn is_doi_returns_true_for_valid_doi() {
assert!(is_doi("10.12345/prefix.20230511-001"));
assert!(is_doi("10.1234/abc"));
}
#[test]
fn is_doi_returns_false_for_normal_paths() {
assert!(!is_doi("/labname/path/to/folder"));
assert!(!is_doi("labname/path"));
assert!(!is_doi("10.1234")); // no slash
}
#[test]
fn doi_suffix_id_extracts_last_dot_segment() {
assert_eq!(
doi_suffix_id("10.12345/prefix.20230511-001"),
"20230511-001"
);
}
#[test]
fn doi_suffix_id_no_dot_in_suffix_returns_whole_suffix() {
assert_eq!(doi_suffix_id("10.1234/nodot"), "nodot");
}
#[test]
fn doi_suffix_id_no_slash_returns_whole_input() {
assert_eq!(doi_suffix_id("10.1234"), "10.1234");
}
#[test]
fn parse_doi_remote_path_valid_no_subpath() {
let (remote, doi, subpath) =
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001").unwrap();
assert_eq!(remote, "neurodata");
assert_eq!(doi, "10.12345/prefix.20230511-001");
assert_eq!(subpath, "");
}
#[test]
fn parse_doi_remote_path_valid_trailing_slash() {
let (remote, doi, subpath) =
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001/").unwrap();
assert_eq!(remote, "neurodata");
assert_eq!(doi, "10.12345/prefix.20230511-001");
assert_eq!(subpath, ""); // trailing slash treated as no subpath
}
#[test]
fn parse_doi_remote_path_valid_with_subpath() {
let (remote, doi, subpath) =
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001/sub/folder").unwrap();
assert_eq!(remote, "neurodata");
assert_eq!(doi, "10.12345/prefix.20230511-001");
assert_eq!(subpath, "/sub/folder");
}
#[test]
fn parse_doi_remote_path_rejects_normal_path() {
let err = parse_doi_remote_path("neurodata:/lab/path").unwrap_err();
assert!(err.to_string().contains("does not look like a DOI"));
}
#[test]
fn split_doi_and_subpath_no_subpath() {
assert_eq!(
split_doi_and_subpath("10.1234/prefix.ID"),
("10.1234/prefix.ID", "")
);
}
#[test]
fn split_doi_and_subpath_trailing_slash_only() {
assert_eq!(
split_doi_and_subpath("10.1234/prefix.ID/"),
("10.1234/prefix.ID", "")
);
}
#[test]
fn split_doi_and_subpath_single_level() {
assert_eq!(
split_doi_and_subpath("10.1234/prefix.ID/sub"),
("10.1234/prefix.ID", "/sub")
);
}
#[test]
fn split_doi_and_subpath_multi_level() {
assert_eq!(
split_doi_and_subpath("10.1234/prefix.ID/sub/deep/path"),
("10.1234/prefix.ID", "/sub/deep/path")
);
}
}
+261 -75
View File
@@ -1,13 +1,14 @@
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
use crate::commands::shared::{
find_file_by_name, find_folder, find_lab_in_cache, nfc, parse_remote_path,
find_file_by_name, find_folder_limited, find_lab_in_cache, nfc, parse_remote_path,
};
use crate::connection::{ApiRequestLimiter, MDRSConnection};
use crate::models::folder::FolderSimple;
use anyhow::{anyhow, bail};
use futures::stream::{FuturesUnordered, StreamExt};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::task::JoinSet;
pub async fn upload(
local_path: &str,
@@ -18,8 +19,9 @@ pub async fn upload(
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
let cache = load_cache_with_token_refresh(&remote).await?;
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
let lab = find_lab_in_cache(&cache, &labname)?;
let dest_folder = find_folder(&conn, lab.id, &r_path, None).await?;
let dest_folder = find_folder_limited(&conn, &limiter, lab.id, &r_path, None).await?;
// Normalize local_path: resolve to an absolute canonical path so that
// trailing slashes and "./" prefixes are handled consistently (matching
@@ -30,7 +32,9 @@ pub async fn upload(
if local.is_file() {
let filename = local.file_name().unwrap().to_string_lossy().to_string();
let remote_files = conn.list_all_files(&dest_folder.id).await?;
let remote_files = conn
.list_all_files_limited(&dest_folder.id, &limiter)
.await?;
if skip_if_exists {
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
if rf.size == std::fs::metadata(local)?.len() {
@@ -39,7 +43,7 @@ pub async fn upload(
}
}
}
conn.upload_file(&dest_folder.id, &local.to_string_lossy())
conn.upload_file_limited(&dest_folder.id, &local.to_string_lossy(), &limiter)
.await?;
println!("{}{}", dest_folder.path, filename);
} else if local.is_dir() {
@@ -52,75 +56,43 @@ pub async fn upload(
let local_basename = local.file_name().unwrap().to_string_lossy().to_string();
let top_remote_id = find_or_create_folder(
&conn,
&limiter,
&dest_folder.id,
&dest_folder.sub_folders,
&local_basename,
)
.await?;
let top_folder = conn.retrieve_folder(&top_remote_id).await?;
println!("{}", top_folder.path.trim_end_matches('/'));
println!(
"{}",
format!("{}{}", dest_folder.path, local_basename).trim_end_matches('/')
);
// Iterative depth-first walk: each entry is (local_dir, remote_folder_id)
let mut stack: Vec<(PathBuf, String)> = vec![(local.to_path_buf(), top_remote_id)];
let mut folder_tasks: JoinSet<Result<UploadFolderTaskResult, anyhow::Error>> =
JoinSet::new();
let mut upload_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
let mut errors = Vec::new();
while let Some((local_dir, remote_id)) = stack.pop() {
let folder_detail = conn.retrieve_folder(&remote_id).await?;
let remote_files = conn.list_all_files(&remote_id).await?;
spawn_upload_folder_task(
&mut folder_tasks,
conn.clone(),
limiter.clone(),
local.to_path_buf(),
top_remote_id,
skip_if_exists,
);
let mut entries = fs::read_dir(&local_dir).await?;
let mut subdirs: Vec<PathBuf> = Vec::new();
let mut files: Vec<PathBuf> = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let p = entry.path();
if p.is_dir() {
subdirs.push(p);
} else {
files.push(p);
}
}
drive_upload_tasks(
&mut folder_tasks,
&mut upload_tasks,
&mut errors,
conn.clone(),
limiter,
skip_if_exists,
)
.await;
// Ensure each local sub-directory exists on the remote side
for subdir in subdirs {
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
let sub_remote_id =
find_or_create_folder(&conn, &remote_id, &folder_detail.sub_folders, &dirname)
.await?;
let sub_folder = conn.retrieve_folder(&sub_remote_id).await?;
println!("{}", sub_folder.path.trim_end_matches('/'));
stack.push((subdir, sub_remote_id));
}
// Upload files in this directory (up to 10 concurrent)
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
for file_path in files {
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
let file_path_str = file_path.to_string_lossy().to_string();
if skip_if_exists {
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
if let Ok(meta) = std::fs::metadata(&file_path) {
if rf.size == meta.len() {
let remote_path_prefix = folder_detail.path.clone();
println!("{}{}", remote_path_prefix, filename);
continue;
}
}
}
}
let conn = conn.clone();
let folder_id = remote_id.clone();
let remote_path_prefix = folder_detail.path.clone();
let fname = filename.clone();
futs.push(tokio::spawn(async move {
match conn.upload_file(&folder_id, &file_path_str).await {
Ok(_) => println!("{}{}", remote_path_prefix, fname),
Err(e) => eprintln!("Error: {}", e),
}
}));
if futs.len() >= crate::settings::SETTINGS.concurrent {
let _ = futs.next().await;
}
}
while futs.next().await.is_some() {}
if !errors.is_empty() {
bail!(errors.join("\n"));
}
} else {
bail!("File or directory `{}` not found.", local_path);
@@ -131,7 +103,8 @@ pub async fn upload(
/// Find an existing sub-folder by name or create it, returning its ID.
async fn find_or_create_folder(
conn: &crate::connection::MDRSConnection,
conn: &MDRSConnection,
limiter: &ApiRequestLimiter,
parent_id: &str,
existing: &[FolderSimple],
name: &str,
@@ -142,13 +115,226 @@ async fn find_or_create_folder(
{
return Ok(sf.id.clone());
}
let resp = conn.create_folder(parent_id, &nfc(name)).await?;
if !resp.status().is_success() {
bail!("Failed to create remote folder: {}", name);
}
let json: serde_json::Value = resp.json().await?;
json["id"]
.as_str()
.ok_or_else(|| anyhow!("No id in create_folder response for {}", name))
.map(|s| s.to_string())
conn.create_folder_id_limited(parent_id, &nfc(name), limiter)
.await
}
struct UploadFolderTaskResult {
child_folders: Vec<(PathBuf, String)>,
upload_jobs: Vec<UploadJob>,
}
struct UploadJob {
folder_id: String,
file_path: String,
remote_path: String,
}
fn spawn_upload_folder_task(
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
local_dir: PathBuf,
remote_id: String,
skip_if_exists: bool,
) {
folder_tasks.spawn(async move {
process_upload_folder(conn, limiter, local_dir, remote_id, skip_if_exists).await
});
}
fn spawn_upload_task(
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
job: UploadJob,
) {
upload_tasks.spawn(async move {
conn.upload_file_limited(&job.folder_id, &job.file_path, &limiter)
.await?;
println!("{}", job.remote_path);
Ok(())
});
}
async fn process_upload_folder(
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
local_dir: PathBuf,
remote_id: String,
skip_if_exists: bool,
) -> Result<UploadFolderTaskResult, anyhow::Error> {
let (folder_detail, remote_files) = tokio::try_join!(
conn.retrieve_folder_limited(&remote_id, &limiter),
conn.list_all_files_limited(&remote_id, &limiter),
)?;
let mut entries = fs::read_dir(&local_dir).await?;
let mut subdirs: Vec<PathBuf> = Vec::new();
let mut files: Vec<PathBuf> = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir() {
subdirs.push(path);
} else {
files.push(path);
}
}
let mut subdir_tasks: JoinSet<Result<(PathBuf, String, String), anyhow::Error>> =
JoinSet::new();
let existing_subfolders = Arc::new(folder_detail.sub_folders.clone());
let folder_path_prefix = folder_detail.path.clone();
for subdir in subdirs {
let conn = conn.clone();
let limiter = limiter.clone();
let remote_id = remote_id.clone();
let existing_subfolders = existing_subfolders.clone();
let folder_path_prefix = folder_path_prefix.clone();
subdir_tasks.spawn(async move {
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
let sub_remote_id = find_or_create_folder(
&conn,
&limiter,
&remote_id,
existing_subfolders.as_slice(),
&dirname,
)
.await?;
Ok((
subdir,
sub_remote_id,
format!("{}{}", folder_path_prefix, dirname),
))
});
}
let mut child_folders = Vec::new();
while let Some(result) = subdir_tasks.join_next().await {
let (subdir, sub_remote_id, remote_path) = flatten_join_result(result)?;
println!("{}", remote_path.trim_end_matches('/'));
child_folders.push((subdir, sub_remote_id));
}
let mut upload_jobs = Vec::new();
for file_path in files {
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
if skip_if_exists {
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
if let Ok(meta) = std::fs::metadata(&file_path) {
if rf.size == meta.len() {
println!("{}{}", folder_detail.path, filename);
continue;
}
}
}
}
upload_jobs.push(UploadJob {
folder_id: remote_id.clone(),
file_path: file_path.to_string_lossy().to_string(),
remote_path: format!("{}{}", folder_detail.path, filename),
});
}
Ok(UploadFolderTaskResult {
child_folders,
upload_jobs,
})
}
async fn drive_upload_tasks(
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
errors: &mut Vec<String>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
skip_if_exists: bool,
) {
loop {
match (folder_tasks.is_empty(), upload_tasks.is_empty()) {
(true, true) => break,
(false, true) => {
if let Some(result) = folder_tasks.join_next().await {
handle_upload_folder_result(
result,
folder_tasks,
upload_tasks,
errors,
conn.clone(),
limiter.clone(),
skip_if_exists,
);
}
}
(true, false) => {
if let Some(result) = upload_tasks.join_next().await {
if let Err(err) = flatten_join_result(result) {
errors.push(err.to_string());
}
}
}
(false, false) => {
tokio::select! {
result = folder_tasks.join_next() => {
if let Some(result) = result {
handle_upload_folder_result(
result,
folder_tasks,
upload_tasks,
errors,
conn.clone(),
limiter.clone(),
skip_if_exists,
);
}
}
result = upload_tasks.join_next() => {
if let Some(result) = result {
if let Err(err) = flatten_join_result(result) {
errors.push(err.to_string());
}
}
}
}
}
}
}
}
fn handle_upload_folder_result(
result: Result<Result<UploadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
errors: &mut Vec<String>,
conn: Arc<MDRSConnection>,
limiter: ApiRequestLimiter,
skip_if_exists: bool,
) {
match flatten_join_result(result) {
Ok(task_result) => {
for (local_dir, remote_id) in task_result.child_folders {
spawn_upload_folder_task(
folder_tasks,
conn.clone(),
limiter.clone(),
local_dir,
remote_id,
skip_if_exists,
);
}
for job in task_result.upload_jobs {
spawn_upload_task(upload_tasks, conn.clone(), limiter.clone(), job);
}
}
Err(err) => errors.push(err.to_string()),
}
}
fn flatten_join_result<T>(
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
) -> Result<T, anyhow::Error> {
match result {
Ok(inner) => inner,
Err(err) => Err(anyhow!("Task join failed: {}", err)),
}
}
+1 -23
View File
@@ -1,27 +1,5 @@
use serde::Deserialize;
use std::fs;
#[derive(Deserialize)]
struct CacheUser {
username: String,
}
#[derive(Deserialize)]
struct WhoamiCache {
user: Option<CacheUser>,
}
pub async fn whoami(remote: &str) -> Result<(), anyhow::Error> {
let cache_path = crate::settings::SETTINGS
.config_dirname
.join("cache")
.join(format!("{}.json", remote));
if !cache_path.exists() {
println!("(Anonymous)");
return Ok(());
}
let data = fs::read_to_string(&cache_path)?;
match serde_json::from_str::<WhoamiCache>(&data) {
match crate::cache::load_cache(remote) {
Ok(cache) => match cache.user {
Some(user) => println!("{}", user.username),
None => println!("(Anonymous)"),
+23
View File
@@ -1,6 +1,8 @@
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
use reqwest::{Client, Response};
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
fn build_user_agent() -> String {
let info = os_info::get();
@@ -36,6 +38,27 @@ pub struct MDRSConnection {
pub token: Option<String>,
}
#[derive(Clone)]
pub struct ApiRequestLimiter {
semaphore: Arc<Semaphore>,
}
impl ApiRequestLimiter {
pub fn new(limit: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(limit.max(1))),
}
}
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, anyhow::Error> {
self.semaphore
.clone()
.acquire_owned()
.await
.map_err(|_| anyhow::anyhow!("API request limiter was closed."))
}
}
impl MDRSConnection {
pub fn new(url: &str) -> Self {
MDRSConnection {
+18
View File
@@ -0,0 +1,18 @@
use serde::{Deserialize, Serialize};
/// Nested folder information returned inside a DOI response.
/// The DOI endpoint only returns the folder `id`; `laboratory_id` must be
/// obtained by subsequently calling the folder retrieve endpoint.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DoiFolderRef {
pub id: String,
}
/// Response from GET v3/doi/{id}/
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DoiResponse {
/// The internal DOI suffix ID (e.g. "20260429-001") returned as a string.
pub id: String,
pub doi: String,
pub folder: DoiFolderRef,
}
+1
View File
@@ -1,5 +1,6 @@
// Model definitions (User, Laboratory, File, Folder, etc.)
pub mod doi;
pub mod file;
pub mod folder;
pub mod laboratory;
+1 -1
View File
@@ -4,7 +4,7 @@ pub struct Settings {
/// Base directory for config and cache files.
/// Controlled by `MDRS_CLIENT_CONFIG_DIRNAME` env var (default: `~/.mdrs-client`).
pub config_dirname: std::path::PathBuf,
/// Maximum number of concurrent upload/download workers.
/// Maximum number of concurrent MDRS API requests used by upload/download.
/// Controlled by `MDRS_CLIENT_CONCURRENT` env var (default: 10).
pub concurrent: usize,
}