Improve terminal display output

- Logs are printed at the top
- Status updates are printed at the bottom and are updated in-place
- Removed the redundant code for `download_one`
This commit is contained in:
Daniel M 2021-03-26 01:17:12 +01:00
parent 9ca93cbeb2
commit d12c174a8b
5 changed files with 279 additions and 215 deletions

44
Cargo.lock generated
View File

@ -71,6 +71,19 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"winapi",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.33.3" version = "2.33.3"
@ -140,7 +153,7 @@ dependencies = [
name = "fdl" name = "fdl"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "chrono",
"clap", "clap",
"crossterm", "crossterm",
"futures", "futures",
@ -537,6 +550,25 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.0" version = "1.13.0"
@ -967,6 +999,16 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "tinyvec" name = "tinyvec"
version = "1.1.1" version = "1.1.1"

View File

@ -10,7 +10,6 @@ reqwest = { version = "0.11.2", features = [ "stream" ] }
futures = "0.3.12" futures = "0.3.12"
percent-encoding = "2.1.0" percent-encoding = "2.1.0"
regex = "1.4.3" regex = "1.4.3"
bytes = "1.0.1"
crossterm = "0.19.0" crossterm = "0.19.0"
clap = "2.33.3" clap = "2.33.3"
#futures-util = "0.3.13" chrono = "0.4"

View File

@ -1,5 +1,13 @@
use std::collections::{ HashMap, VecDeque };
use std::time::SystemTime;
use std::io::stdout;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crossterm::cursor::{ MoveUp };
use crossterm::execute;
use crossterm::terminal::{ Clear, ClearType };
use crate::errors::*; use crate::errors::*;
@ -10,12 +18,13 @@ pub enum DlStatus {
filename: String filename: String
}, },
Update { Update {
speed_mbps: f64, speed_mbps: f32,
bytes_curr: u64 bytes_curr: u64
}, },
Done { Done {
duration_ms: u64 duration_ms: u64
} },
Message(String)
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -46,3 +55,161 @@ impl DlReporter {
).map_err(|e| e.into()) ).map_err(|e| e.into())
} }
} }
struct InfoHolder {
filename: String,
total_size: u64,
progress: u64,
speed_mbps: f32
}
impl InfoHolder {
fn new(filename: String, total_size: u64) -> InfoHolder {
InfoHolder {
filename,
total_size,
progress: 0,
speed_mbps: 0.0
}
}
}
fn print_accumulated_report(statuses: & HashMap<i32, InfoHolder>, msg_queue: &mut VecDeque<String>, moved_lines: u16) -> ResBE<u16> {
let mut dl_speed_sum = 0.0;
execute!(
stdout(),
crossterm::cursor::Hide,
MoveUp(moved_lines)
)?;
for msg in msg_queue.drain(..) {
let ct_now = chrono::Local::now();
print!("{} > {}", ct_now.format("%H:%M:%S"), msg);
execute!(
stdout(),
Clear(ClearType::UntilNewLine)
)?;
println!();
}
print!("----------------------------------------");
execute!(
stdout(),
Clear(ClearType::UntilNewLine)
)?;
println!();
for (_k, v) in statuses {
let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0;
print!("Status: {:6.2} mb/s {:5.2}% completed '{}'", v.speed_mbps, percent_complete, v.filename);
execute!(
stdout(),
Clear(ClearType::UntilNewLine)
)?;
println!("");
dl_speed_sum += v.speed_mbps;
}
println!();
if statuses.len() != 0 {
print!(" =>> Accumulated download speed: {:6.2} mb/s", dl_speed_sum);
}
execute!(
stdout(),
Clear(ClearType::UntilNewLine)
)?;
println!("");
execute!(
stdout(),
crossterm::cursor::Show
)?;
// Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated
Ok(statuses.len() as u16 + 3)
}
pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver<DlReport>) -> ResBE<()> {
let mut statuses: HashMap<i32, InfoHolder> = HashMap::new();
let mut moved_lines = 0;
let mut msg_queue = VecDeque::new();
let mut t_last = SystemTime::now();
while let Some(update) = receiver.recv().await {
match update.status {
DlStatus::Init {
bytes_total,
filename
} => {
msg_queue.push_back(format!("Starting download for file '{}'", &filename));
statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
},
DlStatus::Update {
speed_mbps,
bytes_curr
} => {
// Scope the reference to prevent borrowing conflict later
{
let s = &mut statuses.get_mut(&update.id).unwrap();
s.progress = bytes_curr;
s.speed_mbps = speed_mbps;
}
if t_last.elapsed().unwrap().as_millis() > 500 {
moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?;
t_last = SystemTime::now();
}
},
DlStatus::Done {
duration_ms
} => {
msg_queue.push_back(format!(
"Finished downloading '{}' with {:.2} mb in {:.2} seconds",
&statuses.get(&update.id).unwrap().filename,
(statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0),
(duration_ms as f32 / 1_000.0)
));
statuses.remove(&update.id);
},
DlStatus::Message(msg) => {
msg_queue.push_back(msg);
}
}
}
print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?;
execute!(
stdout(),
MoveUp(2)
)?;
println!("All done!");
Ok(())
}

