From d12c174a8b3e5441e0558125b3b3cf32593e3976 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Fri, 26 Mar 2021 01:17:12 +0100 Subject: [PATCH] Improve terminal display output - Logs are printed at the top - Status updates are printed at the bottom and are updated in-place - Removed the redundant code for `download_one` --- Cargo.lock | 44 ++++++++- Cargo.toml | 3 +- src/dlreport.rs | 171 +++++++++++++++++++++++++++++++- src/download.rs | 23 +++-- src/main.rs | 253 ++++++++++-------------------------------------- 5 files changed, 279 insertions(+), 215 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc9a4dd..e60450d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,6 +71,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + [[package]] name = "clap" version = "2.33.3" @@ -140,7 +153,7 @@ dependencies = [ name = "fdl" version = "0.1.0" dependencies = [ - "bytes", + "chrono", "clap", "crossterm", "futures", @@ -537,6 +550,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -967,6 +999,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "tinyvec" version = "1.1.1" diff --git a/Cargo.toml b/Cargo.toml index 43c9922..8d8f271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ reqwest = { version = "0.11.2", features = [ "stream" ] } futures = "0.3.12" percent-encoding = "2.1.0" regex = "1.4.3" -bytes = "1.0.1" crossterm = "0.19.0" clap = "2.33.3" -#futures-util = "0.3.13" +chrono = "0.4" diff --git a/src/dlreport.rs b/src/dlreport.rs index fe2ba56..4f144a0 100644 --- a/src/dlreport.rs +++ b/src/dlreport.rs @@ -1,5 +1,13 @@ +use std::collections::{ HashMap, VecDeque }; +use std::time::SystemTime; +use std::io::stdout; + use tokio::sync::mpsc; +use crossterm::cursor::{ MoveUp }; +use crossterm::execute; +use crossterm::terminal::{ Clear, ClearType }; + use crate::errors::*; @@ -10,12 +18,13 @@ pub enum DlStatus { filename: String }, Update { - speed_mbps: f64, + speed_mbps: f32, bytes_curr: u64 }, Done { duration_ms: u64 - } + }, + Message(String) } #[derive(Clone, Debug)] @@ -46,3 +55,161 @@ impl DlReporter { ).map_err(|e| e.into()) } } + + +struct InfoHolder { + filename: String, + total_size: u64, + progress: u64, + speed_mbps: f32 +} + +impl InfoHolder { + + fn new(filename: String, total_size: u64) -> InfoHolder { + InfoHolder { + filename, + total_size, + progress: 0, + speed_mbps: 0.0 + } + } + +} + +fn print_accumulated_report(statuses: & HashMap, msg_queue: &mut VecDeque, moved_lines: u16) -> ResBE { + let mut dl_speed_sum = 0.0; + + execute!( + stdout(), + crossterm::cursor::Hide, + MoveUp(moved_lines) + )?; + + for msg in msg_queue.drain(..) { + + let ct_now = chrono::Local::now(); + + print!("{} > {}", ct_now.format("%H:%M:%S"), msg); + execute!( + stdout(), + Clear(ClearType::UntilNewLine) + )?; + println!(); + } + + print!("----------------------------------------"); + execute!( + stdout(), + Clear(ClearType::UntilNewLine) + )?; + println!(); + + for (_k, v) in statuses { + + let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0; + + print!("Status: {:6.2} mb/s {:5.2}% completed '{}'", v.speed_mbps, percent_complete, v.filename); + + execute!( + stdout(), + Clear(ClearType::UntilNewLine) + )?; + + println!(""); + + dl_speed_sum += v.speed_mbps; + } + + println!(); + + if statuses.len() != 0 { + print!(" =>> Accumulated download speed: {:6.2} mb/s", dl_speed_sum); + } + + execute!( + stdout(), + Clear(ClearType::UntilNewLine) + )?; + println!(""); + + execute!( + stdout(), + crossterm::cursor::Show + )?; + + // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated + Ok(statuses.len() as u16 + 3) +} + +pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver) -> ResBE<()> { + + let mut statuses: HashMap = HashMap::new(); + let mut moved_lines = 0; + let mut msg_queue = VecDeque::new(); + + let mut t_last = SystemTime::now(); + + while let Some(update) = receiver.recv().await { + match update.status { + + DlStatus::Init { + bytes_total, + filename + } => { + + msg_queue.push_back(format!("Starting download for file '{}'", &filename)); + statuses.insert(update.id, InfoHolder::new(filename, bytes_total)); + + }, + DlStatus::Update { + speed_mbps, + bytes_curr + } => { + + // Scope the reference to prevent borrowing conflict later + { + let s = &mut statuses.get_mut(&update.id).unwrap(); + s.progress = bytes_curr; + s.speed_mbps = speed_mbps; + } + + if t_last.elapsed().unwrap().as_millis() > 500 { + moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?; + + t_last = SystemTime::now(); + } + + }, + DlStatus::Done { + duration_ms + } => { + + msg_queue.push_back(format!( + "Finished downloading '{}' with {:.2} mb in {:.2} seconds", + &statuses.get(&update.id).unwrap().filename, + (statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0), + (duration_ms as f32 / 1_000.0) + )); + + statuses.remove(&update.id); + + }, + DlStatus::Message(msg) => { + msg_queue.push_back(msg); + } + + } + } + + print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?; + + execute!( + stdout(), + MoveUp(2) + )?; + + println!("All done!"); + + Ok(()) +} diff --git a/src/download.rs b/src/download.rs index 32a82a1..ff85096 100644 --- a/src/download.rs +++ b/src/download.rs @@ -12,7 +12,7 @@ use crate::dlreport::*; struct RollingAverage { index: usize, - data: Vec + data: Vec } impl RollingAverage { @@ -24,16 +24,16 @@ impl RollingAverage { } } - fn value(&self) -> f64 { + fn value(&self) -> f32 { if self.data.len() == 0 { 0.0 } else { - let sum: f64 = self.data.iter().sum(); - sum / self.data.len() as f64 + let sum: f32 = self.data.iter().sum(); + sum / self.data.len() as f32 } } - fn add(&mut self, val: f64) { + fn add(&mut self, val: f32) { if self.data.capacity() == self.data.len() { self.data[self.index] = val; @@ -159,7 +159,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte let mut t_last_speed = SystemTime::now(); let mut last_bytecount = 0; - let mut average_speed = RollingAverage::new(5); + let mut average_speed = RollingAverage::new(10); // Read data from server as long as new data is available while let Some(chunk) = resp.chunk().await? { @@ -181,7 +181,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte // Update rolling average average_speed.add( - (last_bytecount as f64) / (1000.0 * t_elapsed as f64) + (last_bytecount as f32) / (1_000.0 * t_elapsed as f32) ); speed_mbps = average_speed.value(); @@ -201,7 +201,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte } // Ensure that IO is completed - ofile.flush().await?; + //ofile.flush().await?; let duration_ms = t_start.elapsed()?.as_millis() as u64; @@ -267,7 +267,7 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter })?; let mut update_counter = 0; - let mut dl_speeds = vec![0.0f64; numparal as usize]; + let mut dl_speeds = vec![0.0_f32; numparal as usize]; let mut progresses = vec![0; numparal as usize]; while let Some(update) = rx.recv().await { @@ -309,7 +309,10 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter dl_speeds[update.id as usize] = 0.0; - } + }, + + // Just forwared everything else to the calling receiver + _ => rep.send(update.status)? } } diff --git a/src/main.rs b/src/main.rs index 60ada6b..fb6e3d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,7 @@ use std::process::exit; use clap::{ App, Arg, ArgGroup, crate_version }; use tokio::sync::mpsc; use futures::future::join_all; -use std::time::SystemTime; use std::io::BufRead; -use std::collections::HashMap; use dlreport::{ DlReport, DlStatus, DlReporter }; use errors::ResBE; @@ -38,6 +36,16 @@ async fn main() -> ResBE<()> { .takes_value(true) .help("Specify the number concurrent downloads") ) + .arg( + Arg::with_name("boost") + .short("b") + .long("boost") + .value_name("CONNECTIONS PER FILE") + .takes_value(true) + .help("Specify the number connections per single downloads. \ + Files started with boost can't be continued. \ + NOTE: This will likely cause IO bottlenecks on HDDs") + ) .arg( Arg::with_name("zippyshare") .short("z") @@ -98,10 +106,23 @@ async fn main() -> ResBE<()> { exit(1); } }; + + + let boost = match arguments.value_of("boost") { + Some(it) => it, + None => "1" + }; + + let boost: i32 = match boost.parse() { + Ok(it) => it, + Err(_) => { + eprintln!("Invalid value for boost: {}", numparal); + exit(1); + } + }; let is_zippy = arguments.is_present("zippyshare"); - if arguments.is_present("listfile") { let listfile = arguments.value_of("listfile").unwrap(); @@ -133,7 +154,7 @@ async fn main() -> ResBE<()> { urls = zippy_urls; } - download_multiple(urls, outdir, numparal).await?; + download_multiple(urls, outdir, numparal, boost).await?; } else if arguments.is_present("download") { @@ -151,7 +172,13 @@ async fn main() -> ResBE<()> { url.to_string() }; - download_one(&url, outdir, numparal).await?; + let numparal = if boost != 1 { + boost + } else { + numparal + }; + + download_multiple(vec![url], outdir, 1, numparal).await?; } else if arguments.is_present("resolve") { @@ -176,107 +203,7 @@ async fn main() -> ResBE<()> { Ok(()) } - -async fn download_one(url: &str, outdir: &str, numparal: i32) -> ResBE<()> { - let outdir = Path::new(outdir); - - if !outdir.exists() { - std::fs::create_dir_all(outdir)?; - } - - - let file_name = download::url_to_filename(url); - let into_file = outdir.join(Path::new(&file_name)); - let into_file = into_file.to_str().unwrap().to_string(); - let path_into_file = Path::new(&into_file); - - // If file with same name is present locally, check filesize - if path_into_file.exists() { - let (filesize, _) = download::http_get_filesize_and_range_support(&url).await?; - let local_filesize = std::fs::metadata(path_into_file)?.len(); - - if filesize == local_filesize { - println!("Skipping file '{}': already present", &file_name); - return Ok(()); - } else { - println!("Replacing file '{}': present but not completed", &file_name); - } - } - - // Create com channel to get feedback on download progress - let (tx, mut rx) = mpsc::unbounded_channel::(); - - // Start download nonblocking - let url = url.to_string(); - let jh_download = tokio::spawn(async move { - // Create reporter with id 0 since there is only one anyways - let rep = DlReporter::new(0, tx); - - if numparal == 1 { - if let Err(e) = download::download_feedback(&url, &into_file, rep).await { - eprintln!("Error while downloading"); - eprintln!("{}", e); - } - } else { - if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, numparal).await { - eprintln!("Error while downloading"); - eprintln!("{}", e); - } - } - }); - - - let mut t_last = SystemTime::UNIX_EPOCH; - - let mut filesize = 0; - - // Handle download status updates until all transmitters are closed - // this happens when the download is completed - while let Some(update) = rx.recv().await { - match update.status { - - DlStatus::Init { - bytes_total, - filename - } => { - - println!("Starting download for file '{}'", &filename); - filesize = bytes_total; - - }, - DlStatus::Update { - speed_mbps, - bytes_curr - } => { - - // Print update every second, otherwise ignore the updates - if t_last.elapsed()?.as_millis() > 1000 { - let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0; - println!("Status: {:6.2} mb/s {:5.2}% completed", speed_mbps, percent_complete); - - t_last = SystemTime::now(); - } - - }, - DlStatus::Done { - duration_ms - } => { - - println!("Status: 100% completed"); - println!("Download took {} seconds", (duration_ms / 1000)); - - } - - } - } - - // Await the download just to make sure - jh_download.await?; - - Ok(()) -} - -async fn download_multiple(urls: Vec, outdir: &str, numparal: i32) -> ResBE<()> { +async fn download_multiple(urls: Vec, outdir: &str, numparal: i32, boost: i32) -> ResBE<()> { let outdir = Path::new(outdir); if !outdir.exists() { @@ -285,7 +212,7 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32) -> Re let mut joiners = Vec::new(); - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, rx) = mpsc::unbounded_channel::(); for offset in 0..numparal { @@ -303,12 +230,16 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32) -> Re for (i, url) in urls.iter().enumerate() { + let tx = tx.clone(); + // Recalculated index in the main url vector, used as id let global_url_index = i as i32 * numparal + offset; + let rep = DlReporter::new(global_url_index, tx); + let file_name = download::url_to_filename(&url); - let into_file = outdir.join(Path::new(&file_name)); - let into_file = into_file.to_str().unwrap().to_string(); + let into_file = outdir.join(Path::new(&file_name)) + .to_str().unwrap().to_string(); let path_into_file = Path::new(&into_file); // If file with same name is present locally, check filesize @@ -317,18 +248,23 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32) -> Re let local_filesize = std::fs::metadata(path_into_file).unwrap().len(); if filesize == local_filesize { - println!("Skipping file '{}': already present", &file_name); + rep.send(DlStatus::Message(format!("Skipping file '{}': already present", &file_name))).unwrap(); continue; } else { - println!("Replacing file '{}': present but not completed", &file_name); + rep.send(DlStatus::Message(format!("Replacing file '{}': present but not completed", &file_name))).unwrap(); } } - let rep = DlReporter::new(global_url_index, tx.clone()); - - if let Err(e) = download::download_feedback(&url, &into_file, rep).await { - eprintln!("Error while downloading '{}'", file_name); - eprintln!("{}", e); + if boost == 1 { + if let Err(e) = download::download_feedback(&url, &into_file, rep).await { + eprintln!("Error while downloading '{}'", file_name); + eprintln!("{}", e); + } + } else { + if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, boost).await { + eprintln!("Error while downloading '{}'", file_name); + eprintln!("{}", e); + } } } @@ -338,90 +274,7 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32) -> Re drop(tx); - // filename, total size bytes, current size bytes, download speed mbps - let mut statuses: HashMap = HashMap::new(); - let mut t_last = SystemTime::now(); - - while let Some(update) = rx.recv().await { - match update.status { - - DlStatus::Init { - bytes_total, - filename - } => { - - println!("Starting download for file '{}'", &filename); - statuses.insert(update.id, (filename, bytes_total, 0, 0.0)); - - }, - DlStatus::Update { - speed_mbps, - bytes_curr - } => { - - // Scope the reference to prevent borrowing conflict later - { - let s = &mut statuses.get_mut(&update.id).unwrap(); - s.2 = bytes_curr; - s.3 = speed_mbps; - } - - if t_last.elapsed().unwrap().as_millis() > 500 { - - let mut dl_speed_sum = 0.0; - - for (_k, v) in &statuses { - let filename = &v.0; - let filesize = v.1; - let bytes_curr = v.2; - let speed_mbps = v.3; - - let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0; - - - crossterm::execute!( - std::io::stdout(), - crossterm::terminal::Clear(crossterm::terminal::ClearType::CurrentLine) - ); - - println!("Status: {:6.2} mb/s {:5.2}% completed '{}'", speed_mbps, percent_complete, filename); - - dl_speed_sum += speed_mbps; - } - - crossterm::execute!( - std::io::stdout(), - crossterm::terminal::Clear(crossterm::terminal::ClearType::CurrentLine) - ); - println!("Accumulated download speed: {:6.2} mb/s\n", dl_speed_sum); - - crossterm::execute!( - std::io::stdout(), - crossterm::cursor::MoveUp(statuses.len() as u16 + 2) - ); - - t_last = SystemTime::now(); - - } - - }, - DlStatus::Done { - duration_ms - } => { - - println!( - "Status: 100% completed '{}'\nDownload took {} seconds", - &statuses.get(&update.id).unwrap().0, - (duration_ms / 1000) - ); - - statuses.remove(&update.id); - - } - - } - } - + dlreport::watch_and_print_reports(rx).await?; join_all(joiners).await;