From e2c4d3572b599e9a44dd735a85ce5ef7c5dac7f4 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Thu, 31 Mar 2022 23:13:43 +0200 Subject: [PATCH] More refactoring for dlreport --- src/args.rs | 20 ++-- src/clireporter.rs | 208 +++++++++++++++++++++++++++++++++++++ src/dlreport.rs | 250 ++++++++++----------------------------------- src/download.rs | 58 +---------- src/main.rs | 6 +- 5 files changed, 281 insertions(+), 261 deletions(-) create mode 100644 src/clireporter.rs diff --git a/src/args.rs b/src/args.rs index 4ef1463..e4485ee 100644 --- a/src/args.rs +++ b/src/args.rs @@ -15,28 +15,32 @@ pub struct CLIArgs { long = "outdir", value_name = "OUTPUT DIR", default_value = "./", - help = "Set the output directory. The directory will be created if it doesn't exit yet", + help = "Set the output directory in which the downloads will be stored. \ + The directory will be created if it doesn't exit yet", )] pub outdir: PathBuf, #[clap( short = 'n', long = "num-files", - value_name = "NUMBER OF CONCURRENT FILE DOWNLOADS", + value_name = "PARALLEL DOWNLOADS", default_value = "1", - help = "Specify the number of concurrent downloads", + help = "Specify the number of files from that should be downloaded in parallel. Increasing \ + this number will increase the total download speed but won't improve the download speed \ + for individual files", )] pub file_count: NonZeroU32, #[clap( short = 'c', long = "connections", - value_name = "NUMBER OF CONCURRENT CONNECTIONS", + value_name = "CONNECTIONS PER FILE", default_value = "1", - help = "The number concurrent connections per file download. \ - Downloads might fail when the number of connections is too high. \ - Files started with multiple connections currently can't be continued. \ - NOTE: This will likely cause IO bottlenecks on HDDs", + help = "The number concurrent connections per file download. Increasing this number will \ + increase the download speed of individual files if supported by the server but \ + setting this number too high may cause the download to fail. \n\ + NOTE: This mode will write cause random writes and for that reason won't work on HDDs. \ + WARNING: Files started with multiple connections currently can't be continued.", )] pub conn_count: NonZeroU32, diff --git a/src/clireporter.rs b/src/clireporter.rs new file mode 100644 index 0000000..513cc59 --- /dev/null +++ b/src/clireporter.rs @@ -0,0 +1,208 @@ +use std::collections::{HashMap, VecDeque}; +use std::io::stdout; +use std::time::SystemTime; + +use anyhow::Result; +use crossterm::cursor::MoveToPreviousLine; +use crossterm::execute; +use crossterm::style::Print; +use crossterm::terminal::{Clear, ClearType}; +use tokio::sync::mpsc; + +use crate::dlreport::{DlReport, DlStatus, InfoHolder}; + +fn print_accumulated_report( + statuses: &HashMap, + msg_queue: &mut VecDeque, + moved_lines: u16, + file_count_completed: i32, + file_count_total: i32, +) -> Result { + let mut dl_speed_sum = 0.0; + + execute!( + stdout(), + crossterm::cursor::Hide, + MoveToPreviousLine(moved_lines) + )?; + + for msg in msg_queue.drain(..) { + let ct_now = chrono::Local::now(); + + execute!( + stdout(), + Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)), + Clear(ClearType::UntilNewLine), + Print("\n") + )?; + } + + execute!( + stdout(), + Print("----------------------------------------".to_string()), + Clear(ClearType::UntilNewLine), + Print("\n") + )?; + + for v in statuses.values() { + let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0; + + execute!( + stdout(), + Print(format!( + "Status: {:6.2} mb/s {:5.2}% completed '{}'", + v.speed_mbps, percent_complete, v.filename + )), + Clear(ClearType::UntilNewLine), + Print("\n") + )?; + + dl_speed_sum += v.speed_mbps; + } + + let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0; + + execute!( + stdout(), + Clear(ClearType::CurrentLine), + Print("\n"), + Print(format!( + " =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%", + dl_speed_sum, file_count_completed, file_count_total, file_percent_completed + )), + Clear(ClearType::UntilNewLine), + Print("\n"), + Clear(ClearType::FromCursorDown), + crossterm::cursor::Show + )?; + + // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated + Ok(statuses.len() as u16 + 3) +} + +/// Receive download reports from the provided receiver and print them to stdout using dynamic +/// refreshes of the terminal. This will block until all senders are closed. +pub async fn cli_print_reports( + mut receiver: mpsc::UnboundedReceiver, + file_count_total: i32, +) -> Result<()> { + let mut statuses: HashMap = HashMap::new(); + let mut moved_lines = 0; + let mut msg_queue = VecDeque::new(); + + let mut t_last = SystemTime::now(); + + let mut file_count_completed = 0; + let mut file_count_failed = 0; + let mut file_count_done = 0; + + while let Some(update) = receiver.recv().await { + match update.status { + DlStatus::Init { + bytes_total, + filename, + } => { + msg_queue.push_back(format!("Starting download for file '{}'", &filename)); + statuses.insert(update.id, InfoHolder::new(filename, bytes_total)); + + moved_lines = print_accumulated_report( + &statuses, + &mut msg_queue, + moved_lines, + file_count_done, + file_count_total, + )?; + } + DlStatus::Update { + speed_mbps, + bytes_curr, + } => { + // Scope the reference to prevent borrowing conflict later + { + let s = &mut statuses.get_mut(&update.id).unwrap(); + s.progress = bytes_curr; + s.speed_mbps = speed_mbps; + } + + if t_last.elapsed().unwrap().as_millis() > 500 { + moved_lines = print_accumulated_report( + &statuses, + &mut msg_queue, + moved_lines, + file_count_done, + file_count_total, + )?; + + t_last = SystemTime::now(); + } + } + DlStatus::Done { duration_ms } => { + msg_queue.push_back(format!( + "Finished downloading '{}' with {:.2} mb in {:.2} seconds", + &statuses.get(&update.id).unwrap().filename, + (statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0), + (duration_ms as f32 / 1_000.0) + )); + + statuses.remove(&update.id); + + file_count_completed += 1; + file_count_done += 1; + } + DlStatus::DoneErr { filename } => { + msg_queue.push_back(format!("Error: Download failed: '{}'", filename)); + + // Don't care if it exists, just make sure it is gone + statuses.remove(&update.id); + + // Refresh display + moved_lines = print_accumulated_report( + &statuses, + &mut msg_queue, + moved_lines, + file_count_done, + file_count_total, + )?; + t_last = SystemTime::now(); + + file_count_failed += 1; + file_count_done += 1; + } + DlStatus::Message(msg) => { + msg_queue.push_back(msg); + moved_lines = print_accumulated_report( + &statuses, + &mut msg_queue, + moved_lines, + file_count_done, + file_count_total, + )?; + t_last = SystemTime::now(); + } + DlStatus::Skipped => { + file_count_completed += 1; + file_count_done += 1; + } + } + } + + print_accumulated_report( + &statuses, + &mut msg_queue, + moved_lines, + file_count_done, + file_count_total, + )?; + + execute!( + stdout(), + MoveToPreviousLine(2), + Print(format!( + "All done! {}/{} completed, {} failed\n", + file_count_completed, file_count_total, file_count_failed + )), + Clear(ClearType::FromCursorDown) + )?; + + Ok(()) +} diff --git a/src/dlreport.rs b/src/dlreport.rs index d9684c0..46c487c 100644 --- a/src/dlreport.rs +++ b/src/dlreport.rs @@ -1,13 +1,8 @@ -use std::collections::{HashMap, VecDeque}; -use std::io::stdout; -use std::time::SystemTime; +use std::{collections::HashMap, time::SystemTime}; -use anyhow::Result; -use crossterm::cursor::MoveToPreviousLine; -use crossterm::execute; -use crossterm::style::Print; -use crossterm::terminal::{Clear, ClearType}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + +use crate::misc::RollingAverage; #[derive(Clone, Debug)] pub enum DlStatus { @@ -87,15 +82,15 @@ macro_rules! report_msg { }; } -struct InfoHolder { - filename: String, - total_size: u64, - progress: u64, - speed_mbps: f32, +pub struct InfoHolder { + pub filename: String, + pub total_size: u64, + pub progress: u64, + pub speed_mbps: f32, } impl InfoHolder { - fn new(filename: String, total_size: u64) -> InfoHolder { + pub fn new(filename: String, total_size: u64) -> InfoHolder { InfoHolder { filename, total_size, @@ -105,196 +100,59 @@ impl InfoHolder { } } -fn print_accumulated_report( - statuses: &HashMap, - msg_queue: &mut VecDeque, - moved_lines: u16, - file_count_completed: i32, - file_count_total: i32, -) -> Result { - let mut dl_speed_sum = 0.0; - - execute!( - stdout(), - crossterm::cursor::Hide, - MoveToPreviousLine(moved_lines) - )?; - - for msg in msg_queue.drain(..) { - let ct_now = chrono::Local::now(); - - execute!( - stdout(), - Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)), - Clear(ClearType::UntilNewLine), - Print("\n") - )?; - } - - execute!( - stdout(), - Print("----------------------------------------".to_string()), - Clear(ClearType::UntilNewLine), - Print("\n") - )?; - - for v in statuses.values() { - let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0; - - execute!( - stdout(), - Print(format!( - "Status: {:6.2} mb/s {:5.2}% completed '{}'", - v.speed_mbps, percent_complete, v.filename - )), - Clear(ClearType::UntilNewLine), - Print("\n") - )?; - - dl_speed_sum += v.speed_mbps; - } - - let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0; - - execute!( - stdout(), - Clear(ClearType::CurrentLine), - Print("\n"), - Print(format!( - " =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%", - dl_speed_sum, file_count_completed, file_count_total, file_percent_completed - )), - Clear(ClearType::UntilNewLine), - Print("\n"), - Clear(ClearType::FromCursorDown), - crossterm::cursor::Show - )?; - - // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated - Ok(statuses.len() as u16 + 3) +pub struct DlReportAccumulator { + parent: DlReporter, + rec: UnboundedReceiver, } -pub async fn watch_and_print_reports( - mut receiver: mpsc::UnboundedReceiver, - file_count_total: i32, -) -> Result<()> { - let mut statuses: HashMap = HashMap::new(); - let mut moved_lines = 0; - let mut msg_queue = VecDeque::new(); +impl DlReportAccumulator { + pub fn new(parent: DlReporter) -> (DlReportAccumulator, UnboundedSender) { + let (tx, rec) = mpsc::unbounded_channel(); + (DlReportAccumulator { parent, rec }, tx) + } - let mut t_last = SystemTime::now(); + pub async fn accumulate(mut self) { + let mut progresses: HashMap = HashMap::new(); - let mut file_count_completed = 0; - let mut file_count_failed = 0; - let mut file_count_done = 0; + let mut progress_last: u64 = 0; - while let Some(update) = receiver.recv().await { - match update.status { - DlStatus::Init { - bytes_total, - filename, - } => { - msg_queue.push_back(format!("Starting download for file '{}'", &filename)); - statuses.insert(update.id, InfoHolder::new(filename, bytes_total)); + let mut t_last = SystemTime::now(); - moved_lines = print_accumulated_report( - &statuses, - &mut msg_queue, - moved_lines, - file_count_done, - file_count_total, - )?; - } - DlStatus::Update { - speed_mbps, - bytes_curr, - } => { - // Scope the reference to prevent borrowing conflict later - { - let s = &mut statuses.get_mut(&update.id).unwrap(); - s.progress = bytes_curr; - s.speed_mbps = speed_mbps; + let mut average_speed = RollingAverage::new(10); + + while let Some(update) = self.rec.recv().await { + match update.status { + DlStatus::Init { + bytes_total: _, + filename: _, + } => {} + DlStatus::Update { + speed_mbps: _, + bytes_curr, + } => { + *progresses.entry(update.id).or_insert(0) = bytes_curr; + + let progress_curr = progresses.values().sum(); + let progress_delta = progress_curr - progress_last; + let t_elapsed = t_last.elapsed().unwrap().as_secs_f64(); + + let speed_mbps = average_speed.value() as f32; + + // currently executes always, but might change + if progress_delta >= 5_000_000 { + average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed); + + progress_last = progress_curr; + t_last = SystemTime::now(); + } + + self.parent.update(speed_mbps, progress_curr); } + DlStatus::Done { duration_ms: _ } => {} - if t_last.elapsed().unwrap().as_millis() > 500 { - moved_lines = print_accumulated_report( - &statuses, - &mut msg_queue, - moved_lines, - file_count_done, - file_count_total, - )?; - - t_last = SystemTime::now(); - } - } - DlStatus::Done { duration_ms } => { - msg_queue.push_back(format!( - "Finished downloading '{}' with {:.2} mb in {:.2} seconds", - &statuses.get(&update.id).unwrap().filename, - (statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0), - (duration_ms as f32 / 1_000.0) - )); - - statuses.remove(&update.id); - - file_count_completed += 1; - file_count_done += 1; - } - DlStatus::DoneErr { filename } => { - msg_queue.push_back(format!("Error: Download failed: '{}'", filename)); - - // Don't care if it exists, just make sure it is gone - statuses.remove(&update.id); - - // Refresh display - moved_lines = print_accumulated_report( - &statuses, - &mut msg_queue, - moved_lines, - file_count_done, - file_count_total, - )?; - t_last = SystemTime::now(); - - file_count_failed += 1; - file_count_done += 1; - } - DlStatus::Message(msg) => { - msg_queue.push_back(msg); - moved_lines = print_accumulated_report( - &statuses, - &mut msg_queue, - moved_lines, - file_count_done, - file_count_total, - )?; - t_last = SystemTime::now(); - } - DlStatus::Skipped => { - file_count_completed += 1; - file_count_done += 1; + // Just forwared everything else to the calling receiver + _ => self.parent.send(update.status), } } } - - print_accumulated_report( - &statuses, - &mut msg_queue, - moved_lines, - file_count_done, - file_count_total, - )?; - - execute!( - stdout(), - MoveToPreviousLine(2), - Print(format!( - "All done! {}/{} completed, {} failed\n", - file_count_completed, file_count_total, file_count_failed - )), - Clear(ClearType::FromCursorDown) - )?; - - Ok(()) } diff --git a/src/download.rs b/src/download.rs index 042c327..bb85995 100644 --- a/src/download.rs +++ b/src/download.rs @@ -7,9 +7,8 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use percent_encoding::percent_decode_str; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; -use tokio::sync::mpsc; -use crate::dlreport::{DlReport, DlReporter, DlStatus}; +use crate::dlreport::{DlReportAccumulator, DlReporter}; use crate::errors::DlError; use crate::misc::RollingAverage; @@ -170,7 +169,7 @@ pub async fn download_feedback_multi( let mut joiners = Vec::new(); - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (rep_accum, tx) = DlReportAccumulator::new(rep.clone()); let t_start = SystemTime::now(); @@ -211,58 +210,7 @@ pub async fn download_feedback_multi( rep.init(content_length, filename.to_string()); - let rep_task = rep.clone(); - - let mut t_last = t_start; - - let manager_handle = tokio::task::spawn(async move { - let rep = rep_task; - //let mut dl_speeds = vec![0.0_f32; conn_count as usize]; - let mut progresses = vec![0; conn_count as usize]; - - let mut progress_last: u64 = 0; - - let mut average_speed = RollingAverage::new(10); - - while let Some(update) = rx.recv().await { - match update.status { - DlStatus::Init { - bytes_total: _, - filename: _, - } => {} - DlStatus::Update { - speed_mbps: _, - bytes_curr, - } => { - //dl_speeds[update.id as usize] = speed_mbps; - progresses[update.id as usize] = bytes_curr; - - let progress_curr = progresses.iter().sum(); - let progress_delta = progress_curr - progress_last; - let t_elapsed = t_last.elapsed().unwrap().as_secs_f64(); - - let speed_mbps = average_speed.value() as f32; - - // currently executes always, but might change - if progress_delta >= 5_000_000 { - average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed); - - progress_last = progress_curr; - t_last = SystemTime::now(); - } - - rep.update(speed_mbps, progress_curr); - } - DlStatus::Done { duration_ms: _ } => { - - //dl_speeds[update.id as usize] = 0.0; - } - - // Just forwared everything else to the calling receiver - _ => rep.send(update.status), - } - } - }); + let manager_handle = tokio::task::spawn(rep_accum.accumulate()); let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect(); // Validate if the tasks were successful. This will always grab the next completed diff --git a/src/main.rs b/src/main.rs index f0cf8e8..8a628ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,11 +12,13 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::Mutex; use crate::args::CLIArgs; -use crate::dlreport::{watch_and_print_reports, DlReport, DlReporter}; +use crate::clireporter::cli_print_reports; +use crate::dlreport::{DlReport, DlReporter}; use crate::download::{download_feedback, download_feedback_multi, http_file_info}; use crate::zippy::is_zippyshare_url; mod args; +mod clireporter; mod dlreport; mod download; mod errors; @@ -97,7 +99,7 @@ async fn download_multiple(args: CLIArgs, raw_urls: Vec) -> Result<()> { drop(tx); - watch_and_print_reports(rx, num_urls as i32).await?; + cli_print_reports(rx, num_urls as i32).await?; join_all(jobs).await;