3f2ca938bd
Use a shared API request limiter across recursive upload and download traversal so folder detail fetches, file listings, folder auth, and transfers can run concurrently under one budget. Refactor the traversal loops into task-driven pipelines while preserving skip-if-exists, excludes, cleanup, and current output behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
220 lines
6.3 KiB
Rust
220 lines
6.3 KiB
Rust
use reqwest::header::{ACCEPT, AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
|
|
use reqwest::{Client, Response};
|
|
use serde::Serialize;
|
|
use std::sync::Arc;
|
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
|
|
|
fn build_user_agent() -> String {
|
|
let info = os_info::get();
|
|
let mut parts = vec![info.os_type().to_string()];
|
|
if *info.version() != os_info::Version::Unknown {
|
|
parts.push(info.version().to_string());
|
|
}
|
|
if let Some(codename) = info.codename() {
|
|
parts.push(codename.to_string());
|
|
}
|
|
if let Some(edition) = info.edition() {
|
|
parts.push(edition.to_string());
|
|
}
|
|
let arch = info
|
|
.architecture()
|
|
.unwrap_or(std::env::consts::ARCH)
|
|
.to_string();
|
|
parts.push(arch);
|
|
format!(
|
|
"MdrsClient/{} (Rust {} - {})",
|
|
env!("CARGO_PKG_VERSION"),
|
|
env!("RUSTC_VERSION"),
|
|
parts.join(" ")
|
|
)
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
/// HTTP transport layer for MDRS API calls.
|
|
pub struct MDRSConnection {
|
|
pub remote: Option<String>,
|
|
pub url: String,
|
|
pub client: Client,
|
|
pub token: Option<String>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct ApiRequestLimiter {
|
|
semaphore: Arc<Semaphore>,
|
|
}
|
|
|
|
impl ApiRequestLimiter {
|
|
pub fn new(limit: usize) -> Self {
|
|
Self {
|
|
semaphore: Arc::new(Semaphore::new(limit.max(1))),
|
|
}
|
|
}
|
|
|
|
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, anyhow::Error> {
|
|
self.semaphore
|
|
.clone()
|
|
.acquire_owned()
|
|
.await
|
|
.map_err(|_| anyhow::anyhow!("API request limiter was closed."))
|
|
}
|
|
}
|
|
|
|
impl MDRSConnection {
|
|
pub fn new(url: &str) -> Self {
|
|
MDRSConnection {
|
|
remote: None,
|
|
url: url.to_string(),
|
|
client: Client::new(),
|
|
token: None,
|
|
}
|
|
}
|
|
|
|
pub fn with_remote(mut self, remote: &str) -> Self {
|
|
self.remote = Some(remote.to_string());
|
|
self
|
|
}
|
|
|
|
/// Create a new connection that shares the HTTP client (and its connection
|
|
/// pool) with the receiver but uses a fresh access token. Useful for
|
|
/// spawning per-task connections without allocating a new connection pool
|
|
/// for every concurrent task.
|
|
///
|
|
/// `reqwest::Client` wraps an internal `Arc`; cloning it is cheap and
|
|
/// keeps the shared pool intact.
|
|
pub fn with_token(&self, access_token: String) -> Self {
|
|
MDRSConnection {
|
|
remote: self.remote.clone(),
|
|
url: self.url.clone(),
|
|
client: self.client.clone(),
|
|
token: Some(access_token),
|
|
}
|
|
}
|
|
|
|
async fn connection_with_fresh_token(&self) -> Result<Self, anyhow::Error> {
|
|
match (&self.remote, &self.token) {
|
|
(Some(remote), Some(_)) => {
|
|
let cache = crate::cache::load_cache_with_token_refresh(remote).await?;
|
|
Ok(self.with_token(cache.token.access))
|
|
}
|
|
_ => Ok(self.clone()),
|
|
}
|
|
}
|
|
|
|
pub fn build_url(&self, path: &str) -> String {
|
|
format!("{}/{}", self.url.trim_end_matches('/'), path)
|
|
}
|
|
|
|
pub fn prepare_headers(&self) -> HeaderMap {
|
|
let mut headers = HeaderMap::new();
|
|
headers.insert(
|
|
USER_AGENT,
|
|
HeaderValue::from_str(&build_user_agent())
|
|
.expect("user-agent string contains invalid header characters"),
|
|
);
|
|
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
|
if let Some(token) = &self.token {
|
|
headers.insert(
|
|
AUTHORIZATION,
|
|
HeaderValue::from_str(&format!("Bearer {}", token))
|
|
.expect("token contains invalid header characters"),
|
|
);
|
|
}
|
|
headers
|
|
}
|
|
|
|
pub async fn get(&self, path: &str) -> Result<Response, anyhow::Error> {
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.get(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn get_with_query<Q>(&self, path: &str, query: &Q) -> Result<Response, anyhow::Error>
|
|
where
|
|
Q: Serialize + ?Sized,
|
|
{
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.get(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.query(query)
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn get_url(&self, url: &str) -> Result<Response, anyhow::Error> {
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.get(if url.starts_with("http") {
|
|
url.to_string()
|
|
} else {
|
|
conn.build_url(url)
|
|
})
|
|
.headers(conn.prepare_headers())
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn post_json<B>(&self, path: &str, body: &B) -> Result<Response, anyhow::Error>
|
|
where
|
|
B: Serialize + ?Sized,
|
|
{
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.post(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.json(body)
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn post_multipart(
|
|
&self,
|
|
path: &str,
|
|
form: reqwest::multipart::Form,
|
|
) -> Result<Response, anyhow::Error> {
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.post(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.multipart(form)
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn delete(&self, path: &str) -> Result<Response, anyhow::Error> {
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.delete(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.send()
|
|
.await?)
|
|
}
|
|
|
|
pub async fn delete_with_query<Q>(
|
|
&self,
|
|
path: &str,
|
|
query: &Q,
|
|
) -> Result<Response, anyhow::Error>
|
|
where
|
|
Q: Serialize + ?Sized,
|
|
{
|
|
let conn = self.connection_with_fresh_token().await?;
|
|
Ok(conn
|
|
.client
|
|
.delete(conn.build_url(path))
|
|
.headers(conn.prepare_headers())
|
|
.query(query)
|
|
.send()
|
|
.await?)
|
|
}
|
|
}
|