Compare commits

...

2 Commits

Author SHA1 Message Date
e2c4d3572b More refactoring for dlreport 2022-03-31 23:13:43 +02:00
e6360153d6 More refactoring 2022-03-31 20:25:24 +02:00
7 changed files with 423 additions and 426 deletions

View File

@ -1,4 +1,5 @@
use std::{num::NonZeroU32, path::PathBuf}; use std::{num::NonZeroU32, path::PathBuf};
use clap::Parser; use clap::Parser;
#[derive(Parser, Clone, Debug)] #[derive(Parser, Clone, Debug)]
@ -14,28 +15,32 @@ pub struct CLIArgs {
long = "outdir", long = "outdir",
value_name = "OUTPUT DIR", value_name = "OUTPUT DIR",
default_value = "./", default_value = "./",
help = "Set the output directory. The directory will be created if it doesn't exist yet", help = "Set the output directory in which the downloads will be stored. \
The directory will be created if it doesn't exist yet",
)] )]
pub outdir: PathBuf, pub outdir: PathBuf,
#[clap( #[clap(
short = 'n', short = 'n',
long = "num-files", long = "num-files",
value_name = "NUMBER OF CONCURRENT FILE DOWNLOADS", value_name = "PARALLEL DOWNLOADS",
default_value = "1", default_value = "1",
help = "Specify the number of concurrent downloads", help = "Specify the number of files that should be downloaded in parallel. Increasing \
this number will increase the total download speed but won't improve the download speed \
for individual files",
)] )]
pub file_count: NonZeroU32, pub file_count: NonZeroU32,
#[clap( #[clap(
short = 'c', short = 'c',
long = "connections", long = "connections",
value_name = "NUMBER OF CONCURRENT CONNECTIONS", value_name = "CONNECTIONS PER FILE",
default_value = "1", default_value = "1",
help = "The number of concurrent connections per file download. \ help = "The number of concurrent connections per file download. Increasing this number will \
Downloads might fail when the number of connections is too high. \ increase the download speed of individual files if supported by the server but \
Files started with multiple connections currently can't be continued. \ setting this number too high may cause the download to fail. \n\
NOTE: This will likely cause IO bottlenecks on HDDs", NOTE: This mode will cause random writes and for that reason won't work on HDDs. \
WARNING: Files started with multiple connections currently can't be continued.",
)] )]
pub conn_count: NonZeroU32, pub conn_count: NonZeroU32,

208
src/clireporter.rs Normal file
View File