View File

@ -12,7 +12,7 @@ use crate::dlreport::*;
struct RollingAverage { struct RollingAverage {
index: usize, index: usize,
data: Vec<f64> data: Vec<f32>
} }
impl RollingAverage { impl RollingAverage {
@ -24,16 +24,16 @@ impl RollingAverage {
} }
} }
fn value(&self) -> f64 { fn value(&self) -> f32 {
if self.data.len() == 0 { if self.data.len() == 0 {
0.0 0.0
} else { } else {
let sum: f64 = self.data.iter().sum(); let sum: f32 = self.data.iter().sum();
sum / self.data.len() as f64 sum / self.data.len() as f32
} }
} }
fn add(&mut self, val: f64) { fn add(&mut self, val: f32) {
if self.data.capacity() == self.data.len() { if self.data.capacity() == self.data.len() {
self.data[self.index] = val; self.data[self.index] = val;
@ -159,7 +159,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte
let mut t_last_speed = SystemTime::now(); let mut t_last_speed = SystemTime::now();
let mut last_bytecount = 0; let mut last_bytecount = 0;
let mut average_speed = RollingAverage::new(5); let mut average_speed = RollingAverage::new(10);
// Read data from server as long as new data is available // Read data from server as long as new data is available
while let Some(chunk) = resp.chunk().await? { while let Some(chunk) = resp.chunk().await? {
@ -181,7 +181,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte
// Update rolling average // Update rolling average
average_speed.add( average_speed.add(
(last_bytecount as f64) / (1000.0 * t_elapsed as f64) (last_bytecount as f32) / (1_000.0 * t_elapsed as f32)
); );
speed_mbps = average_speed.value(); speed_mbps = average_speed.value();
@ -201,7 +201,7 @@ pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporte
} }
// Ensure that IO is completed // Ensure that IO is completed
ofile.flush().await?; //ofile.flush().await?;
let duration_ms = t_start.elapsed()?.as_millis() as u64; let duration_ms = t_start.elapsed()?.as_millis() as u64;
@ -267,7 +267,7 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter
})?; })?;
let mut update_counter = 0; let mut update_counter = 0;
let mut dl_speeds = vec![0.0f64; numparal as usize]; let mut dl_speeds = vec![0.0_f32; numparal as usize];
let mut progresses = vec![0; numparal as usize]; let mut progresses = vec![0; numparal as usize];
while let Some(update) = rx.recv().await { while let Some(update) = rx.recv().await {
@ -309,7 +309,10 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter
dl_speeds[update.id as usize] = 0.0; dl_speeds[update.id as usize] = 0.0;
} },
// Just forwared everything else to the calling receiver
_ => rep.send(update.status)?
} }
} }

View File

