Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
afd08f2499
|
|||
|
777c5f6533
|
|||
|
80b6560030
|
|||
|
beee9b4f41
|
|||
|
7db5c4d53f
|
|||
|
e3cd864a0c
|
|||
|
32149109b4
|
|||
|
3f2ca938bd
|
|||
|
4e73766732
|
Generated
+236
-397
File diff suppressed because it is too large
Load Diff
+10
-10
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mdrs-client-rust"
|
||||
version = "0.1.1"
|
||||
version = "2.0.1"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
authors = ["Neuroinformatics Unit, RIKEN CBS"]
|
||||
@@ -14,23 +14,23 @@ path = "src/main.rs"
|
||||
clap = { version = "4.5", features = ["derive"] }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "multipart", "rustls-tls"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
tokio = { version = "1.37", features = ["full"] }
|
||||
serde_json = "1.0.150"
|
||||
tokio = { version = "1.52.3", features = ["full"] }
|
||||
futures = "0.3"
|
||||
dirs = "5.0"
|
||||
dirs = "6.0.0"
|
||||
anyhow = "1.0.102"
|
||||
configparser = "3.1.0"
|
||||
configparser = "3.2.0"
|
||||
validators = "0.25.3"
|
||||
sha2 = "0.10"
|
||||
rpassword = "7.0"
|
||||
sha2 = "0.11.0"
|
||||
rpassword = "7.5.4"
|
||||
base64 = "0.22"
|
||||
fs2 = "0.4"
|
||||
ctrlc = "3"
|
||||
os_info = "3"
|
||||
os_info = "3.15.0"
|
||||
dotenvy = "0.15"
|
||||
unicode-normalization = "0.1"
|
||||
self-replace = "1"
|
||||
tar = "0.4"
|
||||
tar = "0.4.46"
|
||||
flate2 = "1"
|
||||
zip = "2"
|
||||
zip = "8.6.0"
|
||||
tempfile = "3"
|
||||
|
||||
@@ -103,7 +103,7 @@ mdrs labs neurodata:
|
||||
|
||||
### ls
|
||||
|
||||
List the contents of a remote folder.
|
||||
List the contents of a remote folder. You can also specify a DOI path in the form `remote:10.xxxx/yyy.ID[/optional/subpath]`.
|
||||
|
||||
```shell
|
||||
mdrs ls neurodata:/NIU/Repository/
|
||||
@@ -111,6 +111,10 @@ mdrs ls -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/
|
||||
mdrs ls -r neurodata:/NIU/Repository/Dataset1/
|
||||
mdrs ls -J -r neurodata:/NIU/Repository/Dataset1/
|
||||
mdrs ls -q neurodata:/NIU/Repository/
|
||||
|
||||
# DOI access examples:
|
||||
mdrs ls neurodata:10.60178/cbs.20260429-001
|
||||
mdrs ls "neurodata:10.60178/cbs.20260429-001/Figure 1"
|
||||
```
|
||||
|
||||
### mkdir
|
||||
@@ -133,7 +137,7 @@ mdrs upload -r --skip-if-exists ./dataset neurodata:/NIU/Repository/TEST/
|
||||
|
||||
### download
|
||||
|
||||
Download a file or folder from a remote path.
|
||||
Download a file or folder from a remote path. You can also specify a DOI path.
|
||||
|
||||
```shell
|
||||
mdrs download neurodata:/NIU/Repository/TEST/sample.dat ./
|
||||
@@ -141,6 +145,9 @@ mdrs download -r neurodata:/NIU/Repository/TEST/dataset/ ./
|
||||
mdrs download -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/Readme.dat ./
|
||||
mdrs download -r --exclude /NIU/Repository/TEST/dataset/skip neurodata:/NIU/Repository/TEST/dataset/ ./
|
||||
mdrs download -r --skip-if-exists neurodata:/NIU/Repository/TEST/dataset/ ./
|
||||
|
||||
# DOI access examples:
|
||||
mdrs download -r neurodata:10.60178/cbs.20260429-001 ./
|
||||
```
|
||||
|
||||
### mv
|
||||
@@ -184,20 +191,26 @@ Available access levels: `private`, `public`, `pw_open`, `cbs_open`, `5kikan_ope
|
||||
|
||||
### metadata
|
||||
|
||||
Show metadata for a remote folder.
|
||||
Show metadata for a remote folder. You can also specify a DOI path.
|
||||
|
||||
```shell
|
||||
mdrs metadata neurodata:/NIU/Repository/TEST/
|
||||
mdrs metadata neurodata:/NIU/Repository/Private/
|
||||
mdrs metadata -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/
|
||||
|
||||
# DOI access examples:
|
||||
mdrs metadata neurodata:10.60178/cbs.20260429-001
|
||||
```
|
||||
|
||||
### file-metadata
|
||||
|
||||
Show metadata for a remote file.
|
||||
Show metadata for a remote file. You can also specify a DOI path.
|
||||
|
||||
```shell
|
||||
mdrs file-metadata neurodata:/NIU/Repository/TEST/dataset/sample.dat
|
||||
mdrs file-metadata -p SHARING_PASSWORD neurodata:/NIU/Repository/PW_Open/Readme.txt
|
||||
|
||||
# DOI access examples:
|
||||
mdrs file-metadata "neurodata:10.60178/cbs.20260429-001/Figure 1/Figure1v3.pdf"
|
||||
```
|
||||
|
||||
### version
|
||||
@@ -208,6 +221,16 @@ Show the tool name and version number.
|
||||
mdrs version
|
||||
```
|
||||
|
||||
### selfupdate
|
||||
|
||||
Update the current `mdrs` binary to the latest published release for
|
||||
the same build target.
|
||||
|
||||
```shell
|
||||
mdrs selfupdate
|
||||
mdrs selfupdate -y
|
||||
```
|
||||
|
||||
### help
|
||||
|
||||
Show help for a command.
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::doi::DoiResponse;
|
||||
use anyhow::bail;
|
||||
|
||||
impl MDRSConnection {
|
||||
/// Retrieve the folder associated with a DOI suffix ID (GET v3/doi/{id}/).
|
||||
///
|
||||
/// The MDRS DOI format is `10.xxxx/prefix.{id}` where the suffix after the
|
||||
/// last `.` is the internal system ID passed to this endpoint.
|
||||
pub async fn retrieve_doi(&self, id: &str) -> Result<DoiResponse, anyhow::Error> {
|
||||
let resp = self.get(&format!("v3/doi/{}/", id)).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("DOI lookup failed: {}", resp.status());
|
||||
}
|
||||
Ok(resp.json::<DoiResponse>().await?)
|
||||
}
|
||||
}
|
||||
+48
-5
@@ -1,4 +1,4 @@
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
pub use crate::models::file::File;
|
||||
use anyhow::bail;
|
||||
use unicode_normalization::UnicodeNormalization;
|
||||
@@ -34,8 +34,43 @@ impl MDRSConnection {
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
/// Upload a local file into the given remote folder.
|
||||
pub async fn upload_file(&self, folder_id: &str, file_path: &str) -> Result<(), anyhow::Error> {
|
||||
/// List all files in a folder while consuming the shared API concurrency budget.
|
||||
pub async fn list_all_files_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<Vec<File>, anyhow::Error> {
|
||||
let mut all_files = Vec::new();
|
||||
let mut page: u32 = 1;
|
||||
loop {
|
||||
let params = [
|
||||
("folder_id", folder_id.to_string()),
|
||||
("page", page.to_string()),
|
||||
];
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_with_query("v3/files/", ¶ms).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("List files failed: {}", resp.status());
|
||||
}
|
||||
let list: FileListResponse = resp.json().await?;
|
||||
let has_next = list.next.is_some();
|
||||
all_files.extend(list.results);
|
||||
if !has_next {
|
||||
break;
|
||||
}
|
||||
page += 1;
|
||||
}
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
/// Upload a local file into the given remote folder while consuming the
|
||||
/// shared API concurrency budget.
|
||||
pub async fn upload_file_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
file_path: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
use anyhow::{anyhow, bail};
|
||||
use reqwest::multipart;
|
||||
let file_name: String = std::path::Path::new(file_path)
|
||||
@@ -49,6 +84,7 @@ impl MDRSConnection {
|
||||
let form = multipart::Form::new()
|
||||
.text("folder_id", folder_id.to_string())
|
||||
.part("file", part);
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.post_multipart("v3/files/", form).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Upload failed: {}", resp.status());
|
||||
@@ -56,13 +92,20 @@ impl MDRSConnection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Download a file from `url` and write it to `dest`.
|
||||
pub async fn download_file(&self, url: &str, dest: &str) -> Result<(), anyhow::Error> {
|
||||
/// Download a file while consuming the shared API concurrency budget.
|
||||
pub async fn download_file_limited(
|
||||
&self,
|
||||
url: &str,
|
||||
dest: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_url(url).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Download failed: {}", resp.status());
|
||||
}
|
||||
let bytes = resp.bytes().await?;
|
||||
drop(_permit);
|
||||
tokio::fs::write(dest, &bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+85
-2
@@ -1,6 +1,6 @@
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
pub use crate::models::folder::{FolderDetail, FolderSimple};
|
||||
use anyhow::bail;
|
||||
use anyhow::{anyhow, bail};
|
||||
|
||||
impl MDRSConnection {
|
||||
/// List folders matching the given path under a laboratory (GET v3/folders/?path=...&laboratory_id=...)
|
||||
@@ -20,6 +20,25 @@ impl MDRSConnection {
|
||||
Ok(resp.json::<Vec<FolderSimple>>().await?)
|
||||
}
|
||||
|
||||
/// List folders by path while consuming the shared API concurrency budget.
|
||||
pub async fn list_folders_by_path_limited(
|
||||
&self,
|
||||
lab_id: u32,
|
||||
path: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<Vec<FolderSimple>, anyhow::Error> {
|
||||
let params = [
|
||||
("laboratory_id", lab_id.to_string()),
|
||||
("path", path.to_string()),
|
||||
];
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get_with_query("v3/folders/", ¶ms).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("List folders failed: {}", resp.status());
|
||||
}
|
||||
Ok(resp.json::<Vec<FolderSimple>>().await?)
|
||||
}
|
||||
|
||||
/// Retrieve full folder details including sub_folders (GET v3/folders/{id}/)
|
||||
pub async fn retrieve_folder(&self, id: &str) -> Result<FolderDetail, anyhow::Error> {
|
||||
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
|
||||
@@ -29,6 +48,20 @@ impl MDRSConnection {
|
||||
Ok(resp.json::<FolderDetail>().await?)
|
||||
}
|
||||
|
||||
/// Retrieve folder details while consuming the shared API concurrency budget.
|
||||
pub async fn retrieve_folder_limited(
|
||||
&self,
|
||||
id: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<FolderDetail, anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.get(&format!("v3/folders/{}/", id)).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Retrieve folder failed: {}", resp.status());
|
||||
}
|
||||
Ok(resp.json::<FolderDetail>().await?)
|
||||
}
|
||||
|
||||
/// Create a new folder under `parent_id` (POST v3/folders/).
|
||||
pub async fn create_folder(
|
||||
&self,
|
||||
@@ -44,6 +77,32 @@ impl MDRSConnection {
|
||||
self.post_json("v3/folders/", &body).await
|
||||
}
|
||||
|
||||
/// Create a new folder under `parent_id` and return its ID while consuming
|
||||
/// the shared API concurrency budget.
|
||||
pub async fn create_folder_id_limited(
|
||||
&self,
|
||||
parent_id: &str,
|
||||
folder_name: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<String, anyhow::Error> {
|
||||
let body = serde_json::json!({
|
||||
"name": folder_name,
|
||||
"parent_id": parent_id,
|
||||
"description": "",
|
||||
"template_id": -1,
|
||||
});
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self.post_json("v3/folders/", &body).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Failed to create remote folder: {}", folder_name);
|
||||
}
|
||||
let json: serde_json::Value = resp.json().await?;
|
||||
json["id"]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow!("No id in create_folder response for {}", folder_name))
|
||||
.map(|s| s.to_string())
|
||||
}
|
||||
|
||||
/// Authenticate against a password-locked folder (POST v3/folders/{id}/auth/).
|
||||
/// Returns `Err` if the password is incorrect or the request fails.
|
||||
pub async fn folder_auth(&self, folder_id: &str, password: &str) -> Result<(), anyhow::Error> {
|
||||
@@ -61,4 +120,28 @@ impl MDRSConnection {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Authenticate against a locked folder while consuming the shared API
|
||||
/// concurrency budget.
|
||||
pub async fn folder_auth_limited(
|
||||
&self,
|
||||
folder_id: &str,
|
||||
password: &str,
|
||||
limiter: &ApiRequestLimiter,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let _permit = limiter.acquire().await?;
|
||||
let resp = self
|
||||
.post_json(
|
||||
&format!("v3/folders/{}/auth/", folder_id),
|
||||
&serde_json::json!({"password": password}),
|
||||
)
|
||||
.await?;
|
||||
if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
|
||||
bail!("Password is incorrect.");
|
||||
}
|
||||
if !resp.status().is_success() {
|
||||
bail!("Folder auth failed: {}", resp.status());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// API module (add users, files, folders, laboratories, etc. here)
|
||||
|
||||
pub mod doi;
|
||||
pub mod files;
|
||||
pub mod folders;
|
||||
pub mod laboratories;
|
||||
|
||||
Vendored
+2
-1
@@ -119,5 +119,6 @@ pub fn compute_digest(
|
||||
let json_str = python_digest_json(user, access, refresh, labs);
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(json_str.as_bytes());
|
||||
format!("{:x}", hasher.finalize())
|
||||
let result = hasher.finalize();
|
||||
result.iter().map(|b| format!("{:02x}", b)).collect()
|
||||
}
|
||||
|
||||
Vendored
+494
-62
@@ -2,13 +2,17 @@ pub mod digest;
|
||||
pub mod types;
|
||||
|
||||
pub use digest::compute_digest;
|
||||
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheUser};
|
||||
pub use types::{Cache, CacheLaboratory, CacheLabsWrapper, CacheToken, CacheUser};
|
||||
|
||||
use crate::connection::MDRSConnection;
|
||||
use anyhow::{anyhow, bail};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Per-remote async mutex map (in-process serialization)
|
||||
@@ -17,6 +21,29 @@ use std::sync::{Arc, LazyLock, Mutex};
|
||||
static REMOTE_LOCKS: LazyLock<Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>> =
|
||||
LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
struct CacheStoreKey {
|
||||
config_dir: PathBuf,
|
||||
remote: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
struct CacheFileSnapshot {
|
||||
len: u64,
|
||||
modified_nanos: u128,
|
||||
#[cfg(unix)]
|
||||
inode: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MemoryCacheEntry {
|
||||
snapshot: CacheFileSnapshot,
|
||||
cache: Cache,
|
||||
}
|
||||
|
||||
static MEMORY_CACHE: LazyLock<Mutex<HashMap<CacheStoreKey, MemoryCacheEntry>>> =
|
||||
LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
|
||||
let mut map = REMOTE_LOCKS.lock().unwrap();
|
||||
map.entry(remote.to_string())
|
||||
@@ -28,11 +55,198 @@ fn get_remote_lock(remote: &str) -> Arc<tokio::sync::Mutex<()>> {
|
||||
// Cache file path helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn cache_file_path(remote: &str) -> std::path::PathBuf {
|
||||
crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote))
|
||||
fn cache_store_key(config_dir: &Path, remote: &str) -> CacheStoreKey {
|
||||
CacheStoreKey {
|
||||
config_dir: config_dir.to_path_buf(),
|
||||
remote: remote.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn cache_dir_path(config_dir: &Path) -> PathBuf {
|
||||
config_dir.join("cache")
|
||||
}
|
||||
|
||||
fn cache_file_path_in(config_dir: &Path, remote: &str) -> PathBuf {
|
||||
cache_dir_path(config_dir).join(format!("{}.json", remote))
|
||||
}
|
||||
|
||||
fn cache_file_path(remote: &str) -> PathBuf {
|
||||
cache_file_path_in(&crate::settings::SETTINGS.config_dirname, remote)
|
||||
}
|
||||
|
||||
fn cache_snapshot(metadata: &fs::Metadata) -> CacheFileSnapshot {
|
||||
let modified_nanos = metadata
|
||||
.modified()
|
||||
.ok()
|
||||
.and_then(|time| time.duration_since(UNIX_EPOCH).ok())
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or_default();
|
||||
|
||||
CacheFileSnapshot {
|
||||
len: metadata.len(),
|
||||
modified_nanos,
|
||||
#[cfg(unix)]
|
||||
inode: metadata.ino(),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_cache_snapshot(cache_path: &Path) -> Result<CacheFileSnapshot, std::io::Error> {
|
||||
fs::metadata(cache_path).map(|metadata| cache_snapshot(&metadata))
|
||||
}
|
||||
|
||||
fn cached_entry(config_dir: &Path, remote: &str, snapshot: &CacheFileSnapshot) -> Option<Cache> {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let map = MEMORY_CACHE.lock().unwrap();
|
||||
map.get(&key)
|
||||
.filter(|entry| entry.snapshot == *snapshot)
|
||||
.map(|entry| entry.cache.clone())
|
||||
}
|
||||
|
||||
fn update_cached_entry(config_dir: &Path, remote: &str, snapshot: CacheFileSnapshot, cache: Cache) {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let mut map = MEMORY_CACHE.lock().unwrap();
|
||||
map.insert(key, MemoryCacheEntry { snapshot, cache });
|
||||
}
|
||||
|
||||
fn invalidate_cached_entry(config_dir: &Path, remote: &str) {
|
||||
let key = cache_store_key(config_dir, remote);
|
||||
let mut map = MEMORY_CACHE.lock().unwrap();
|
||||
map.remove(&key);
|
||||
}
|
||||
|
||||
fn ensure_cache_dir(cache_dir: &Path) -> Result<(), anyhow::Error> {
|
||||
fs::create_dir_all(cache_dir)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(cache_dir)?.permissions();
|
||||
perms.set_mode(0o700);
|
||||
fs::set_permissions(cache_dir, perms)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_cache_file(cache_path: &Path, cache: &Cache) -> Result<(), anyhow::Error> {
|
||||
let tmp_path = cache_path.with_extension("tmp");
|
||||
fs::write(&tmp_path, serde_json::to_vec_pretty(cache)?)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&tmp_path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp_path, perms)?;
|
||||
}
|
||||
fs::rename(&tmp_path, cache_path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_cache(remote: &str, data: &str) -> Result<Cache, anyhow::Error> {
|
||||
serde_json::from_str::<Cache>(data).map_err(|e| {
|
||||
anyhow!(
|
||||
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
|
||||
remote,
|
||||
e,
|
||||
remote
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn load_cache_from_dir(remote: &str, config_dir: &Path) -> Result<Cache, anyhow::Error> {
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
let snapshot = match read_cache_snapshot(&cache_path) {
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if let Some(cache) = cached_entry(config_dir, remote, &snapshot) {
|
||||
return Ok(cache);
|
||||
}
|
||||
|
||||
let data = match fs::read_to_string(&cache_path) {
|
||||
Ok(data) => data,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let cache = parse_cache(remote, &data)?;
|
||||
update_cached_entry(config_dir, remote, snapshot, cache.clone());
|
||||
Ok(cache)
|
||||
}
|
||||
|
||||
fn load_cache_if_present_from_dir(
|
||||
remote: &str,
|
||||
config_dir: &Path,
|
||||
) -> Result<Option<Cache>, anyhow::Error> {
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
let snapshot = match read_cache_snapshot(&cache_path) {
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
return Ok(None);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if let Some(cache) = cached_entry(config_dir, remote, &snapshot) {
|
||||
return Ok(Some(cache));
|
||||
}
|
||||
|
||||
let data = match fs::read_to_string(&cache_path) {
|
||||
Ok(data) => data,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
return Ok(None);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let cache = match parse_cache(remote, &data) {
|
||||
Ok(cache) => cache,
|
||||
Err(_) => {
|
||||
remove_cache_in_dir(remote, config_dir)?;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
update_cached_entry(config_dir, remote, snapshot, cache.clone());
|
||||
Ok(Some(cache))
|
||||
}
|
||||
|
||||
fn persist_cache_in_dir(
|
||||
remote: &str,
|
||||
config_dir: &Path,
|
||||
cache: &Cache,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let cache_dir = cache_dir_path(config_dir);
|
||||
ensure_cache_dir(&cache_dir)?;
|
||||
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
write_cache_file(&cache_path, cache)?;
|
||||
|
||||
let snapshot = read_cache_snapshot(&cache_path)?;
|
||||
update_cached_entry(config_dir, remote, snapshot, cache.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_cache_in_dir(remote: &str, config_dir: &Path) -> Result<(), anyhow::Error> {
|
||||
let cache_path = cache_file_path_in(config_dir, remote);
|
||||
match fs::remove_file(&cache_path) {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
invalidate_cached_entry(config_dir, remote);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -41,23 +255,17 @@ fn cache_file_path(remote: &str) -> std::path::PathBuf {
|
||||
|
||||
/// Load token and laboratories from the login cache file (no token refresh check).
|
||||
pub fn load_cache(remote: &str) -> Result<Cache, anyhow::Error> {
|
||||
let cache_path = cache_file_path(remote);
|
||||
if !cache_path.exists() {
|
||||
bail!(
|
||||
"Not logged in to `{}`. Run `mdrs login {}` first.",
|
||||
remote,
|
||||
remote
|
||||
);
|
||||
}
|
||||
let data = fs::read_to_string(&cache_path)?;
|
||||
serde_json::from_str::<Cache>(&data).map_err(|e| {
|
||||
anyhow!(
|
||||
"Cache for `{}` is invalid or outdated ({}). Run `mdrs login {}` to refresh it.",
|
||||
remote,
|
||||
e,
|
||||
remote
|
||||
)
|
||||
})
|
||||
load_cache_from_dir(remote, &crate::settings::SETTINGS.config_dirname)
|
||||
}
|
||||
|
||||
/// Persist a cache entry and refresh the in-memory fast path.
|
||||
pub fn persist_cache(remote: &str, cache: &Cache) -> Result<(), anyhow::Error> {
|
||||
persist_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname, cache)
|
||||
}
|
||||
|
||||
/// Remove a cache entry from disk and memory.
|
||||
pub fn remove_cache(remote: &str) -> Result<(), anyhow::Error> {
|
||||
remove_cache_in_dir(remote, &crate::settings::SETTINGS.config_dirname)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -76,6 +284,7 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
|
||||
let lock = get_remote_lock(remote);
|
||||
let _guard = lock.lock().await;
|
||||
|
||||
ensure_cache_dir(&cache_dir_path(&crate::settings::SETTINGS.config_dirname))?;
|
||||
let lock_path = cache_file_path(remote).with_extension("lock");
|
||||
use fs2::FileExt;
|
||||
let lock_file = fs::OpenOptions::new()
|
||||
@@ -98,8 +307,7 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
|
||||
}
|
||||
|
||||
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
|
||||
let new_access = refresh_and_persist(remote, &cache).await?;
|
||||
cache.token.access = new_access;
|
||||
cache = refresh_and_persist(remote, &cache).await?;
|
||||
}
|
||||
|
||||
Ok(cache)
|
||||
@@ -110,51 +318,89 @@ pub async fn load_cache_with_token_refresh(remote: &str) -> Result<Cache, anyhow
|
||||
result
|
||||
}
|
||||
|
||||
async fn load_cache_with_token_refresh_optional_from_dir(
|
||||
remote: &str,
|
||||
config_dir: &Path,
|
||||
) -> Result<Option<Cache>, anyhow::Error> {
|
||||
let lock = get_remote_lock(remote);
|
||||
let _guard = lock.lock().await;
|
||||
|
||||
ensure_cache_dir(&cache_dir_path(config_dir))?;
|
||||
let lock_path = cache_file_path_in(config_dir, remote).with_extension("lock");
|
||||
use fs2::FileExt;
|
||||
let lock_file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&lock_path)?;
|
||||
lock_file.lock_exclusive()?;
|
||||
|
||||
let result: Result<Option<Cache>, anyhow::Error> = async {
|
||||
let Some(mut cache) = load_cache_if_present_from_dir(remote, config_dir)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if crate::token::is_expired(&cache.token.refresh) {
|
||||
remove_cache_in_dir(remote, config_dir)?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if crate::token::is_refresh_required(&cache.token.access, &cache.token.refresh) {
|
||||
cache = refresh_and_persist_in_dir(remote, config_dir, &cache).await?;
|
||||
}
|
||||
|
||||
Ok(Some(cache))
|
||||
}
|
||||
.await;
|
||||
|
||||
lock_file.unlock()?;
|
||||
result
|
||||
}
|
||||
|
||||
/// Load cache when present and refresh its token if needed.
|
||||
///
|
||||
/// Unlike `load_cache_with_token_refresh`, this returns `Ok(None)` when the user
|
||||
/// is effectively anonymous: no cache file exists, the cache is invalid, or the
|
||||
/// refresh token has already expired. This mirrors the Python client behavior
|
||||
/// used by read-only commands.
|
||||
pub async fn load_cache_with_token_refresh_optional(
|
||||
remote: &str,
|
||||
) -> Result<Option<Cache>, anyhow::Error> {
|
||||
load_cache_with_token_refresh_optional_from_dir(
|
||||
remote,
|
||||
&crate::settings::SETTINGS.config_dirname,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Call the token-refresh endpoint and write the new access token back to the
|
||||
/// cache file. The caller must already hold the per-remote async mutex.
|
||||
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<String, anyhow::Error> {
|
||||
async fn refresh_and_persist(remote: &str, cache: &Cache) -> Result<Cache, anyhow::Error> {
|
||||
refresh_and_persist_in_dir(remote, &crate::settings::SETTINGS.config_dirname, cache).await
|
||||
}
|
||||
|
||||
async fn refresh_and_persist_in_dir(
|
||||
remote: &str,
|
||||
config_dir: &Path,
|
||||
cache: &Cache,
|
||||
) -> Result<Cache, anyhow::Error> {
|
||||
let url = crate::commands::config::get_remote_url(remote)?
|
||||
.ok_or_else(|| anyhow!("Remote `{}` is not configured.", remote))?;
|
||||
let conn = MDRSConnection::new(&url);
|
||||
|
||||
let new_access = conn.token_refresh(&cache.token.refresh).await?;
|
||||
|
||||
let new_digest = compute_digest(
|
||||
cache.user.as_ref(),
|
||||
&new_access,
|
||||
&cache.token.refresh,
|
||||
&cache.laboratories,
|
||||
let mut updated_cache = cache.clone();
|
||||
updated_cache.token.access = new_access;
|
||||
updated_cache.digest = compute_digest(
|
||||
updated_cache.user.as_ref(),
|
||||
&updated_cache.token.access,
|
||||
&updated_cache.token.refresh,
|
||||
&updated_cache.laboratories,
|
||||
);
|
||||
|
||||
let cache_path = cache_file_path(remote);
|
||||
let raw = fs::read_to_string(&cache_path)?;
|
||||
let mut obj: serde_json::Value = serde_json::from_str(&raw)?;
|
||||
persist_cache_in_dir(remote, config_dir, &updated_cache)?;
|
||||
|
||||
obj["token"]["access"] = serde_json::Value::String(new_access.clone());
|
||||
obj["digest"] = serde_json::Value::String(new_digest);
|
||||
|
||||
// Write atomically: write to .tmp then rename.
|
||||
let tmp_path = cache_path.with_extension("tmp");
|
||||
{
|
||||
use std::io::Write;
|
||||
let mut tmp_file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&tmp_path)?;
|
||||
tmp_file.write_all(serde_json::to_string(&obj)?.as_bytes())?;
|
||||
tmp_file.flush()?;
|
||||
}
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let mut perms = fs::metadata(&tmp_path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp_path, perms)?;
|
||||
}
|
||||
fs::rename(&tmp_path, &cache_path)?;
|
||||
|
||||
Ok(new_access)
|
||||
Ok(updated_cache)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -166,9 +412,195 @@ pub fn create_authenticated_conn(
|
||||
remote: &str,
|
||||
cache: &Cache,
|
||||
) -> Result<MDRSConnection, anyhow::Error> {
|
||||
Ok(create_remote_conn(remote)?.with_token(cache.token.access.clone()))
|
||||
}
|
||||
|
||||
/// Create an unauthenticated `MDRSConnection` for the given remote label.
|
||||
pub fn create_remote_conn(remote: &str) -> Result<MDRSConnection, anyhow::Error> {
|
||||
let url = crate::commands::config::get_remote_url(remote)?
|
||||
.ok_or_else(|| anyhow!("Remote `{}` is not configured.", remote))?;
|
||||
Ok(MDRSConnection::new(&url)
|
||||
.with_remote(remote)
|
||||
.with_token(cache.token.access.clone()))
|
||||
Ok(MDRSConnection::new(&url).with_remote(remote))
|
||||
}
|
||||
|
||||
/// Create a connection for read-only commands, attaching a bearer token only
|
||||
/// when a valid login cache is available.
|
||||
pub async fn create_readonly_conn(
|
||||
remote: &str,
|
||||
) -> Result<(MDRSConnection, Option<Cache>), anyhow::Error> {
|
||||
let conn = create_remote_conn(remote)?;
|
||||
match load_cache_with_token_refresh_optional(remote).await? {
|
||||
Some(cache) => Ok((conn.with_token(cache.token.access.clone()), Some(cache))),
|
||||
None => Ok((conn, None)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn sample_cache(username: &str) -> Cache {
|
||||
Cache {
|
||||
user: Some(CacheUser {
|
||||
id: 1,
|
||||
username: username.to_string(),
|
||||
laboratory_ids: vec![10, 20],
|
||||
is_reviewer: false,
|
||||
}),
|
||||
token: types::CacheToken {
|
||||
access: format!("access-{username}"),
|
||||
refresh: format!("refresh-{username}"),
|
||||
},
|
||||
laboratories: CacheLabsWrapper {
|
||||
items: vec![CacheLaboratory {
|
||||
id: 10,
|
||||
name: "lab".to_string(),
|
||||
pi_name: String::new(),
|
||||
full_name: "Laboratory".to_string(),
|
||||
}],
|
||||
},
|
||||
digest: format!("digest-{username}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_name(prefix: &str, config_dir: &Path) -> String {
|
||||
format!(
|
||||
"{prefix}-{}",
|
||||
config_dir
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
.to_string_lossy()
|
||||
.replace('.', "_")
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn load_cache_uses_memory_fast_path_when_snapshot_matches() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("fast-path", dir.path());
|
||||
let cache = sample_cache("alice");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
|
||||
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(first.user.unwrap().username, "alice");
|
||||
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
|
||||
perms.set_mode(0o000);
|
||||
fs::set_permissions(&cache_path, perms).unwrap();
|
||||
|
||||
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(second.user.unwrap().username, "alice");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_cache_reloads_when_external_writer_changes_file() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("reload", dir.path());
|
||||
let original = sample_cache("alice");
|
||||
let updated = sample_cache("bob");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
|
||||
let first = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(first.user.unwrap().username, "alice");
|
||||
|
||||
let cache_dir = cache_dir_path(dir.path());
|
||||
ensure_cache_dir(&cache_dir).unwrap();
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
write_cache_file(&cache_path, &updated).unwrap();
|
||||
|
||||
let second = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(second.user.unwrap().username, "bob");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn persist_cache_refreshes_memory_entry() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("persist", dir.path());
|
||||
let original = sample_cache("alice");
|
||||
let updated = sample_cache("bob");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &original).unwrap();
|
||||
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &updated).unwrap();
|
||||
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
let mut perms = fs::metadata(&cache_path).unwrap().permissions();
|
||||
perms.set_mode(0o000);
|
||||
fs::set_permissions(&cache_path, perms).unwrap();
|
||||
|
||||
let loaded = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
assert_eq!(loaded.user.unwrap().username, "bob");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_cache_invalidates_memory_entry() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("remove", dir.path());
|
||||
let cache = sample_cache("alice");
|
||||
|
||||
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
|
||||
let _ = load_cache_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
remove_cache_in_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
let err = load_cache_from_dir(&remote, dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains(&format!("Not logged in to `{remote}`"))
|
||||
);
|
||||
}
|
||||
|
||||
fn make_jwt_with_exp(exp: i64) -> String {
|
||||
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
|
||||
|
||||
let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
|
||||
let payload = URL_SAFE_NO_PAD.encode(format!(r#"{{"exp":{exp}}}"#));
|
||||
format!("{header}.{payload}.")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_cache_if_present_returns_none_when_cache_missing() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("missing", dir.path());
|
||||
|
||||
let loaded = load_cache_if_present_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
assert!(loaded.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_cache_if_present_clears_invalid_cache() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("invalid", dir.path());
|
||||
let cache_dir = cache_dir_path(dir.path());
|
||||
ensure_cache_dir(&cache_dir).unwrap();
|
||||
let cache_path = cache_file_path_in(dir.path(), &remote);
|
||||
fs::write(&cache_path, b"{invalid json").unwrap();
|
||||
|
||||
let loaded = load_cache_if_present_from_dir(&remote, dir.path()).unwrap();
|
||||
|
||||
assert!(loaded.is_none());
|
||||
assert!(!cache_path.exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn optional_cache_load_treats_expired_session_as_anonymous() {
|
||||
let dir = tempdir().unwrap();
|
||||
let remote = remote_name("expired", dir.path());
|
||||
let mut cache = sample_cache("alice");
|
||||
cache.token.access = make_jwt_with_exp(0);
|
||||
cache.token.refresh = make_jwt_with_exp(0);
|
||||
persist_cache_in_dir(&remote, dir.path(), &cache).unwrap();
|
||||
|
||||
let loaded = load_cache_with_token_refresh_optional_from_dir(&remote, dir.path())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(loaded.is_none());
|
||||
assert!(!cache_file_path_in(dir.path(), &remote).exists());
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
+8
-6
@@ -1,14 +1,14 @@
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Access and refresh token pair stored in the login cache file.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheToken {
|
||||
pub access: String,
|
||||
pub refresh: String,
|
||||
}
|
||||
|
||||
/// Minimal user fields stored in the cache, matching Python's `User` dataclass.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheUser {
|
||||
pub id: u32,
|
||||
pub username: String,
|
||||
@@ -17,7 +17,7 @@ pub struct CacheUser {
|
||||
}
|
||||
|
||||
/// All four laboratory fields needed for digest computation.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct CacheLaboratory {
|
||||
pub id: u32,
|
||||
pub name: String,
|
||||
@@ -28,15 +28,17 @@ pub struct CacheLaboratory {
|
||||
}
|
||||
|
||||
/// Wrapper matching Python's `Laboratories` serialization: `{"items": [...]}`.
|
||||
#[derive(Deserialize, Clone, Default)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct CacheLabsWrapper {
|
||||
pub items: Vec<CacheLaboratory>,
|
||||
}
|
||||
|
||||
/// Full login cache, corresponding to the `<remote>.json` file written by `login`.
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct Cache {
|
||||
pub user: Option<CacheUser>,
|
||||
pub token: CacheToken,
|
||||
pub laboratories: CacheLabsWrapper,
|
||||
#[serde(default)]
|
||||
pub digest: String,
|
||||
}
|
||||
|
||||
+501
-75
@@ -1,12 +1,13 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::cache::create_readonly_conn;
|
||||
use crate::commands::shared::{
|
||||
find_file_by_name, find_folder, find_lab_in_cache, find_subfolder_by_name, parse_remote_path,
|
||||
find_file_by_name, find_folder_by_doi, find_folder_limited, find_laboratory,
|
||||
find_subfolder_by_name, is_doi, parse_doi_remote_path, parse_remote_path,
|
||||
};
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
use anyhow::{anyhow, bail};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub async fn download(
|
||||
remote_path: &str,
|
||||
@@ -16,10 +17,7 @@ pub async fn download(
|
||||
password: Option<&str>,
|
||||
excludes: Vec<String>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
|
||||
|
||||
// Validate that local_path is an existing directory (matching Python's behaviour).
|
||||
let local_real = std::fs::canonicalize(local_path)
|
||||
@@ -28,6 +26,197 @@ pub async fn download(
|
||||
bail!("Local directory `{}` not found.", local_path);
|
||||
}
|
||||
|
||||
// Detect DOI path: "remote:10.xxxx/prefix.ID[/optional/sub/path]"
|
||||
if is_doi(remote_path.splitn(2, ':').nth(1).unwrap_or("")) {
|
||||
let (remote, doi, subpath) = parse_doi_remote_path(remote_path)?;
|
||||
let (raw_conn, _cache) = create_readonly_conn(&remote).await?;
|
||||
let (doi_folder, lab) = find_folder_by_doi(&raw_conn, &doi, password).await?;
|
||||
let abs_path = format!("{}{}", doi_folder.path.trim_end_matches('/'), subpath);
|
||||
let abs_path_clean = abs_path.trim_end_matches('/');
|
||||
|
||||
// Check if the target is a folder.
|
||||
// If it is, we download the directory. If it is not, we download the file or subfolder.
|
||||
let (folder, is_folder) = match find_folder_limited(
|
||||
&raw_conn,
|
||||
&limiter,
|
||||
lab.id,
|
||||
abs_path_clean,
|
||||
password,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(f) => (f, true),
|
||||
Err(_) => {
|
||||
let (parent_path, _) = match abs_path_clean.rfind('/') {
|
||||
Some(0) => ("/".to_string(), abs_path_clean[1..].to_string()),
|
||||
Some(pos) => (
|
||||
abs_path_clean[..pos].to_string(),
|
||||
abs_path_clean[pos + 1..].to_string(),
|
||||
),
|
||||
None => ("/".to_string(), abs_path_clean.to_string()),
|
||||
};
|
||||
let parent =
|
||||
find_folder_limited(&raw_conn, &limiter, lab.id, &parent_path, password)
|
||||
.await?;
|
||||
(parent, false)
|
||||
}
|
||||
};
|
||||
|
||||
let conn = Arc::new(raw_conn);
|
||||
let excludes = Arc::new(excludes);
|
||||
|
||||
if is_folder {
|
||||
let path = folder.path.clone();
|
||||
let lab_name = Arc::new(lab.name);
|
||||
let top_local = local_real.join(&folder.name);
|
||||
let password_owned = password.map(str::to_string);
|
||||
|
||||
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
|
||||
if !recursive {
|
||||
// Single-folder, non-recursive: download the files inside the DOI folder.
|
||||
let files = conn.list_all_files_limited(&folder.id, &limiter).await?;
|
||||
for file in &files {
|
||||
if is_excluded(&excludes, lab_name.as_str(), &path, Some(&file.name)) {
|
||||
continue;
|
||||
}
|
||||
let dest = local_real.join(&file.name);
|
||||
if skip_if_exists {
|
||||
if dest.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest) {
|
||||
if meta.len() == file.size {
|
||||
println!("{}", dest.display());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &file.download_url);
|
||||
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}", dest.display());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
spawn_download_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
folder.id.clone(),
|
||||
top_local,
|
||||
password_owned.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
drive_download_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut download_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
password_owned,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Case: target is a file or subfolder inside the resolved folder.
|
||||
let basename = match abs_path_clean.rfind('/') {
|
||||
Some(pos) => abs_path_clean[pos + 1..].to_string(),
|
||||
None => abs_path_clean.to_string(),
|
||||
};
|
||||
let files = conn.list_all_files_limited(&folder.id, &limiter).await?;
|
||||
|
||||
// Case 1: basename matches a file in the folder.
|
||||
if let Some(file) = find_file_by_name(&files, &basename) {
|
||||
if is_excluded(&excludes, &lab.name, &folder.path, Some(&file.name)) {
|
||||
return Ok(());
|
||||
}
|
||||
let dest = local_real.join(&file.name);
|
||||
if skip_if_exists {
|
||||
if dest.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest) {
|
||||
if meta.len() == file.size {
|
||||
println!("{}", dest.display());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &file.download_url);
|
||||
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}", dest.display());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Case 2: basename matches a sub-folder.
|
||||
let subfolder = find_subfolder_by_name(&folder.sub_folders, &basename);
|
||||
if let Some(sub) = subfolder {
|
||||
if !recursive {
|
||||
bail!("Cannot download `{}`: Is a folder.", abs_path_clean);
|
||||
}
|
||||
let top_local = local_real.join(&sub.name);
|
||||
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
let lab_name = Arc::new(lab.name.clone());
|
||||
let password_owned = password.map(str::to_string);
|
||||
|
||||
spawn_download_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
sub.id.clone(),
|
||||
top_local,
|
||||
password_owned.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
drive_download_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut download_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
password_owned,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
bail!("File or folder `{}` not found.", abs_path_clean);
|
||||
}
|
||||
|
||||
// Normal path: "remote:/labname/path/..."
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let (raw_conn, cache) = create_readonly_conn(&remote).await?;
|
||||
let conn = Arc::new(raw_conn);
|
||||
let lab = find_laboratory(&conn, cache.as_ref(), &labname).await?;
|
||||
|
||||
// Split r_path into the parent directory path and the target basename.
|
||||
// Trailing slashes are already stripped by parse_remote_path, so this is safe.
|
||||
let r_path_clean = r_path.trim_end_matches('/');
|
||||
@@ -40,8 +229,11 @@ pub async fn download(
|
||||
None => ("/".to_string(), r_path_clean.to_string()),
|
||||
};
|
||||
|
||||
let parent_folder = find_folder(&conn, lab.id, &parent_path, password).await?;
|
||||
let files = conn.list_all_files(&parent_folder.id).await?;
|
||||
let parent_folder =
|
||||
find_folder_limited(&conn, &limiter, lab.id, &parent_path, password).await?;
|
||||
let files = conn
|
||||
.list_all_files_limited(&parent_folder.id, &limiter)
|
||||
.await?;
|
||||
|
||||
// Case 1: basename matches a file in the parent folder.
|
||||
if let Some(file) = find_file_by_name(&files, &basename) {
|
||||
@@ -61,7 +253,8 @@ pub async fn download(
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &file.download_url);
|
||||
conn.download_file(&url, &dest.to_string_lossy()).await?;
|
||||
conn.download_file_limited(&url, &dest.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}", dest.display());
|
||||
return Ok(());
|
||||
}
|
||||
@@ -76,73 +269,41 @@ pub async fn download(
|
||||
// We create that subdirectory first, then recurse into it.
|
||||
let top_local = local_real.join(&sub.name);
|
||||
|
||||
// Iterative DFS: each entry is (remote_folder_id, local_dir)
|
||||
let mut stack: Vec<(String, PathBuf)> = vec![(sub.id.clone(), top_local)];
|
||||
let mut folder_tasks: JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut download_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
let excludes = Arc::new(excludes);
|
||||
let lab_name = Arc::new(lab.name.clone());
|
||||
let password = password.map(str::to_string);
|
||||
|
||||
while let Some((folder_id, local_dir)) = stack.pop() {
|
||||
let folder = conn.retrieve_folder(&folder_id).await?;
|
||||
spawn_download_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
sub.id.clone(),
|
||||
top_local,
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
if is_excluded(&excludes, &lab.name, &folder.path, None) {
|
||||
continue;
|
||||
}
|
||||
drive_download_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut download_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
password,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
tokio::fs::create_dir_all(&local_dir).await?;
|
||||
println!("{}", local_dir.display());
|
||||
|
||||
let dir_files = conn.list_all_files(&folder_id).await?;
|
||||
|
||||
// Download files in this folder (up to 10 concurrent).
|
||||
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
|
||||
for f in &dir_files {
|
||||
if is_excluded(&excludes, &lab.name, &folder.path, Some(&f.name)) {
|
||||
continue;
|
||||
}
|
||||
let dest_path = local_dir.join(&f.name);
|
||||
if skip_if_exists {
|
||||
if dest_path.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest_path) {
|
||||
if meta.len() == f.size {
|
||||
println!("{}", dest_path.display());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let url = make_absolute_url(&conn, &f.download_url);
|
||||
let conn = conn.clone();
|
||||
futs.push(tokio::spawn(async move {
|
||||
let dest_str = dest_path.to_string_lossy().to_string();
|
||||
match conn.download_file(&url, &dest_str).await {
|
||||
Ok(_) => println!("{}", dest_path.display()),
|
||||
Err(_) => {
|
||||
eprintln!("Failed: {}", dest_path.display());
|
||||
if dest_path.is_file() {
|
||||
let _ = std::fs::remove_file(&dest_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
if futs.len() >= crate::settings::SETTINGS.concurrent {
|
||||
let _ = futs.next().await;
|
||||
}
|
||||
}
|
||||
while futs.next().await.is_some() {}
|
||||
|
||||
// Push sub-folders onto the stack for recursive processing.
|
||||
for sf in &folder.sub_folders {
|
||||
if sf.lock {
|
||||
match password {
|
||||
Some(pw) => {
|
||||
if conn.folder_auth(&sf.id, pw).await.is_err() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
let sub_local = local_dir.join(&sf.name);
|
||||
stack.push((sf.id.clone(), sub_local));
|
||||
}
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
@@ -181,3 +342,268 @@ fn make_absolute_url(conn: &MDRSConnection, url: &str) -> String {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct DownloadFolderTaskResult {
|
||||
child_folders: Vec<(String, PathBuf)>,
|
||||
download_jobs: Vec<DownloadJob>,
|
||||
}
|
||||
|
||||
struct DownloadJob {
|
||||
url: String,
|
||||
dest_path: PathBuf,
|
||||
}
|
||||
|
||||
fn spawn_download_folder_task(
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
folder_id: String,
|
||||
local_dir: PathBuf,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
folder_tasks.spawn(async move {
|
||||
process_download_folder(
|
||||
conn,
|
||||
limiter,
|
||||
lab_name,
|
||||
excludes,
|
||||
folder_id,
|
||||
local_dir,
|
||||
password,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_download_task(
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
job: DownloadJob,
|
||||
) {
|
||||
download_tasks.spawn(async move {
|
||||
let dest_str = job.dest_path.to_string_lossy().to_string();
|
||||
match conn
|
||||
.download_file_limited(&job.url, &dest_str, &limiter)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
println!("{}", job.dest_path.display());
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
if job.dest_path.is_file() {
|
||||
let _ = std::fs::remove_file(&job.dest_path);
|
||||
}
|
||||
Err(anyhow!(
|
||||
"Failed to download {}: {}",
|
||||
job.dest_path.display(),
|
||||
err
|
||||
))
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_download_folder(
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
folder_id: String,
|
||||
local_dir: PathBuf,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) -> Result<DownloadFolderTaskResult, anyhow::Error> {
|
||||
let folder = conn.retrieve_folder_limited(&folder_id, &limiter).await?;
|
||||
|
||||
if is_excluded(excludes.as_slice(), lab_name.as_str(), &folder.path, None) {
|
||||
return Ok(DownloadFolderTaskResult {
|
||||
child_folders: Vec::new(),
|
||||
download_jobs: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all(&local_dir).await?;
|
||||
println!("{}", local_dir.display());
|
||||
|
||||
let dir_files = conn.list_all_files_limited(&folder_id, &limiter).await?;
|
||||
let mut download_jobs = Vec::new();
|
||||
for file in &dir_files {
|
||||
if is_excluded(
|
||||
excludes.as_slice(),
|
||||
lab_name.as_str(),
|
||||
&folder.path,
|
||||
Some(&file.name),
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
let dest_path = local_dir.join(&file.name);
|
||||
if skip_if_exists && dest_path.exists() {
|
||||
if let Ok(meta) = std::fs::metadata(&dest_path) {
|
||||
if meta.len() == file.size {
|
||||
println!("{}", dest_path.display());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
download_jobs.push(DownloadJob {
|
||||
url: make_absolute_url(&conn, &file.download_url),
|
||||
dest_path,
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folder_tasks: JoinSet<Result<Option<(String, PathBuf)>, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
for sub_folder in folder.sub_folders {
|
||||
let conn = conn.clone();
|
||||
let limiter = limiter.clone();
|
||||
let password = password.clone();
|
||||
let sub_local = local_dir.join(&sub_folder.name);
|
||||
child_folder_tasks.spawn(async move {
|
||||
if sub_folder.lock {
|
||||
match password.as_deref() {
|
||||
Some(pw) => {
|
||||
if conn
|
||||
.folder_auth_limited(&sub_folder.id, pw, &limiter)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
None => return Ok(None),
|
||||
}
|
||||
}
|
||||
Ok(Some((sub_folder.id, sub_local)))
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folders = Vec::new();
|
||||
while let Some(result) = child_folder_tasks.join_next().await {
|
||||
if let Some(child) = flatten_join_result(result)? {
|
||||
child_folders.push(child);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DownloadFolderTaskResult {
|
||||
child_folders,
|
||||
download_jobs,
|
||||
})
|
||||
}
|
||||
|
||||
async fn drive_download_tasks(
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
loop {
|
||||
match (folder_tasks.is_empty(), download_tasks.is_empty()) {
|
||||
(true, true) => break,
|
||||
(false, true) => {
|
||||
if let Some(result) = folder_tasks.join_next().await {
|
||||
handle_download_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
download_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
(true, false) => {
|
||||
if let Some(result) = download_tasks.join_next().await {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
(false, false) => {
|
||||
tokio::select! {
|
||||
result = folder_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
handle_download_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
download_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
result = download_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_download_folder_result(
|
||||
result: Result<Result<DownloadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
|
||||
folder_tasks: &mut JoinSet<Result<DownloadFolderTaskResult, anyhow::Error>>,
|
||||
download_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
lab_name: Arc<String>,
|
||||
excludes: Arc<Vec<String>>,
|
||||
password: Option<String>,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
match flatten_join_result(result) {
|
||||
Ok(task_result) => {
|
||||
for (folder_id, local_dir) in task_result.child_folders {
|
||||
spawn_download_folder_task(
|
||||
folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
lab_name.clone(),
|
||||
excludes.clone(),
|
||||
folder_id,
|
||||
local_dir,
|
||||
password.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
for job in task_result.download_jobs {
|
||||
spawn_download_task(download_tasks, conn.clone(), limiter.clone(), job);
|
||||
}
|
||||
}
|
||||
Err(err) => errors.push(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn flatten_join_result<T>(
|
||||
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
|
||||
) -> Result<T, anyhow::Error> {
|
||||
match result {
|
||||
Ok(inner) => inner,
|
||||
Err(err) => Err(anyhow!("Task join failed: {}", err)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,27 +1,16 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{
|
||||
find_file_by_name, find_folder, find_lab_in_cache, parse_remote_path,
|
||||
};
|
||||
use crate::cache::create_readonly_conn;
|
||||
use crate::commands::shared::{find_file_by_name, resolve_remote_file};
|
||||
use anyhow::anyhow;
|
||||
|
||||
pub async fn file_metadata(remote_path: &str, password: Option<&str>) -> Result<(), anyhow::Error> {
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let remote = remote_path
|
||||
.splitn(2, ':')
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("Invalid remote path"))?;
|
||||
let (conn, cache) = create_readonly_conn(remote).await?;
|
||||
let (parent_folder, basename) =
|
||||
resolve_remote_file(&conn, cache.as_ref(), remote_path, password).await?;
|
||||
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = create_authenticated_conn(&remote, &cache)?;
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
let lab_id = lab.id;
|
||||
|
||||
// Split the file path into parent directory and filename
|
||||
let path = r_path.trim_end_matches('/');
|
||||
let (dirname, basename) = if let Some(pos) = path.rfind('/') {
|
||||
let d = if pos == 0 { "/" } else { &path[..pos] };
|
||||
(d.to_string(), path[pos + 1..].to_string())
|
||||
} else {
|
||||
("/".to_string(), path.to_string())
|
||||
};
|
||||
|
||||
let parent_folder = find_folder(&conn, lab_id, &dirname, password).await?;
|
||||
let files = conn.list_all_files(&parent_folder.id).await?;
|
||||
|
||||
let file = find_file_by_name(&files, &basename)
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::cache::create_readonly_conn;
|
||||
|
||||
pub async fn labs(remote: &str) -> Result<(), anyhow::Error> {
|
||||
let cache = load_cache_with_token_refresh(remote).await?;
|
||||
let conn = create_authenticated_conn(remote, &cache)?;
|
||||
let (conn, _) = create_readonly_conn(remote).await?;
|
||||
let labs = conn.list_laboratories().await?;
|
||||
|
||||
let header = ("Name", "PI", "Laboratory");
|
||||
|
||||
+17
-64
@@ -1,14 +1,10 @@
|
||||
use crate::cache::{CacheLabsWrapper, CacheUser, compute_digest};
|
||||
use crate::cache::{Cache, CacheLabsWrapper, CacheToken, CacheUser, compute_digest, persist_cache};
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::laboratory::Laboratories;
|
||||
use crate::models::user::User;
|
||||
use anyhow::{anyhow, bail};
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use serde_json::{Value, json};
|
||||
use std::fs;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
/// Prompt for credentials if not supplied and then perform login.
|
||||
/// This is the entry point called from `main`.
|
||||
@@ -72,66 +68,23 @@ pub async fn login(username: &str, password: &str, remote: &str) -> Result<(), a
|
||||
let cache_labs: CacheLabsWrapper = (&labs).into();
|
||||
|
||||
// compute Python-compatible digest
|
||||
let digest = compute_digest(
|
||||
cache_user_opt.as_ref(),
|
||||
&token.access,
|
||||
&token.refresh,
|
||||
&cache_labs,
|
||||
);
|
||||
|
||||
// build the cache JSON — field order matches Python's dataclass layout:
|
||||
// user (id, username, laboratory_ids, is_reviewer)
|
||||
// token (access, refresh)
|
||||
// laboratories (items)
|
||||
// digest
|
||||
let user_val: Value = match &cache_user_opt {
|
||||
Some(u) => json!({
|
||||
"id": u.id,
|
||||
"username": u.username,
|
||||
"laboratory_ids": u.laboratory_ids,
|
||||
"is_reviewer": u.is_reviewer
|
||||
}),
|
||||
None => Value::Null,
|
||||
let cache = Cache {
|
||||
user: cache_user_opt,
|
||||
token: CacheToken {
|
||||
access: token.access,
|
||||
refresh: token.refresh,
|
||||
},
|
||||
laboratories: cache_labs,
|
||||
digest: String::new(),
|
||||
};
|
||||
let labs_items: Vec<Value> = cache_labs
|
||||
.items
|
||||
.iter()
|
||||
.map(|l| {
|
||||
json!({
|
||||
"id": l.id,
|
||||
"name": l.name,
|
||||
"pi_name": l.pi_name,
|
||||
"full_name": l.full_name
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
let obj = json!({
|
||||
"user": user_val,
|
||||
"token": {"access": token.access, "refresh": token.refresh},
|
||||
"laboratories": {"items": labs_items},
|
||||
"digest": digest
|
||||
});
|
||||
|
||||
// write cache file: {config_dirname}/cache/<remote>.json
|
||||
let cache_dir = crate::settings::SETTINGS.config_dirname.join("cache");
|
||||
fs::create_dir_all(&cache_dir)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&cache_dir)?.permissions();
|
||||
perms.set_mode(0o700);
|
||||
fs::set_permissions(&cache_dir, perms)?;
|
||||
}
|
||||
let cache_file = cache_dir.join(format!("{}.json", remote));
|
||||
let tmp = cache_file.with_extension("tmp");
|
||||
|
||||
fs::write(&tmp, serde_json::to_vec_pretty(&obj)?)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut perms = fs::metadata(&tmp)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(&tmp, perms)?;
|
||||
}
|
||||
fs::rename(&tmp, &cache_file)?;
|
||||
let mut cache = cache;
|
||||
cache.digest = compute_digest(
|
||||
cache.user.as_ref(),
|
||||
&cache.token.access,
|
||||
&cache.token.refresh,
|
||||
&cache.laboratories,
|
||||
);
|
||||
persist_cache(remote, &cache)?;
|
||||
|
||||
println!("Login Successful");
|
||||
Ok(())
|
||||
|
||||
@@ -1,10 +1,3 @@
|
||||
pub fn logout(remote: &str) -> Result<(), anyhow::Error> {
|
||||
let cache_path = crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote));
|
||||
if cache_path.exists() {
|
||||
std::fs::remove_file(&cache_path)?;
|
||||
}
|
||||
Ok(())
|
||||
crate::cache::remove_cache(remote)
|
||||
}
|
||||
|
||||
+11
-9
@@ -1,5 +1,5 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{find_folder, find_lab_in_cache, fmt_datetime, parse_remote_path};
|
||||
use crate::cache::create_readonly_conn;
|
||||
use crate::commands::shared::{fmt_datetime, resolve_remote_folder};
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::file::File;
|
||||
use crate::models::folder::{FolderDetail, FolderSimple};
|
||||
@@ -14,12 +14,14 @@ pub async fn ls(
|
||||
is_recursive: bool,
|
||||
is_quiet: bool,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let (remote, labname, path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = create_authenticated_conn(&remote, &cache)?;
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
|
||||
let folder = find_folder(&conn, lab.id, &path, password).await?;
|
||||
let remote = remote_path
|
||||
.splitn(2, ':')
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("Invalid remote path"))?;
|
||||
let (conn, cache) = create_readonly_conn(remote).await?;
|
||||
let (folder, lab) =
|
||||
resolve_remote_folder(&conn, cache.as_ref(), None, remote_path, password).await?;
|
||||
let labname = lab.name;
|
||||
|
||||
if is_json {
|
||||
let output = if is_recursive {
|
||||
@@ -29,7 +31,7 @@ pub async fn ls(
|
||||
};
|
||||
println!("{}", serde_json::to_string(&output)?);
|
||||
} else if is_recursive {
|
||||
let prefix = format!("{}:/{}", remote, labname);
|
||||
let prefix = format!("{}:/{}", conn.remote.as_deref().unwrap_or(""), labname);
|
||||
ls_plain_recursive(&conn, folder, &labname, &prefix, password).await?;
|
||||
} else {
|
||||
let files = conn.list_all_files(&folder.id).await?;
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{find_folder, find_lab_in_cache, parse_remote_path};
|
||||
use crate::cache::create_readonly_conn;
|
||||
use crate::commands::shared::resolve_remote_folder;
|
||||
|
||||
pub async fn metadata(remote_path: &str, password: Option<&str>) -> Result<(), anyhow::Error> {
|
||||
let (remote, labname, folder_path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = create_authenticated_conn(&remote, &cache)?;
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
let folder = find_folder(&conn, lab.id, &folder_path, password).await?;
|
||||
let remote = remote_path
|
||||
.splitn(2, ':')
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("Invalid remote path"))?;
|
||||
let (conn, cache) = create_readonly_conn(remote).await?;
|
||||
let (folder, _) =
|
||||
resolve_remote_folder(&conn, cache.as_ref(), None, remote_path, password).await?;
|
||||
|
||||
let resp = conn
|
||||
.get(&format!("v3/folders/{}/metadata/", folder.id))
|
||||
|
||||
@@ -1,10 +1,158 @@
|
||||
use crate::cache::{Cache, CacheLaboratory};
|
||||
use crate::connection::ApiRequestLimiter;
|
||||
use crate::connection::MDRSConnection;
|
||||
use crate::models::file::File;
|
||||
use crate::models::folder::{FolderDetail, FolderSimple};
|
||||
use crate::models::laboratory::Laboratory;
|
||||
use anyhow::{anyhow, bail};
|
||||
use unicode_normalization::UnicodeNormalization;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// DOI helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Return true if the path component (after the remote: prefix) looks like a
|
||||
/// DOI string, i.e. starts with "10." and contains a "/".
|
||||
pub fn is_doi(path: &str) -> bool {
|
||||
path.starts_with("10.") && path.contains('/')
|
||||
}
|
||||
|
||||
/// Extract the DOI suffix ID from a full DOI string.
|
||||
///
|
||||
/// MDRS uses the segment after the last `.` in the DOI as its internal
|
||||
/// identifier, e.g. `10.xxxx/prefix.20230511-001` → `20230511-001`.
|
||||
/// If there is no `.` after the `/`, the entire suffix after `/` is used.
|
||||
pub fn doi_suffix_id(doi: &str) -> &str {
|
||||
// Find the slash that separates DOI prefix from suffix.
|
||||
if let Some(slash_pos) = doi.find('/') {
|
||||
let suffix = &doi[slash_pos + 1..];
|
||||
// Use the part after the last `.` within the suffix.
|
||||
if let Some(dot_pos) = suffix.rfind('.') {
|
||||
&suffix[dot_pos + 1..]
|
||||
} else {
|
||||
suffix
|
||||
}
|
||||
} else {
|
||||
doi
|
||||
}
|
||||
}
|
||||
|
||||
/// Split a DOI-with-optional-path string into `(doi, subpath)`.
|
||||
///
|
||||
/// A DOI has the form `10.REGISTRANT/SUFFIX` where SUFFIX contains no `/`.
|
||||
/// Anything after the first `/` following SUFFIX is treated as a subfolder path.
|
||||
///
|
||||
/// Examples:
|
||||
/// - `"10.1234/prefix.ID"` → `("10.1234/prefix.ID", "")`
|
||||
/// - `"10.1234/prefix.ID/"` → `("10.1234/prefix.ID", "")`
|
||||
/// - `"10.1234/prefix.ID/sub"` → `("10.1234/prefix.ID", "/sub")`
|
||||
/// - `"10.1234/prefix.ID/sub/deep"` → `("10.1234/prefix.ID", "/sub/deep")`
|
||||
pub fn split_doi_and_subpath(doi_with_path: &str) -> (&str, &str) {
|
||||
// Find the first `/` that separates registrant from suffix.
|
||||
if let Some(first_slash) = doi_with_path.find('/') {
|
||||
let after_suffix_start = first_slash + 1;
|
||||
let after_first = &doi_with_path[after_suffix_start..];
|
||||
// Find the next `/` inside the suffix portion — this starts the subpath.
|
||||
if let Some(second_slash) = after_first.find('/') {
|
||||
let doi_end = after_suffix_start + second_slash;
|
||||
let doi = &doi_with_path[..doi_end];
|
||||
let subpath = &doi_with_path[doi_end..]; // begins with "/"
|
||||
// Treat a bare trailing slash as no subpath (root of DOI folder).
|
||||
if subpath == "/" {
|
||||
(doi, "")
|
||||
} else {
|
||||
(doi, subpath)
|
||||
}
|
||||
} else {
|
||||
// No second slash — the whole string is the DOI, no subpath.
|
||||
(doi_with_path, "")
|
||||
}
|
||||
} else {
|
||||
(doi_with_path, "")
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse `"remote:10.xxxx/prefix.ID[/optional/sub/path]"` into
|
||||
/// `(remote, doi, subpath)`.
|
||||
///
|
||||
/// `subpath` is empty when the remote path points directly at the DOI root
|
||||
/// folder. Otherwise it is an absolute path string like `"/subfolder/deep"`.
|
||||
pub fn parse_doi_remote_path(remote_path: &str) -> Result<(String, String, String), anyhow::Error> {
|
||||
let parts: Vec<&str> = remote_path.splitn(2, ':').collect();
|
||||
if parts.len() != 2 {
|
||||
bail!("remote_path must be in the form 'remote:10.xxxx/prefix.ID'");
|
||||
}
|
||||
let remote = parts[0].to_string();
|
||||
let doi_with_path = parts[1];
|
||||
if !is_doi(doi_with_path) {
|
||||
bail!(
|
||||
"Path `{}` does not look like a DOI (must start with '10.' and contain '/').",
|
||||
doi_with_path
|
||||
);
|
||||
}
|
||||
let (doi, subpath) = split_doi_and_subpath(doi_with_path);
|
||||
Ok((remote, doi.to_string(), subpath.to_string()))
|
||||
}
|
||||
|
||||
/// Resolve a DOI string to a (FolderDetail, Laboratory) pair.
|
||||
///
|
||||
/// Calls GET v3/doi/{id}/ to look up the folder ID, then fetches the full
|
||||
/// folder detail (which carries `laboratory_id`) and resolves the laboratory.
|
||||
pub async fn find_folder_by_doi(
|
||||
conn: &MDRSConnection,
|
||||
doi: &str,
|
||||
password: Option<&str>,
|
||||
) -> Result<(FolderDetail, Laboratory), anyhow::Error> {
|
||||
// Strip any trailing slash from the DOI before extracting the suffix ID.
|
||||
let doi_clean = doi.trim_end_matches('/');
|
||||
let id = doi_suffix_id(doi_clean);
|
||||
let doi_resp = conn.retrieve_doi(id).await?;
|
||||
|
||||
// Verify that the returned DOI matches the one supplied by the caller
|
||||
// (case-insensitive, trimming trailing slashes).
|
||||
let returned = doi_resp.doi.trim_end_matches('/');
|
||||
if !returned.eq_ignore_ascii_case(doi_clean) {
|
||||
bail!(
|
||||
"DOI mismatch: requested `{}` but server returned `{}`.",
|
||||
doi_clean,
|
||||
returned
|
||||
);
|
||||
}
|
||||
|
||||
let folder_id = &doi_resp.folder.id;
|
||||
|
||||
// Fetch the full folder detail; laboratory_id is available here.
|
||||
let folder = conn
|
||||
.retrieve_folder(folder_id)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to retrieve folder for DOI `{}`: {}", doi_clean, e))?;
|
||||
|
||||
// Handle password-locked folder.
|
||||
if folder.lock {
|
||||
match password {
|
||||
None => {
|
||||
bail!(
|
||||
"Folder for DOI `{}` is locked. Use -p/--password to provide a password.",
|
||||
doi_clean
|
||||
);
|
||||
}
|
||||
Some(pw) => conn.folder_auth(folder_id, pw).await?,
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve laboratory using the laboratory_id from the folder detail.
|
||||
let lab_id = folder.laboratory_id;
|
||||
let lab = conn
|
||||
.list_laboratories()
|
||||
.await?
|
||||
.items
|
||||
.into_iter()
|
||||
.find(|l| l.id == lab_id)
|
||||
.ok_or_else(|| anyhow!("Laboratory with id {} not found.", lab_id))?;
|
||||
|
||||
Ok((folder, lab))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Path helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -47,6 +195,32 @@ pub fn find_lab_in_cache<'a>(
|
||||
.ok_or_else(|| anyhow!("Laboratory `{}` not found.", labname))
|
||||
}
|
||||
|
||||
/// Resolve a laboratory by name using cached laboratories when available, and
|
||||
/// falling back to the API when the user is anonymous.
|
||||
pub async fn find_laboratory(
|
||||
conn: &MDRSConnection,
|
||||
cache: Option<&Cache>,
|
||||
labname: &str,
|
||||
) -> Result<Laboratory, anyhow::Error> {
|
||||
if let Some(cache) = cache {
|
||||
if let Ok(lab) = find_lab_in_cache(cache, labname) {
|
||||
return Ok(Laboratory {
|
||||
id: lab.id,
|
||||
name: lab.name.clone(),
|
||||
pi_name: lab.pi_name.clone(),
|
||||
full_name: lab.full_name.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
conn.list_laboratories()
|
||||
.await?
|
||||
.items
|
||||
.into_iter()
|
||||
.find(|lab| lab.name == labname)
|
||||
.ok_or_else(|| anyhow!("Laboratory `{}` not found.", labname))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unicode helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -95,6 +269,49 @@ pub async fn find_folder(
|
||||
Ok(folder)
|
||||
}
|
||||
|
||||
/// Resolve a folder by path while consuming the shared API concurrency budget.
|
||||
pub async fn find_folder_limited(
|
||||
conn: &MDRSConnection,
|
||||
limiter: &ApiRequestLimiter,
|
||||
lab_id: u32,
|
||||
path: &str,
|
||||
password: Option<&str>,
|
||||
) -> Result<FolderDetail, anyhow::Error> {
|
||||
let normalized_path = nfc(path);
|
||||
let folders = conn
|
||||
.list_folders_by_path_limited(lab_id, &normalized_path, limiter)
|
||||
.await?;
|
||||
if folders.is_empty() {
|
||||
bail!("Folder `{}` not found.", path);
|
||||
}
|
||||
if folders.len() != 1 {
|
||||
bail!(
|
||||
"Ambiguous path `{}`: {} folders matched.",
|
||||
path,
|
||||
folders.len()
|
||||
);
|
||||
}
|
||||
let folder_simple = &folders[0];
|
||||
if folder_simple.lock {
|
||||
match password {
|
||||
None => {
|
||||
bail!(
|
||||
"Folder `{}` is locked. Use -p/--password to provide a password.",
|
||||
path
|
||||
);
|
||||
}
|
||||
Some(pw) => {
|
||||
conn.folder_auth_limited(&folder_simple.id, pw, limiter)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
let folder = conn
|
||||
.retrieve_folder_limited(&folder_simple.id, limiter)
|
||||
.await?;
|
||||
Ok(folder)
|
||||
}
|
||||
|
||||
/// Find a file by name (NFC-normalized, case-insensitive) in a file list.
|
||||
pub fn find_file_by_name<'a>(files: &'a [File], name: &str) -> Option<&'a File> {
|
||||
let name_lower = nfc(name).to_lowercase();
|
||||
@@ -134,3 +351,225 @@ pub fn fmt_datetime(iso: &str) -> String {
|
||||
iso.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve any remote path (normal or DOI-based) into a FolderDetail and Laboratory.
|
||||
/// Takes an optional API limiter for download command compatibility.
|
||||
pub async fn resolve_remote_folder(
|
||||
conn: &MDRSConnection,
|
||||
cache: Option<&Cache>,
|
||||
limiter: Option<&ApiRequestLimiter>,
|
||||
remote_path: &str,
|
||||
password: Option<&str>,
|
||||
) -> Result<(FolderDetail, Laboratory), anyhow::Error> {
|
||||
let path_component = remote_path.splitn(2, ':').nth(1).unwrap_or("");
|
||||
if is_doi(path_component) {
|
||||
let (_, doi, subpath) = parse_doi_remote_path(remote_path)?;
|
||||
let (doi_folder, lab) = find_folder_by_doi(conn, &doi, password).await?;
|
||||
if subpath.is_empty() {
|
||||
Ok((doi_folder, lab))
|
||||
} else {
|
||||
let abs_path = format!("{}{}", doi_folder.path.trim_end_matches('/'), subpath);
|
||||
let folder = if let Some(l) = limiter {
|
||||
find_folder_limited(conn, l, lab.id, &abs_path, password).await?
|
||||
} else {
|
||||
find_folder(conn, lab.id, &abs_path, password).await?
|
||||
};
|
||||
Ok((folder, lab))
|
||||
}
|
||||
} else {
|
||||
let (_, labname, folder_path) = parse_remote_path(remote_path)?;
|
||||
let lab = find_laboratory(conn, cache, &labname).await?;
|
||||
let folder = if let Some(l) = limiter {
|
||||
find_folder_limited(conn, l, lab.id, &folder_path, password).await?
|
||||
} else {
|
||||
find_folder(conn, lab.id, &folder_path, password).await?
|
||||
};
|
||||
Ok((folder, lab))
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves a remote path pointing to a file into the parent FolderDetail and the file's basename.
|
||||
pub async fn resolve_remote_file(
|
||||
conn: &MDRSConnection,
|
||||
cache: Option<&Cache>,
|
||||
remote_path: &str,
|
||||
password: Option<&str>,
|
||||
) -> Result<(FolderDetail, String), anyhow::Error> {
|
||||
let path_component = remote_path.splitn(2, ':').nth(1).unwrap_or("");
|
||||
if is_doi(path_component) {
|
||||
let (_, doi, subpath) = parse_doi_remote_path(remote_path)?;
|
||||
let (doi_folder, lab) = find_folder_by_doi(conn, &doi, password).await?;
|
||||
let subpath_clean = subpath.trim_end_matches('/');
|
||||
if subpath_clean.is_empty() {
|
||||
bail!("DOI path must point to a file, not a folder.");
|
||||
}
|
||||
let (sub_dir, basename) = if let Some(pos) = subpath_clean.rfind('/') {
|
||||
let d = if pos == 0 { "/" } else { &subpath_clean[..pos] };
|
||||
(d.to_string(), subpath_clean[pos + 1..].to_string())
|
||||
} else {
|
||||
("/".to_string(), subpath_clean.to_string())
|
||||
};
|
||||
let abs_path = format!(
|
||||
"{}{}",
|
||||
doi_folder.path.trim_end_matches('/'),
|
||||
if sub_dir.starts_with('/') {
|
||||
sub_dir
|
||||
} else {
|
||||
format!("/{}", sub_dir)
|
||||
}
|
||||
);
|
||||
let parent = find_folder(conn, lab.id, &abs_path, password).await?;
|
||||
Ok((parent, basename))
|
||||
} else {
|
||||
let (_, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let lab = find_laboratory(conn, cache, &labname).await?;
|
||||
let path = r_path.trim_end_matches('/');
|
||||
let (dirname, basename) = if let Some(pos) = path.rfind('/') {
|
||||
let d = if pos == 0 { "/" } else { &path[..pos] };
|
||||
(d.to_string(), path[pos + 1..].to_string())
|
||||
} else {
|
||||
("/".to_string(), path.to_string())
|
||||
};
|
||||
let parent = find_folder(conn, lab.id, &dirname, password).await?;
|
||||
Ok((parent, basename))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_laboratory_falls_back_to_api_without_authorization() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 4096];
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
let req = String::from_utf8_lossy(&buf[..n]);
|
||||
|
||||
assert!(req.starts_with("GET /v3/laboratories/ HTTP/1.1"));
|
||||
assert!(!req.contains("\r\nAuthorization: Bearer "));
|
||||
|
||||
let body =
|
||||
r#"[{"id":1,"name":"public-lab","pi_name":"PI","full_name":"Public Laboratory"}]"#;
|
||||
let response = format!(
|
||||
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
|
||||
body.len(),
|
||||
body
|
||||
);
|
||||
stream.write_all(response.as_bytes()).await.unwrap();
|
||||
});
|
||||
|
||||
let conn = MDRSConnection::new(&format!("http://{addr}"));
|
||||
let lab = find_laboratory(&conn, None, "public-lab").await.unwrap();
|
||||
|
||||
assert_eq!(lab.id, 1);
|
||||
assert_eq!(lab.name, "public-lab");
|
||||
server.await.unwrap();
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// DOI helper unit tests
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn is_doi_returns_true_for_valid_doi() {
|
||||
assert!(is_doi("10.12345/prefix.20230511-001"));
|
||||
assert!(is_doi("10.1234/abc"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_doi_returns_false_for_normal_paths() {
|
||||
assert!(!is_doi("/labname/path/to/folder"));
|
||||
assert!(!is_doi("labname/path"));
|
||||
assert!(!is_doi("10.1234")); // no slash
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doi_suffix_id_extracts_last_dot_segment() {
|
||||
assert_eq!(
|
||||
doi_suffix_id("10.12345/prefix.20230511-001"),
|
||||
"20230511-001"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doi_suffix_id_no_dot_in_suffix_returns_whole_suffix() {
|
||||
assert_eq!(doi_suffix_id("10.1234/nodot"), "nodot");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doi_suffix_id_no_slash_returns_whole_input() {
|
||||
assert_eq!(doi_suffix_id("10.1234"), "10.1234");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_doi_remote_path_valid_no_subpath() {
|
||||
let (remote, doi, subpath) =
|
||||
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001").unwrap();
|
||||
assert_eq!(remote, "neurodata");
|
||||
assert_eq!(doi, "10.12345/prefix.20230511-001");
|
||||
assert_eq!(subpath, "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_doi_remote_path_valid_trailing_slash() {
|
||||
let (remote, doi, subpath) =
|
||||
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001/").unwrap();
|
||||
assert_eq!(remote, "neurodata");
|
||||
assert_eq!(doi, "10.12345/prefix.20230511-001");
|
||||
assert_eq!(subpath, ""); // trailing slash treated as no subpath
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_doi_remote_path_valid_with_subpath() {
|
||||
let (remote, doi, subpath) =
|
||||
parse_doi_remote_path("neurodata:10.12345/prefix.20230511-001/sub/folder").unwrap();
|
||||
assert_eq!(remote, "neurodata");
|
||||
assert_eq!(doi, "10.12345/prefix.20230511-001");
|
||||
assert_eq!(subpath, "/sub/folder");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_doi_remote_path_rejects_normal_path() {
|
||||
let err = parse_doi_remote_path("neurodata:/lab/path").unwrap_err();
|
||||
assert!(err.to_string().contains("does not look like a DOI"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_doi_and_subpath_no_subpath() {
|
||||
assert_eq!(
|
||||
split_doi_and_subpath("10.1234/prefix.ID"),
|
||||
("10.1234/prefix.ID", "")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_doi_and_subpath_trailing_slash_only() {
|
||||
assert_eq!(
|
||||
split_doi_and_subpath("10.1234/prefix.ID/"),
|
||||
("10.1234/prefix.ID", "")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_doi_and_subpath_single_level() {
|
||||
assert_eq!(
|
||||
split_doi_and_subpath("10.1234/prefix.ID/sub"),
|
||||
("10.1234/prefix.ID", "/sub")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_doi_and_subpath_multi_level() {
|
||||
assert_eq!(
|
||||
split_doi_and_subpath("10.1234/prefix.ID/sub/deep/path"),
|
||||
("10.1234/prefix.ID", "/sub/deep/path")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+261
-75
@@ -1,13 +1,14 @@
|
||||
use crate::cache::{create_authenticated_conn, load_cache_with_token_refresh};
|
||||
use crate::commands::shared::{
|
||||
find_file_by_name, find_folder, find_lab_in_cache, nfc, parse_remote_path,
|
||||
find_file_by_name, find_folder_limited, find_lab_in_cache, nfc, parse_remote_path,
|
||||
};
|
||||
use crate::connection::{ApiRequestLimiter, MDRSConnection};
|
||||
use crate::models::folder::FolderSimple;
|
||||
use anyhow::{anyhow, bail};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub async fn upload(
|
||||
local_path: &str,
|
||||
@@ -18,8 +19,9 @@ pub async fn upload(
|
||||
let (remote, labname, r_path) = parse_remote_path(remote_path)?;
|
||||
let cache = load_cache_with_token_refresh(&remote).await?;
|
||||
let conn = Arc::new(create_authenticated_conn(&remote, &cache)?);
|
||||
let limiter = ApiRequestLimiter::new(crate::settings::SETTINGS.concurrent);
|
||||
let lab = find_lab_in_cache(&cache, &labname)?;
|
||||
let dest_folder = find_folder(&conn, lab.id, &r_path, None).await?;
|
||||
let dest_folder = find_folder_limited(&conn, &limiter, lab.id, &r_path, None).await?;
|
||||
|
||||
// Normalize local_path: resolve to an absolute canonical path so that
|
||||
// trailing slashes and "./" prefixes are handled consistently (matching
|
||||
@@ -30,7 +32,9 @@ pub async fn upload(
|
||||
|
||||
if local.is_file() {
|
||||
let filename = local.file_name().unwrap().to_string_lossy().to_string();
|
||||
let remote_files = conn.list_all_files(&dest_folder.id).await?;
|
||||
let remote_files = conn
|
||||
.list_all_files_limited(&dest_folder.id, &limiter)
|
||||
.await?;
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if rf.size == std::fs::metadata(local)?.len() {
|
||||
@@ -39,7 +43,7 @@ pub async fn upload(
|
||||
}
|
||||
}
|
||||
}
|
||||
conn.upload_file(&dest_folder.id, &local.to_string_lossy())
|
||||
conn.upload_file_limited(&dest_folder.id, &local.to_string_lossy(), &limiter)
|
||||
.await?;
|
||||
println!("{}{}", dest_folder.path, filename);
|
||||
} else if local.is_dir() {
|
||||
@@ -52,75 +56,43 @@ pub async fn upload(
|
||||
let local_basename = local.file_name().unwrap().to_string_lossy().to_string();
|
||||
let top_remote_id = find_or_create_folder(
|
||||
&conn,
|
||||
&limiter,
|
||||
&dest_folder.id,
|
||||
&dest_folder.sub_folders,
|
||||
&local_basename,
|
||||
)
|
||||
.await?;
|
||||
let top_folder = conn.retrieve_folder(&top_remote_id).await?;
|
||||
println!("{}", top_folder.path.trim_end_matches('/'));
|
||||
println!(
|
||||
"{}",
|
||||
format!("{}{}", dest_folder.path, local_basename).trim_end_matches('/')
|
||||
);
|
||||
|
||||
// Iterative depth-first walk: each entry is (local_dir, remote_folder_id)
|
||||
let mut stack: Vec<(PathBuf, String)> = vec![(local.to_path_buf(), top_remote_id)];
|
||||
let mut folder_tasks: JoinSet<Result<UploadFolderTaskResult, anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let mut upload_tasks: JoinSet<Result<(), anyhow::Error>> = JoinSet::new();
|
||||
let mut errors = Vec::new();
|
||||
|
||||
while let Some((local_dir, remote_id)) = stack.pop() {
|
||||
let folder_detail = conn.retrieve_folder(&remote_id).await?;
|
||||
let remote_files = conn.list_all_files(&remote_id).await?;
|
||||
spawn_upload_folder_task(
|
||||
&mut folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
local.to_path_buf(),
|
||||
top_remote_id,
|
||||
skip_if_exists,
|
||||
);
|
||||
|
||||
let mut entries = fs::read_dir(&local_dir).await?;
|
||||
let mut subdirs: Vec<PathBuf> = Vec::new();
|
||||
let mut files: Vec<PathBuf> = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let p = entry.path();
|
||||
if p.is_dir() {
|
||||
subdirs.push(p);
|
||||
} else {
|
||||
files.push(p);
|
||||
}
|
||||
}
|
||||
drive_upload_tasks(
|
||||
&mut folder_tasks,
|
||||
&mut upload_tasks,
|
||||
&mut errors,
|
||||
conn.clone(),
|
||||
limiter,
|
||||
skip_if_exists,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Ensure each local sub-directory exists on the remote side
|
||||
for subdir in subdirs {
|
||||
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
|
||||
let sub_remote_id =
|
||||
find_or_create_folder(&conn, &remote_id, &folder_detail.sub_folders, &dirname)
|
||||
.await?;
|
||||
let sub_folder = conn.retrieve_folder(&sub_remote_id).await?;
|
||||
println!("{}", sub_folder.path.trim_end_matches('/'));
|
||||
stack.push((subdir, sub_remote_id));
|
||||
}
|
||||
|
||||
// Upload files in this directory (up to 10 concurrent)
|
||||
let mut futs: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();
|
||||
for file_path in files {
|
||||
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
|
||||
let file_path_str = file_path.to_string_lossy().to_string();
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if let Ok(meta) = std::fs::metadata(&file_path) {
|
||||
if rf.size == meta.len() {
|
||||
let remote_path_prefix = folder_detail.path.clone();
|
||||
println!("{}{}", remote_path_prefix, filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let conn = conn.clone();
|
||||
let folder_id = remote_id.clone();
|
||||
let remote_path_prefix = folder_detail.path.clone();
|
||||
let fname = filename.clone();
|
||||
futs.push(tokio::spawn(async move {
|
||||
match conn.upload_file(&folder_id, &file_path_str).await {
|
||||
Ok(_) => println!("{}{}", remote_path_prefix, fname),
|
||||
Err(e) => eprintln!("Error: {}", e),
|
||||
}
|
||||
}));
|
||||
if futs.len() >= crate::settings::SETTINGS.concurrent {
|
||||
let _ = futs.next().await;
|
||||
}
|
||||
}
|
||||
while futs.next().await.is_some() {}
|
||||
if !errors.is_empty() {
|
||||
bail!(errors.join("\n"));
|
||||
}
|
||||
} else {
|
||||
bail!("File or directory `{}` not found.", local_path);
|
||||
@@ -131,7 +103,8 @@ pub async fn upload(
|
||||
|
||||
/// Find an existing sub-folder by name or create it, returning its ID.
|
||||
async fn find_or_create_folder(
|
||||
conn: &crate::connection::MDRSConnection,
|
||||
conn: &MDRSConnection,
|
||||
limiter: &ApiRequestLimiter,
|
||||
parent_id: &str,
|
||||
existing: &[FolderSimple],
|
||||
name: &str,
|
||||
@@ -142,13 +115,226 @@ async fn find_or_create_folder(
|
||||
{
|
||||
return Ok(sf.id.clone());
|
||||
}
|
||||
let resp = conn.create_folder(parent_id, &nfc(name)).await?;
|
||||
if !resp.status().is_success() {
|
||||
bail!("Failed to create remote folder: {}", name);
|
||||
}
|
||||
let json: serde_json::Value = resp.json().await?;
|
||||
json["id"]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow!("No id in create_folder response for {}", name))
|
||||
.map(|s| s.to_string())
|
||||
conn.create_folder_id_limited(parent_id, &nfc(name), limiter)
|
||||
.await
|
||||
}
|
||||
|
||||
struct UploadFolderTaskResult {
|
||||
child_folders: Vec<(PathBuf, String)>,
|
||||
upload_jobs: Vec<UploadJob>,
|
||||
}
|
||||
|
||||
struct UploadJob {
|
||||
folder_id: String,
|
||||
file_path: String,
|
||||
remote_path: String,
|
||||
}
|
||||
|
||||
fn spawn_upload_folder_task(
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
local_dir: PathBuf,
|
||||
remote_id: String,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
folder_tasks.spawn(async move {
|
||||
process_upload_folder(conn, limiter, local_dir, remote_id, skip_if_exists).await
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_upload_task(
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
job: UploadJob,
|
||||
) {
|
||||
upload_tasks.spawn(async move {
|
||||
conn.upload_file_limited(&job.folder_id, &job.file_path, &limiter)
|
||||
.await?;
|
||||
println!("{}", job.remote_path);
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
|
||||
async fn process_upload_folder(
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
local_dir: PathBuf,
|
||||
remote_id: String,
|
||||
skip_if_exists: bool,
|
||||
) -> Result<UploadFolderTaskResult, anyhow::Error> {
|
||||
let (folder_detail, remote_files) = tokio::try_join!(
|
||||
conn.retrieve_folder_limited(&remote_id, &limiter),
|
||||
conn.list_all_files_limited(&remote_id, &limiter),
|
||||
)?;
|
||||
|
||||
let mut entries = fs::read_dir(&local_dir).await?;
|
||||
let mut subdirs: Vec<PathBuf> = Vec::new();
|
||||
let mut files: Vec<PathBuf> = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
subdirs.push(path);
|
||||
} else {
|
||||
files.push(path);
|
||||
}
|
||||
}
|
||||
|
||||
let mut subdir_tasks: JoinSet<Result<(PathBuf, String, String), anyhow::Error>> =
|
||||
JoinSet::new();
|
||||
let existing_subfolders = Arc::new(folder_detail.sub_folders.clone());
|
||||
let folder_path_prefix = folder_detail.path.clone();
|
||||
for subdir in subdirs {
|
||||
let conn = conn.clone();
|
||||
let limiter = limiter.clone();
|
||||
let remote_id = remote_id.clone();
|
||||
let existing_subfolders = existing_subfolders.clone();
|
||||
let folder_path_prefix = folder_path_prefix.clone();
|
||||
subdir_tasks.spawn(async move {
|
||||
let dirname = subdir.file_name().unwrap().to_string_lossy().to_string();
|
||||
let sub_remote_id = find_or_create_folder(
|
||||
&conn,
|
||||
&limiter,
|
||||
&remote_id,
|
||||
existing_subfolders.as_slice(),
|
||||
&dirname,
|
||||
)
|
||||
.await?;
|
||||
Ok((
|
||||
subdir,
|
||||
sub_remote_id,
|
||||
format!("{}{}", folder_path_prefix, dirname),
|
||||
))
|
||||
});
|
||||
}
|
||||
|
||||
let mut child_folders = Vec::new();
|
||||
while let Some(result) = subdir_tasks.join_next().await {
|
||||
let (subdir, sub_remote_id, remote_path) = flatten_join_result(result)?;
|
||||
println!("{}", remote_path.trim_end_matches('/'));
|
||||
child_folders.push((subdir, sub_remote_id));
|
||||
}
|
||||
|
||||
let mut upload_jobs = Vec::new();
|
||||
for file_path in files {
|
||||
let filename = file_path.file_name().unwrap().to_string_lossy().to_string();
|
||||
if skip_if_exists {
|
||||
if let Some(rf) = find_file_by_name(&remote_files, &filename) {
|
||||
if let Ok(meta) = std::fs::metadata(&file_path) {
|
||||
if rf.size == meta.len() {
|
||||
println!("{}{}", folder_detail.path, filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
upload_jobs.push(UploadJob {
|
||||
folder_id: remote_id.clone(),
|
||||
file_path: file_path.to_string_lossy().to_string(),
|
||||
remote_path: format!("{}{}", folder_detail.path, filename),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(UploadFolderTaskResult {
|
||||
child_folders,
|
||||
upload_jobs,
|
||||
})
|
||||
}
|
||||
|
||||
async fn drive_upload_tasks(
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
loop {
|
||||
match (folder_tasks.is_empty(), upload_tasks.is_empty()) {
|
||||
(true, true) => break,
|
||||
(false, true) => {
|
||||
if let Some(result) = folder_tasks.join_next().await {
|
||||
handle_upload_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
upload_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
(true, false) => {
|
||||
if let Some(result) = upload_tasks.join_next().await {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
(false, false) => {
|
||||
tokio::select! {
|
||||
result = folder_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
handle_upload_folder_result(
|
||||
result,
|
||||
folder_tasks,
|
||||
upload_tasks,
|
||||
errors,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
}
|
||||
result = upload_tasks.join_next() => {
|
||||
if let Some(result) = result {
|
||||
if let Err(err) = flatten_join_result(result) {
|
||||
errors.push(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_upload_folder_result(
|
||||
result: Result<Result<UploadFolderTaskResult, anyhow::Error>, tokio::task::JoinError>,
|
||||
folder_tasks: &mut JoinSet<Result<UploadFolderTaskResult, anyhow::Error>>,
|
||||
upload_tasks: &mut JoinSet<Result<(), anyhow::Error>>,
|
||||
errors: &mut Vec<String>,
|
||||
conn: Arc<MDRSConnection>,
|
||||
limiter: ApiRequestLimiter,
|
||||
skip_if_exists: bool,
|
||||
) {
|
||||
match flatten_join_result(result) {
|
||||
Ok(task_result) => {
|
||||
for (local_dir, remote_id) in task_result.child_folders {
|
||||
spawn_upload_folder_task(
|
||||
folder_tasks,
|
||||
conn.clone(),
|
||||
limiter.clone(),
|
||||
local_dir,
|
||||
remote_id,
|
||||
skip_if_exists,
|
||||
);
|
||||
}
|
||||
for job in task_result.upload_jobs {
|
||||
spawn_upload_task(upload_tasks, conn.clone(), limiter.clone(), job);
|
||||
}
|
||||
}
|
||||
Err(err) => errors.push(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn flatten_join_result<T>(
|
||||
result: Result<Result<T, anyhow::Error>, tokio::task::JoinError>,
|
||||
) -> Result<T, anyhow::Error> {
|
||||
match result {
|
||||
Ok(inner) => inner,
|
||||
Err(err) => Err(anyhow!("Task join failed: {}", err)),
|
||||
}
|
||||
}
|
||||
|
||||
+1
-23
@@ -1,27 +1,5 @@
|
||||
use serde::Deserialize;
|
||||
use std::fs;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CacheUser {
|
||||
username: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct WhoamiCache {
|
||||
user: Option<CacheUser>,
|
||||
}
|
||||
|
||||
pub async fn whoami(remote: &str) -> Result<(), anyhow::Error> {
|
||||
let cache_path = crate::settings::SETTINGS
|
||||
.config_dirname
|
||||
.join("cache")
|
||||
.join(format!("{}.json", remote));
|
||||
if !cache_path.exists() {
|
||||
println!("(Anonymous)");
|
||||
return Ok(());
|
||||
}
|
||||
let data = fs::read_to_string(&cache_path)?;
|
||||
match serde_json::from_str::<WhoamiCache>(&data) {
|
||||
match crate::cache::load_cache(remote) {
|
||||
Ok(cache) => match cache.user {
|
||||
Some(user) => println!("{}", user.username),
|
||||
None => println!("(Anonymous)"),
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
|
||||
use reqwest::{Client, Response};
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
fn build_user_agent() -> String {
|
||||
let info = os_info::get();
|
||||
@@ -36,6 +38,27 @@ pub struct MDRSConnection {
|
||||
pub token: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiRequestLimiter {
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl ApiRequestLimiter {
|
||||
pub fn new(limit: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(Semaphore::new(limit.max(1))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, anyhow::Error> {
|
||||
self.semaphore
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("API request limiter was closed."))
|
||||
}
|
||||
}
|
||||
|
||||
impl MDRSConnection {
|
||||
pub fn new(url: &str) -> Self {
|
||||
MDRSConnection {
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Nested folder information returned inside a DOI response.
|
||||
/// The DOI endpoint only returns the folder `id`; `laboratory_id` must be
|
||||
/// obtained by subsequently calling the folder retrieve endpoint.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct DoiFolderRef {
|
||||
pub id: String,
|
||||
}
|
||||
|
||||
/// Response from GET v3/doi/{id}/
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct DoiResponse {
|
||||
/// The internal DOI suffix ID (e.g. "20260429-001") returned as a string.
|
||||
pub id: String,
|
||||
pub doi: String,
|
||||
pub folder: DoiFolderRef,
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
// Model definitions (User, Laboratory, File, Folder, etc.)
|
||||
|
||||
pub mod doi;
|
||||
pub mod file;
|
||||
pub mod folder;
|
||||
pub mod laboratory;
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ pub struct Settings {
|
||||
/// Base directory for config and cache files.
|
||||
/// Controlled by `MDRS_CLIENT_CONFIG_DIRNAME` env var (default: `~/.mdrs-client`).
|
||||
pub config_dirname: std::path::PathBuf,
|
||||
/// Maximum number of concurrent upload/download workers.
|
||||
/// Maximum number of concurrent MDRS API requests used by upload/download.
|
||||
/// Controlled by `MDRS_CLIENT_CONCURRENT` env var (default: 10).
|
||||
pub concurrent: usize,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user