@ -0,0 +1,208 @@
use std::collections::{HashMap, VecDeque};
use std::io::stdout;
use std::time::SystemTime;
use anyhow::Result;
use crossterm::cursor::MoveToPreviousLine;
use crossterm::execute;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use tokio::sync::mpsc;
use crate::dlreport::{DlReport, DlStatus, InfoHolder};
/// Redraw the accumulated progress view on stdout in place.
///
/// Moves the cursor back up over the previously drawn block (`moved_lines`
/// rows), flushes all queued log messages (each prefixed with a local
/// timestamp; these scroll away and are not overwritten later), then prints
/// a divider, one status line per active download, and an accumulated
/// summary line.
///
/// Returns the number of lines the *next* call must move back up to
/// overwrite this block.
///
/// NOTE(review): assumes `moved_lines` is exactly what the previous call
/// returned; terminal resizing between calls is not handled and may corrupt
/// the layout.
fn print_accumulated_report(
    statuses: &HashMap<u32, InfoHolder>,
    msg_queue: &mut VecDeque<String>,
    moved_lines: u16,
    file_count_completed: i32,
    file_count_total: i32,
) -> Result<u16> {
    let mut dl_speed_sum = 0.0;
    // Hide the cursor while redrawing to avoid flicker, then jump back to
    // the first line of the previously printed block.
    execute!(
        stdout(),
        crossterm::cursor::Hide,
        MoveToPreviousLine(moved_lines)
    )?;
    // Drain and print all pending log messages with an HH:MM:SS prefix.
    for msg in msg_queue.drain(..) {
        let ct_now = chrono::Local::now();
        execute!(
            stdout(),
            Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)),
            Clear(ClearType::UntilNewLine),
            Print("\n")
        )?;
    }
    // Divider between the scrolling message log and the live status lines.
    execute!(
        stdout(),
        Print("----------------------------------------".to_string()),
        Clear(ClearType::UntilNewLine),
        Print("\n")
    )?;
    // One status line per download currently in flight.
    for v in statuses.values() {
        // NOTE(review): total_size == 0 would produce NaN here — presumably
        // servers always report a nonzero Content-Length; confirm upstream.
        let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0;
        execute!(
            stdout(),
            Print(format!(
                "Status: {:6.2} mb/s {:5.2}% completed '{}'",
                v.speed_mbps, percent_complete, v.filename
            )),
            Clear(ClearType::UntilNewLine),
            Print("\n")
        )?;
        dl_speed_sum += v.speed_mbps;
    }
    let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0;
    // Blank spacer line plus the accumulated summary, then clear any
    // leftover rows from a previous (taller) frame and restore the cursor.
    execute!(
        stdout(),
        Clear(ClearType::CurrentLine),
        Print("\n"),
        Print(format!(
            " =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%",
            dl_speed_sum, file_count_completed, file_count_total, file_percent_completed
        )),
        Clear(ClearType::UntilNewLine),
        Print("\n"),
        Clear(ClearType::FromCursorDown),
        crossterm::cursor::Show
    )?;
    // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated
    Ok(statuses.len() as u16 + 3)
}
/// Receive download reports from the provided receiver and print them to stdout using dynamic
/// refreshes of the terminal. This will block until all senders are closed.
///
/// `file_count_total` is the total number of files expected and only drives
/// the "x/y files" part of the summary output.
///
/// # Errors
/// Propagates terminal write errors from the redraw helper.
pub async fn cli_print_reports(
    mut receiver: mpsc::UnboundedReceiver<DlReport>,
    file_count_total: i32,
) -> Result<()> {
    // Live info per in-flight download, keyed by reporter id.
    let mut statuses: HashMap<u32, InfoHolder> = HashMap::new();
    // Height of the previously drawn status block (cursor rewind distance).
    let mut moved_lines = 0;
    // Log messages waiting to be flushed on the next redraw.
    let mut msg_queue = VecDeque::new();
    // Time of the last redraw; throttles Update-driven refreshes.
    let mut t_last = SystemTime::now();
    let mut file_count_completed = 0;
    let mut file_count_failed = 0;
    // completed + failed, i.e. files that are no longer pending.
    let mut file_count_done = 0;
    while let Some(update) = receiver.recv().await {
        match update.status {
            DlStatus::Init {
                bytes_total,
                filename,
            } => {
                msg_queue.push_back(format!("Starting download for file '{}'", &filename));
                statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
                moved_lines = print_accumulated_report(
                    &statuses,
                    &mut msg_queue,
                    moved_lines,
                    file_count_done,
                    file_count_total,
                )?;
            }
            DlStatus::Update {
                speed_mbps,
                bytes_curr,
            } => {
                // Tolerate updates for ids we no longer track (e.g. a
                // straggler report arriving after DoneErr removed the entry)
                // instead of panicking on `unwrap()`.
                if let Some(s) = statuses.get_mut(&update.id) {
                    s.progress = bytes_curr;
                    s.speed_mbps = speed_mbps;
                }
                // Throttle to at most ~2 redraws per second. `SystemTime` is
                // not monotonic, so a clock step backwards is treated as zero
                // elapsed time instead of panicking.
                if t_last.elapsed().unwrap_or_default().as_millis() > 500 {
                    moved_lines = print_accumulated_report(
                        &statuses,
                        &mut msg_queue,
                        moved_lines,
                        file_count_done,
                        file_count_total,
                    )?;
                    t_last = SystemTime::now();
                }
            }
            DlStatus::Done { duration_ms } => {
                // A single `remove` both drops the status line and yields the
                // info for the completion message (avoids two lookups and a
                // potential panic on an unknown id).
                if let Some(info) = statuses.remove(&update.id) {
                    msg_queue.push_back(format!(
                        "Finished downloading '{}' with {:.2} mb in {:.2} seconds",
                        info.filename,
                        (info.total_size as f32 / 1_000_000.0),
                        (duration_ms as f32 / 1_000.0)
                    ));
                }
                file_count_completed += 1;
                file_count_done += 1;
            }
            DlStatus::DoneErr { filename } => {
                msg_queue.push_back(format!("Error: Download failed: '{}'", filename));
                // Don't care if it exists, just make sure it is gone
                statuses.remove(&update.id);
                // Refresh display
                moved_lines = print_accumulated_report(
                    &statuses,
                    &mut msg_queue,
                    moved_lines,
                    file_count_done,
                    file_count_total,
                )?;
                t_last = SystemTime::now();
                file_count_failed += 1;
                file_count_done += 1;
            }
            DlStatus::Message(msg) => {
                msg_queue.push_back(msg);
                moved_lines = print_accumulated_report(
                    &statuses,
                    &mut msg_queue,
                    moved_lines,
                    file_count_done,
                    file_count_total,
                )?;
                t_last = SystemTime::now();
            }
            DlStatus::Skipped => {
                // Counted as completed without any status line — presumably
                // the file was already present locally (see caller).
                file_count_completed += 1;
                file_count_done += 1;
            }
        }
    }
    // All senders dropped: flush pending messages one last time, then replace
    // the live status block with a final summary.
    print_accumulated_report(
        &statuses,
        &mut msg_queue,
        moved_lines,
        file_count_done,
        file_count_total,
    )?;
    execute!(
        stdout(),
        MoveToPreviousLine(2),
        Print(format!(
            "All done! {}/{} completed, {} failed\n",
            file_count_completed, file_count_total, file_count_failed
        )),
        Clear(ClearType::FromCursorDown)
    )?;
    Ok(())
}

