diff --git a/docker-compose.yml b/docker-compose.yml index ac14fb6..d161be9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,8 @@ services: APP_ENV: development LOG: debug TZ: "Europe/Paris" - EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNTMwMjNkYjQtN2EzYS00ZTM0LTk0MWEtNjU2ZTNlNzE2NzlkIiwibWFzdGVyS2V5QjY0IjoiMUh0djdtWCtYVkJxL0IzUEV2WDlZZjlQeUdVZW5oRHlXemo5THRqNW90WT0ifQ==" + EDGE_KEY: "eyJzZXJ2ZXJVcmwiOiJodHRwOi8vbG9jYWxob3N0Ojg4ODciLCJhZ2VudElkIjoiNzM0NjU3Y2YtMGQzYy00Y2UwLTkyODQtZDJmOGYyMjI2MzgzIiwibWFzdGVyS2V5QjY0IjoiMUh0djdtWCtYVkJxL0IzUEV2WDlZZjlQeUdVZW5oRHlXemo5THRqNW90WT0ifQ==" + #CHUNK_SIZE_MB: "1" #POOLING: 1 #DATABASES_CONFIG_FILE: "config.toml" extra_hosts: diff --git a/src/settings.rs b/src/settings.rs index 053a115..039ea70 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -14,6 +14,7 @@ pub struct Settings { pub pooling: usize, pub timezone: String, pub log: String, + pub chunk_size: usize, // bytes } impl Settings { @@ -35,6 +36,18 @@ impl Settings { pooling_seconds ); } + + let chunk_size_mb = env::var("CHUNK_SIZE_MB") + .unwrap_or_else(|_| "1".to_string()) + .parse::() + .expect("CHUNK_SIZE_MB must be a valid positive integer"); + + if chunk_size_mb == 0 || chunk_size_mb > 10 { + panic!("CHUNK_SIZE_MB must be between 1 and 10 MB"); + } + + let chunk_size = chunk_size_mb * 1024 * 1024; + let tz = env::var("TZ").unwrap_or_else(|_| "UTC".to_string()); Self { @@ -49,6 +62,7 @@ impl Settings { pooling: pooling_seconds, timezone: tz, log: env::var("LOG").unwrap_or_else(|_| "info".into()), + chunk_size } } } diff --git a/src/utils/stream.rs b/src/utils/stream.rs index 52a0faf..88874d7 100644 --- a/src/utils/stream.rs +++ b/src/utils/stream.rs @@ -3,12 +3,15 @@ use anyhow::Result; use bytes::Bytes; use futures::{Stream, StreamExt}; use std::pin::Pin; -use tokio_util::io::ReaderStream; +use tokio::io::{AsyncReadExt}; +use crate::settings::CONFIG; pub struct UploadStream { pub stream: Pin> + Send>>, } + + pub async fn build_stream( file_path: &std::path::Path, encrypt: bool, @@ -25,13 +28,27 @@ pub async fn build_stream( Ok(UploadStream { stream }) } else { - let file = tokio::fs::File::open(file_path).await?; - let reader = ReaderStream::new(file); + let mut file = tokio::fs::File::open(file_path).await?; - let stream = Box::pin( - reader.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), - ); + let stream = async_stream::stream! { + let mut buffer = vec![0u8; CONFIG.chunk_size]; - Ok(UploadStream { stream }) + loop { + let n = match file.read(&mut buffer).await { + Ok(0) => break, + Ok(n) => n, + Err(e) => { + yield Err(e); + break; + } + }; + + yield Ok(Bytes::copy_from_slice(&buffer[..n])); + } + }; + + Ok(UploadStream { + stream: Box::pin(stream), + }) } } diff --git a/src/utils/tus.rs b/src/utils/tus.rs index 387d984..5444d46 100644 --- a/src/utils/tus.rs +++ b/src/utils/tus.rs @@ -3,8 +3,7 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue}; use tracing::{error, info}; - -const PATCH_CHUNK_SIZE: usize = 1 * 1024 * 1024; +use crate::settings::CONFIG; pub async fn upload_to_tus_stream_with_headers( encrypted_stream: S, @@ -66,7 +65,9 @@ where while let Some(chunk) = stream.next().await { let chunk = chunk.context("Stream produced IO error")?; - for sub_chunk in chunk.chunks(PATCH_CHUNK_SIZE) { + info!("Chunk: {:?}", CONFIG.chunk_size); + + for sub_chunk in chunk.chunks(CONFIG.chunk_size) { let mut patch_headers = extra_headers.clone(); patch_headers.insert("Tus-Resumable", HeaderValue::from_static("1.0.0")); patch_headers.insert(