@ -3,9 +3,7 @@ use std::process::exit;
use clap::{ App, Arg, ArgGroup, crate_version }; use clap::{ App, Arg, ArgGroup, crate_version };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use futures::future::join_all; use futures::future::join_all;
use std::time::SystemTime;
use std::io::BufRead; use std::io::BufRead;
use std::collections::HashMap;
use dlreport::{ DlReport, DlStatus, DlReporter }; use dlreport::{ DlReport, DlStatus, DlReporter };
use errors::ResBE; use errors::ResBE;
@ -38,6 +36,16 @@ async fn main() -> ResBE<()> {
.takes_value(true) .takes_value(true)
.help("Specify the number concurrent downloads") .help("Specify the number concurrent downloads")
) )
.arg(
Arg::with_name("boost")
.short("b")
.long("boost")
.value_name("CONNECTIONS PER FILE")
.takes_value(true)
.help("Specify the number connections per single downloads. \
Files started with boost can't be continued. \
NOTE: This will likely cause IO bottlenecks on HDDs")
)
.arg( .arg(
Arg::with_name("zippyshare") Arg::with_name("zippyshare")
.short("z") .short("z")
@ -99,8 +107,21 @@ async fn main() -> ResBE<()> {
} }
}; };
let is_zippy = arguments.is_present("zippyshare");
let boost = match arguments.value_of("boost") {
Some(it) => it,
None => "1"
};
let boost: i32 = match boost.parse() {
Ok(it) => it,
Err(_) => {
eprintln!("Invalid value for boost: {}", numparal);
exit(1);
}
};
let is_zippy = arguments.is_present("zippyshare");
if arguments.is_present("listfile") { if arguments.is_present("listfile") {
@ -133,7 +154,7 @@ async fn main() -> ResBE<()> {
urls = zippy_urls; urls = zippy_urls;
} }
download_multiple(urls, outdir, numparal).await?; download_multiple(urls, outdir, numparal, boost).await?;
} else if arguments.is_present("download") { } else if arguments.is_present("download") {
@ -151,7 +172,13 @@ async fn main() -> ResBE<()> {
url.to_string() url.to_string()
}; };
download_one(&url, outdir, numparal).await?; let numparal = if boost != 1 {
boost
} else {
numparal
};
download_multiple(vec![url], outdir, 1, numparal).await?;
} else if arguments.is_present("resolve") { } else if arguments.is_present("resolve") {
@ -176,107 +203,7 @@ async fn main() -> ResBE<()> {
Ok(()) Ok(())
} }
async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32, boost: i32) -> ResBE<()> {
async fn download_one(url: &str, outdir: &str, numparal: i32) -> ResBE<()> {
let outdir = Path::new(outdir);
if !outdir.exists() {
std::fs::create_dir_all(outdir)?;
}
let file_name = download::url_to_filename(url);
let into_file = outdir.join(Path::new(&file_name));
let into_file = into_file.to_str().unwrap().to_string();
let path_into_file = Path::new(&into_file);
// If file with same name is present locally, check filesize
if path_into_file.exists() {
let (filesize, _) = download::http_get_filesize_and_range_support(&url).await?;
let local_filesize = std::fs::metadata(path_into_file)?.len();
if filesize == local_filesize {
println!("Skipping file '{}': already present", &file_name);
return Ok(());
} else {
println!("Replacing file '{}': present but not completed", &file_name);
}
}
// Create com channel to get feedback on download progress
let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>();
// Start download nonblocking
let url = url.to_string();
let jh_download = tokio::spawn(async move {
// Create reporter with id 0 since there is only one anyways
let rep = DlReporter::new(0, tx);
if numparal == 1 {
if let Err(e) = download::download_feedback(&url, &into_file, rep).await {
eprintln!("Error while downloading");
eprintln!("{}", e);
}
} else {
if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, numparal).await {
eprintln!("Error while downloading");
eprintln!("{}", e);
}
}
});
let mut t_last = SystemTime::UNIX_EPOCH;
let mut filesize = 0;
// Handle download status updates until all transmitters are closed
// this happens when the download is completed
while let Some(update) = rx.recv().await {
match update.status {
DlStatus::Init {
bytes_total,
filename
} => {
println!("Starting download for file '{}'", &filename);
filesize = bytes_total;
},
DlStatus::Update {
speed_mbps,
bytes_curr
} => {
// Print update every second, otherwise ignore the updates
if t_last.elapsed()?.as_millis() > 1000 {
let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0;
println!("Status: {:6.2} mb/s {:5.2}% completed", speed_mbps, percent_complete);
t_last = SystemTime::now();
}
},
DlStatus::Done {
duration_ms
} => {
println!("Status: 100% completed");
println!("Download took {} seconds", (duration_ms / 1000));
}
}
}
// Await the download just to make sure
jh_download.await?;
Ok(())
}
async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> ResBE<()> {
let outdir = Path::new(outdir); let outdir = Path::new(outdir);
if !outdir.exists() { if !outdir.exists() {
@ -285,7 +212,7 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> Re
let mut joiners = Vec::new(); let mut joiners = Vec::new();
let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>(); let (tx, rx) = mpsc::unbounded_channel::<DlReport>();
for offset in 0..numparal { for offset in 0..numparal {
@ -303,12 +230,16 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> Re
for (i, url) in urls.iter().enumerate() { for (i, url) in urls.iter().enumerate() {
let tx = tx.clone();
// Recalculated index in the main url vector, used as id // Recalculated index in the main url vector, used as id
let global_url_index = i as i32 * numparal + offset; let global_url_index = i as i32 * numparal + offset;
let rep = DlReporter::new(global_url_index, tx);
let file_name = download::url_to_filename(&url); let file_name = download::url_to_filename(&url);
let into_file = outdir.join(Path::new(&file_name)); let into_file = outdir.join(Path::new(&file_name))
let into_file = into_file.to_str().unwrap().to_string(); .to_str().unwrap().to_string();
let path_into_file = Path::new(&into_file); let path_into_file = Path::new(&into_file);
// If file with same name is present locally, check filesize // If file with same name is present locally, check filesize
@ -317,19 +248,24 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> Re
let local_filesize = std::fs::metadata(path_into_file).unwrap().len(); let local_filesize = std::fs::metadata(path_into_file).unwrap().len();
if filesize == local_filesize { if filesize == local_filesize {
println!("Skipping file '{}': already present", &file_name); rep.send(DlStatus::Message(format!("Skipping file '{}': already present", &file_name))).unwrap();
continue; continue;
} else { } else {
println!("Replacing file '{}': present but not completed", &file_name); rep.send(DlStatus::Message(format!("Replacing file '{}': present but not completed", &file_name))).unwrap();
} }
} }
let rep = DlReporter::new(global_url_index, tx.clone()); if boost == 1 {
if let Err(e) = download::download_feedback(&url, &into_file, rep).await { if let Err(e) = download::download_feedback(&url, &into_file, rep).await {
eprintln!("Error while downloading '{}'", file_name); eprintln!("Error while downloading '{}'", file_name);
eprintln!("{}", e); eprintln!("{}", e);
} }
} else {
if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, boost).await {
eprintln!("Error while downloading '{}'", file_name);
eprintln!("{}", e);
}
}
} }
})) }))
@ -338,90 +274,7 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> Re
drop(tx); drop(tx);
// filename, total size bytes, current size bytes, download speed mbps dlreport::watch_and_print_reports(rx).await?;
let mut statuses: HashMap<i32, (String, u64, u64, f64)> = HashMap::new();
let mut t_last = SystemTime::now();
while let Some(update) = rx.recv().await {
match update.status {
DlStatus::Init {
bytes_total,
filename
} => {
println!("Starting download for file '{}'", &filename);
statuses.insert(update.id, (filename, bytes_total, 0, 0.0));
},
DlStatus::Update {
speed_mbps,
bytes_curr
} => {
// Scope the reference to prevent borrowing conflict later
{
let s = &mut statuses.get_mut(&update.id).unwrap();
s.2 = bytes_curr;
s.3 = speed_mbps;
}
if t_last.elapsed().unwrap().as_millis() > 500 {
let mut dl_speed_sum = 0.0;
for (_k, v) in &statuses {
let filename = &v.0;
let filesize = v.1;
let bytes_curr = v.2;
let speed_mbps = v.3;
let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0;
crossterm::execute!(
std::io::stdout(),
crossterm::terminal::Clear(crossterm::terminal::ClearType::CurrentLine)
);
println!("Status: {:6.2} mb/s {:5.2}% completed '{}'", speed_mbps, percent_complete, filename);
dl_speed_sum += speed_mbps;
}
crossterm::execute!(
std::io::stdout(),
crossterm::terminal::Clear(crossterm::terminal::ClearType::CurrentLine)
);
println!("Accumulated download speed: {:6.2} mb/s\n", dl_speed_sum);
crossterm::execute!(
std::io::stdout(),
crossterm::cursor::MoveUp(statuses.len() as u16 + 2)
);
t_last = SystemTime::now();
}
},
DlStatus::Done {
duration_ms
} => {
println!(
"Status: 100% completed '{}'\nDownload took {} seconds",
&statuses.get(&update.id).unwrap().0,
(duration_ms / 1000)
);
statuses.remove(&update.id);
}
}
}
join_all(joiners).await; join_all(joiners).await;