View File

@ -1,15 +1,8 @@
use std::collections::{HashMap, VecDeque}; use std::{collections::HashMap, time::SystemTime};
use std::io::stdout;
use std::time::SystemTime;
use tokio::sync::mpsc; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crossterm::cursor::MoveToPreviousLine; use crate::misc::RollingAverage;
use crossterm::execute;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use anyhow::Result;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum DlStatus { pub enum DlStatus {
@ -82,22 +75,22 @@ impl DlReporter {
#[macro_export] #[macro_export]
macro_rules! report_msg { macro_rules! report_msg {
($rep:ident, $fmt:expr) => { ($rep:ident, $fmt:expr) => {
DlReporter::msg(&$rep, $fmt.to_string()); DlReporter::msg(&$rep, format!($fmt));
}; };
($rep:ident, $fmt:expr, $($fmt2:expr),+) => { ($rep:ident, $fmt:expr, $($fmt2:expr),+) => {
DlReporter::msg(&$rep, format!($fmt, $($fmt2,)+)); DlReporter::msg(&$rep, format!($fmt, $($fmt2,)+));
}; };
} }
struct InfoHolder { pub struct InfoHolder {
filename: String, pub filename: String,
total_size: u64, pub total_size: u64,
progress: u64, pub progress: u64,
speed_mbps: f32, pub speed_mbps: f32,
} }
impl InfoHolder { impl InfoHolder {
fn new(filename: String, total_size: u64) -> InfoHolder { pub fn new(filename: String, total_size: u64) -> InfoHolder {
InfoHolder { InfoHolder {
filename, filename,
total_size, total_size,
@ -107,196 +100,59 @@ impl InfoHolder {
} }
} }
fn print_accumulated_report( pub struct DlReportAccumulator {
statuses: &HashMap<u32, InfoHolder>, parent: DlReporter,
msg_queue: &mut VecDeque<String>, rec: UnboundedReceiver<DlReport>,
moved_lines: u16,
file_count_completed: i32,
file_count_total: i32,
) -> Result<u16> {
let mut dl_speed_sum = 0.0;
execute!(
stdout(),
crossterm::cursor::Hide,
MoveToPreviousLine(moved_lines)
)?;
for msg in msg_queue.drain(..) {
let ct_now = chrono::Local::now();
execute!(
stdout(),
Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
}
execute!(
stdout(),
Print("----------------------------------------".to_string()),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
for v in statuses.values() {
let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0;
execute!(
stdout(),
Print(format!(
"Status: {:6.2} mb/s {:5.2}% completed '{}'",
v.speed_mbps, percent_complete, v.filename
)),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
dl_speed_sum += v.speed_mbps;
}
let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0;
execute!(
stdout(),
Clear(ClearType::CurrentLine),
Print("\n"),
Print(format!(
" =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%",
dl_speed_sum, file_count_completed, file_count_total, file_percent_completed
)),
Clear(ClearType::UntilNewLine),
Print("\n"),
Clear(ClearType::FromCursorDown),
crossterm::cursor::Show
)?;
// Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated
Ok(statuses.len() as u16 + 3)
} }
pub async fn watch_and_print_reports( impl DlReportAccumulator {
mut receiver: mpsc::UnboundedReceiver<DlReport>, pub fn new(parent: DlReporter) -> (DlReportAccumulator, UnboundedSender<DlReport>) {
file_count_total: i32, let (tx, rec) = mpsc::unbounded_channel();
) -> Result<()> { (DlReportAccumulator { parent, rec }, tx)
let mut statuses: HashMap<u32, InfoHolder> = HashMap::new(); }
let mut moved_lines = 0;
let mut msg_queue = VecDeque::new(); pub async fn accumulate(mut self) {
let mut progresses: HashMap<u32, u64> = HashMap::new();
let mut progress_last: u64 = 0;
let mut t_last = SystemTime::now(); let mut t_last = SystemTime::now();
let mut file_count_completed = 0; let mut average_speed = RollingAverage::new(10);
let mut file_count_failed = 0;
let mut file_count_done = 0;
while let Some(update) = receiver.recv().await { while let Some(update) = self.rec.recv().await {
match update.status { match update.status {
DlStatus::Init { DlStatus::Init {
bytes_total, bytes_total: _,
filename, filename: _,
} => { } => {}
msg_queue.push_back(format!("Starting download for file '{}'", &filename));
statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
}
DlStatus::Update { DlStatus::Update {
speed_mbps, speed_mbps: _,
bytes_curr, bytes_curr,
} => { } => {
// Scope the reference to prevent borrowing conflict later *progresses.entry(update.id).or_insert(0) = bytes_curr;
{
let s = &mut statuses.get_mut(&update.id).unwrap();
s.progress = bytes_curr;
s.speed_mbps = speed_mbps;
}
if t_last.elapsed().unwrap().as_millis() > 500 { let progress_curr = progresses.values().sum();
moved_lines = print_accumulated_report( let progress_delta = progress_curr - progress_last;
&statuses, let t_elapsed = t_last.elapsed().unwrap().as_secs_f64();
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
let speed_mbps = average_speed.value() as f32;
// currently executes always, but might change
if progress_delta >= 5_000_000 {
average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed);
progress_last = progress_curr;
t_last = SystemTime::now(); t_last = SystemTime::now();
} }
self.parent.update(speed_mbps, progress_curr);
} }
DlStatus::Done { duration_ms } => { DlStatus::Done { duration_ms: _ } => {}
msg_queue.push_back(format!(
"Finished downloading '{}' with {:.2} mb in {:.2} seconds",
&statuses.get(&update.id).unwrap().filename,
(statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0),
(duration_ms as f32 / 1_000.0)
));
statuses.remove(&update.id); // Just forward everything else to the calling receiver
_ => self.parent.send(update.status),
file_count_completed += 1;
file_count_done += 1;
}
DlStatus::DoneErr { filename } => {
msg_queue.push_back(format!("Error: Download failed: '{}'", filename));
// Don't care if it exists, just make sure it is gone
statuses.remove(&update.id);
// Refresh display
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
t_last = SystemTime::now();
file_count_failed += 1;
file_count_done += 1;
}
DlStatus::Message(msg) => {
msg_queue.push_back(msg);
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
t_last = SystemTime::now();
}
DlStatus::Skipped => {
file_count_completed += 1;
file_count_done += 1;
} }
} }
} }
print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
execute!(
stdout(),
MoveToPreviousLine(2),
Print(format!(
"All done! {}/{} completed, {} failed\n",
file_count_completed, file_count_total, file_count_failed
)),
Clear(ClearType::FromCursorDown)
)?;
Ok(())
} }

View File

@ -1,66 +1,16 @@
use std::io::SeekFrom;
use std::path::Path;
use std::time::SystemTime;
use anyhow::Result; use anyhow::Result;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use std::io::SeekFrom;
use std::path::Path;
use std::time::SystemTime;
use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc;
use crate::dlreport::*; use crate::dlreport::{DlReportAccumulator, DlReporter};
use crate::errors::*; use crate::errors::DlError;
use crate::misc::RollingAverage;
struct RollingAverage {
index: usize,
data: Vec<f64>,
}
impl RollingAverage {
fn new(size: usize) -> Self {
RollingAverage {
index: 0,
data: Vec::with_capacity(size),
}
}
fn value(&self) -> f64 {
if self.data.is_empty() {
0.0
} else {
let mut max = self.data[0];
for v in self.data.iter() {
if *v > max {
max = *v;
}
}
let mut sum: f64 = self.data.iter().sum();
let mut count = self.data.len();
if self.data.len() >= 3 {
sum -= max;
count -= 1;
}
sum / count as f64
}
}
fn add(&mut self, val: f64) {
if self.data.capacity() == self.data.len() {
self.data[self.index] = val;
self.index += 1;
if self.index >= self.data.capacity() {
self.index = 0;
}
} else {
self.data.push(val);
}
}
}
/// Get the filename at the end of the given URL. This will decode the URL Encoding. /// Get the filename at the end of the given URL. This will decode the URL Encoding.
pub fn url_to_filename(url: &str) -> String { pub fn url_to_filename(url: &str) -> String {
@ -81,7 +31,7 @@ pub async fn download_feedback(
url: &str, url: &str,
into_file: &Path, into_file: &Path,
rep: DlReporter, rep: DlReporter,
content_length: Option<u64>, content_length: u64,
) -> Result<()> { ) -> Result<()> {
download_feedback_chunks(url, into_file, rep, None, content_length).await download_feedback_chunks(url, into_file, rep, None, content_length).await
} }
@ -91,14 +41,9 @@ pub async fn download_feedback_chunks(
into_file: &Path, into_file: &Path,
rep: DlReporter, rep: DlReporter,
from_to: Option<(u64, u64)>, from_to: Option<(u64, u64)>,
content_length: Option<u64>, mut content_length: u64,
) -> Result<()> { ) -> Result<()> {
let mut content_length = match content_length { // Build the HTTP request to download the given link
Some(it) => it,
None => http_get_filesize_and_range_support(url).await?.filesize,
};
// Send the HTTP request to download the given link
let mut req = reqwest::Client::new().get(url); let mut req = reqwest::Client::new().get(url);
// Add range header if needed // Add range header if needed
@ -213,13 +158,8 @@ pub async fn download_feedback_multi(
into_file: &Path, into_file: &Path,
rep: DlReporter, rep: DlReporter,
conn_count: u32, conn_count: u32,
content_length: Option<u64>, content_length: u64,
) -> Result<()> { ) -> Result<()> {
let content_length = match content_length {
Some(it) => it,
None => http_get_filesize_and_range_support(url).await?.filesize,
};
// Create zeroed file with 1 byte too much. This will be truncated on download // Create zeroed file with 1 byte too much. This will be truncated on download
// completion and can indicate that the file is not suitable for continuation // completion and can indicate that the file is not suitable for continuation
create_zeroed_file(into_file, content_length as usize + 1).await?; create_zeroed_file(into_file, content_length as usize + 1).await?;
@ -229,7 +169,7 @@ pub async fn download_feedback_multi(
let mut joiners = Vec::new(); let mut joiners = Vec::new();
let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>(); let (rep_accum, tx) = DlReportAccumulator::new(rep.clone());
let t_start = SystemTime::now(); let t_start = SystemTime::now();
@ -258,7 +198,7 @@ pub async fn download_feedback_multi(
&into_file, &into_file,
rep, rep,
Some(from_to), Some(from_to),
Some(specific_content_length), specific_content_length,
) )
.await .await
})) }))
@ -270,58 +210,7 @@ pub async fn download_feedback_multi(
rep.init(content_length, filename.to_string()); rep.init(content_length, filename.to_string());
let rep_task = rep.clone(); let manager_handle = tokio::task::spawn(rep_accum.accumulate());
let mut t_last = t_start;
let manager_handle = tokio::task::spawn(async move {
let rep = rep_task;
//let mut dl_speeds = vec![0.0_f32; conn_count as usize];
let mut progresses = vec![0; conn_count as usize];
let mut progress_last: u64 = 0;
let mut average_speed = RollingAverage::new(10);
while let Some(update) = rx.recv().await {
match update.status {
DlStatus::Init {
bytes_total: _,
filename: _,
} => {}
DlStatus::Update {
speed_mbps: _,
bytes_curr,
} => {
//dl_speeds[update.id as usize] = speed_mbps;
progresses[update.id as usize] = bytes_curr;
let progress_curr = progresses.iter().sum();
let progress_delta = progress_curr - progress_last;
let t_elapsed = t_last.elapsed().unwrap().as_secs_f64();
let speed_mbps = average_speed.value() as f32;
// currently executes always, but might change
if progress_delta >= 5_000_000 {
average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed);
progress_last = progress_curr;
t_last = SystemTime::now();
}
rep.update(speed_mbps, progress_curr);
}
DlStatus::Done { duration_ms: _ } => {
//dl_speeds[update.id as usize] = 0.0;
}
// Just forward everything else to the calling receiver // Just forward everything else to the calling receiver
_ => rep.send(update.status),
}
}
});
let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect(); let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect();
// Validate if the tasks were successful. This will always grab the next completed // Validate if the tasks were successful. This will always grab the next completed
@ -377,7 +266,7 @@ pub struct HttpFileInfo {
pub filename: String, pub filename: String,
} }
pub async fn http_get_filesize_and_range_support(url: &str) -> Result<HttpFileInfo> { pub async fn http_file_info(url: &str) -> Result<HttpFileInfo> {
let resp = reqwest::Client::new().head(url).send().await?; let resp = reqwest::Client::new().head(url).send().await?;
let filesize = resp let filesize = resp
@ -402,59 +291,3 @@ pub async fn http_get_filesize_and_range_support(url: &str) -> Result<HttpFileIn
Ok(info) Ok(info)
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rolling_average() {
let mut ra = RollingAverage::new(3);
assert_eq!(0, ra.data.len());
assert_eq!(3, ra.data.capacity());
assert_eq!(0.0, ra.value());
// 10 / 1 = 10
ra.add(10.0);
assert_eq!(1, ra.data.len());
assert_eq!(10.0, ra.value());
// (10 + 20) / 2 = 15
ra.add(20.0);
assert_eq!(2, ra.data.len());
assert_eq!(15.0, ra.value());
// (10 + 20 + 30) / 3 = 20
ra.add(30.0);
assert_eq!(3, ra.data.len());
assert_eq!(20.0, ra.value());
assert_eq!(10.0, ra.data[0]);
assert_eq!(20.0, ra.data[1]);
assert_eq!(30.0, ra.data[2]);
// This should replace the oldest value (index 1)
ra.add(40.0);
assert_eq!(3, ra.data.len());
assert_eq!(3, ra.data.capacity());
// (40 + 20 + 30) / 3 = 30
assert_eq!(30.0, ra.value());
assert_eq!(40.0, ra.data[0]);
assert_eq!(20.0, ra.data[1]);
assert_eq!(30.0, ra.data[2]);
ra.add(50.0);
ra.add(60.0);
ra.add(70.0);
assert_eq!(70.0, ra.data[0]);
assert_eq!(50.0, ra.data[1]);
assert_eq!(60.0, ra.data[2]);
}
}

View File

@ -1,34 +1,28 @@
use std::{ use std::collections::VecDeque;
collections::VecDeque, use std::path::Path;
path::{Path, PathBuf}, use std::process::exit;
process::exit, use std::sync::Arc;
sync::Arc, use std::time::SystemTime;
time::SystemTime,
};
use clap::Parser;
use futures::future::join_all;
use tokio::{
fs::create_dir_all,
sync::{
mpsc::{unbounded_channel, UnboundedSender},
Mutex,
},
};
use crate::{
args::CLIArgs,
dlreport::{watch_and_print_reports, DlReport, DlReporter},
download::{download_feedback, download_feedback_multi, http_get_filesize_and_range_support},
zippy::is_zippyshare_url,
};
use anyhow::Result; use anyhow::Result;
use clap::Parser;
use futures::future::join_all;
use tokio::fs::create_dir_all;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::Mutex;
use crate::args::CLIArgs;
use crate::clireporter::cli_print_reports;
use crate::dlreport::{DlReport, DlReporter};
use crate::download::{download_feedback, download_feedback_multi, http_file_info};
use crate::zippy::is_zippyshare_url;
mod args; mod args;
mod clireporter;
mod dlreport; mod dlreport;
mod download; mod download;
mod errors; mod errors;
mod misc;
mod zippy; mod zippy;
struct DlRequest { struct DlRequest {
@ -100,12 +94,12 @@ async fn download_multiple(args: CLIArgs, raw_urls: Vec<String>) -> Result<()> {
let t_start = SystemTime::now(); let t_start = SystemTime::now();
let jobs = (0..args.file_count.get()) let jobs = (0..args.file_count.get())
.map(|_| tokio::task::spawn(download_job(urls.clone(), tx.clone(), args.clone()))) .map(|_| tokio::task::spawn(download_job(Arc::clone(&urls), tx.clone(), args.clone())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
drop(tx); drop(tx);
watch_and_print_reports(rx, num_urls as i32).await?; cli_print_reports(rx, num_urls as i32).await?;
join_all(jobs).await; join_all(jobs).await;
@ -135,7 +129,7 @@ async fn download_job(urls: SyncQueue, reporter: UnboundedSender<DlReport>, cli_
dlreq.url.to_string() dlreq.url.to_string()
}; };
let info = match http_get_filesize_and_range_support(&url).await { let info = match http_file_info(&url).await {
Ok(it) => it, Ok(it) => it,
Err(_e) => { Err(_e) => {
report_msg!(reporter, "Error while querying metadata: {url}"); report_msg!(reporter, "Error while querying metadata: {url}");
@ -143,13 +137,7 @@ async fn download_job(urls: SyncQueue, reporter: UnboundedSender<DlReport>, cli_
} }
}; };
let into_file: PathBuf = cli_args let into_file = cli_args.outdir.join(Path::new(&info.filename));
.outdir
.join(Path::new(&info.filename))
.to_str()
.unwrap()
.to_string()
.into();
// If file with same name is present locally, check filesize // If file with same name is present locally, check filesize
if into_file.exists() { if into_file.exists() {
@ -173,20 +161,20 @@ async fn download_job(urls: SyncQueue, reporter: UnboundedSender<DlReport>, cli_
} }
let dl_status = if cli_args.conn_count.get() == 1 { let dl_status = if cli_args.conn_count.get() == 1 {
download_feedback(&url, &into_file, reporter.clone(), Some(info.filesize)).await download_feedback(&url, &into_file, reporter.clone(), info.filesize).await
} else if !info.range_support { } else if !info.range_support {
report_msg!( report_msg!(
reporter, reporter,
"Server does not support range headers. Downloading with single connection: {url}" "Server does not support range headers. Downloading with single connection: {url}"
); );
download_feedback(&url, &into_file, reporter.clone(), Some(info.filesize)).await download_feedback(&url, &into_file, reporter.clone(), info.filesize).await
} else { } else {
download_feedback_multi( download_feedback_multi(
&url, &url,
&into_file, &into_file,
reporter.clone(), reporter.clone(),
cli_args.conn_count.get(), cli_args.conn_count.get(),
Some(info.filesize), info.filesize,
) )
.await .await
}; };

106
src/misc.rs Normal file
View File

@ -0,0 +1,106 @@
/// Rolling average over the most recent `size` samples, with simple spike
/// rejection: once at least three samples are present, the single highest
/// sample is excluded from the average.
pub struct RollingAverage {
    /// Maximum number of samples to retain (ring-buffer size).
    size: usize,
    /// Slot that the next overwrite targets once the buffer is full.
    index: usize,
    /// Stored samples; at most `size` of them.
    data: Vec<f64>,
}

impl RollingAverage {
    /// Create an empty average that keeps the last `size` samples.
    pub fn new(size: usize) -> Self {
        RollingAverage {
            size,
            index: 0,
            data: Vec::with_capacity(size),
        }
    }

    /// Current average. Returns 0.0 when no samples are recorded. With three
    /// or more samples the single largest one is dropped before averaging to
    /// damp upward spikes.
    pub fn value(&self) -> f64 {
        if self.data.is_empty() {
            return 0.0;
        }
        // Same comparison semantics as before (strict `>` scan from the
        // first element) to keep behavior identical, including for NaN.
        let mut max = self.data[0];
        for &v in self.data.iter() {
            if v > max {
                max = v;
            }
        }
        let mut sum: f64 = self.data.iter().sum();
        let mut count = self.data.len();
        if self.data.len() >= 3 {
            sum -= max;
            count -= 1;
        }
        sum / count as f64
    }

    /// Record a new sample, overwriting the oldest one once the buffer holds
    /// `size` samples.
    pub fn add(&mut self, val: f64) {
        // Robustness fix: with `size == 0` the original indexed into an empty
        // Vec and panicked; a zero-sized window simply records nothing.
        if self.size == 0 {
            return;
        }
        // Bug fix: fullness was previously detected via
        // `data.capacity() == data.len()`, but `Vec::with_capacity` only
        // guarantees *at least* the requested capacity, so the buffer could
        // silently grow past `size`. Compare against the stored size instead.
        if self.data.len() >= self.size {
            self.data[self.index] = val;
            self.index += 1;
            if self.index >= self.size {
                self.index = 0;
            }
        } else {
            self.data.push(val);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the ring-buffer replacement order and the spike-rejecting
    /// average: with three or more samples, `value()` excludes the single
    /// highest sample before averaging.
    ///
    /// Bug fix: the previous expectations asserted the plain mean (20.0,
    /// 30.0), which contradicts the implemented `value()` semantics and made
    /// this test fail.
    #[test]
    fn rolling_average() {
        let mut ra = RollingAverage::new(3);
        assert_eq!(0, ra.data.len());
        assert_eq!(3, ra.data.capacity());
        assert_eq!(0.0, ra.value());
        // 10 / 1 = 10
        ra.add(10.0);
        assert_eq!(1, ra.data.len());
        assert_eq!(10.0, ra.value());
        // (10 + 20) / 2 = 15
        ra.add(20.0);
        assert_eq!(2, ra.data.len());
        assert_eq!(15.0, ra.value());
        // Three samples: the max (30) is excluded -> (10 + 20) / 2 = 15
        ra.add(30.0);
        assert_eq!(3, ra.data.len());
        assert_eq!(15.0, ra.value());
        assert_eq!(10.0, ra.data[0]);
        assert_eq!(20.0, ra.data[1]);
        assert_eq!(30.0, ra.data[2]);
        // This replaces the oldest value, which lives at index 0
        ra.add(40.0);
        assert_eq!(3, ra.data.len());
        assert_eq!(3, ra.data.capacity());
        // Max (40) excluded: (20 + 30) / 2 = 25
        assert_eq!(25.0, ra.value());
        assert_eq!(40.0, ra.data[0]);
        assert_eq!(20.0, ra.data[1]);
        assert_eq!(30.0, ra.data[2]);
        ra.add(50.0);
        ra.add(60.0);
        ra.add(70.0);
        assert_eq!(70.0, ra.data[0]);
        assert_eq!(50.0, ra.data[1]);
        assert_eq!(60.0, ra.data[2]);
        // Max (70) excluded: (50 + 60) / 2 = 55
        assert_eq!(55.0, ra.value());
    }
}

View File

@ -1,6 +1,7 @@
use std::io::{Error, ErrorKind};
use anyhow::Result; use anyhow::Result;
use regex::Regex; use regex::Regex;
use std::io::{Error, ErrorKind};
pub fn is_zippyshare_url(url: &str) -> bool { pub fn is_zippyshare_url(url: &str) -> bool {
Regex::new(r"^https?://(?:www\d*\.)?zippyshare\.com/v/[0-9a-zA-Z]+/file\.html$") Regex::new(r"^https?://(?:www\d*\.)?zippyshare\.com/v/[0-9a-zA-Z]+/file\.html$")