ffdl/src/dlreport.rs
2022-03-31 17:13:57 +02:00

263 lines
7.5 KiB
Rust

use std::collections::{HashMap, VecDeque};
use std::io::stdout;
use std::time::SystemTime;
use tokio::sync::mpsc;
use crossterm::cursor::MoveToPreviousLine;
use crossterm::execute;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use anyhow::Result;
/// The kinds of events a download can report to the status printer.
#[derive(Debug, Clone)]
pub enum DlStatus {
    /// A download has started; carries the expected size in bytes and the file name
    /// used for its status line.
    Init {
        bytes_total: u64,
        filename: String,
    },
    /// Progress report: current speed (displayed as "mb/s") and bytes received so far.
    Update {
        speed_mbps: f32,
        bytes_curr: u64,
    },
    /// The download finished successfully after `duration_ms` milliseconds.
    Done {
        duration_ms: u64,
    },
    /// The download of `filename` failed.
    DoneErr {
        filename: String,
    },
    /// The file was skipped; it is counted as completed without a status line.
    Skipped,
    /// Free-form text to show in the message log.
    Message(String),
}
/// A single status event tagged with the id of the download that produced it.
#[derive(Debug, Clone)]
pub struct DlReport {
    /// Identifier of the reporting download; used as the key for its status line.
    pub id: u32,
    /// What happened to that download.
    pub status: DlStatus,
}
/// Sending half used by a download to publish its status events.
///
/// Every event sent through a reporter is tagged with the reporter's `id`.
#[derive(Clone)]
pub struct DlReporter {
    id: u32,
    transmitter: mpsc::UnboundedSender<DlReport>,
}

impl DlReporter {
    /// Creates a reporter that tags its events with `id` and delivers them via `transmitter`.
    pub fn new(id: u32, transmitter: mpsc::UnboundedSender<DlReport>) -> DlReporter {
        Self { id, transmitter }
    }

    /// Wraps `status` in a [`DlReport`] carrying this reporter's id and sends it.
    ///
    /// # Panics
    ///
    /// Panics if the channel send returns an error.
    pub fn send(&self, status: DlStatus) {
        let report = DlReport {
            id: self.id,
            status,
        };
        // Sending is not expected to fail, so unwrap here instead of propagating the error.
        self.transmitter.send(report).unwrap();
    }
}
/// Display state kept for one active download.
struct InfoHolder {
    /// Name shown in the status line.
    filename: String,
    /// Expected size of the file in bytes.
    total_size: u64,
    /// Bytes received so far.
    progress: u64,
    /// Most recently reported speed (displayed as "mb/s").
    speed_mbps: f32,
}

impl InfoHolder {
    /// Creates the state for a download that has not made any progress yet.
    fn new(filename: String, total_size: u64) -> InfoHolder {
        Self {
            progress: 0,
            speed_mbps: 0.0,
            filename,
            total_size,
        }
    }
}
/// Returns `part` as a percentage of `total`, or `0.0` when `total` is not positive.
///
/// Guarding the zero case keeps `NaN`/`inf` out of the status display.
fn percent_of(part: f64, total: f64) -> f64 {
    if total > 0.0 {
        part / total * 100.0
    } else {
        0.0
    }
}

/// Redraws the status area on stdout and returns how many lines it occupies.
///
/// The output consists of, in order:
/// 1. every queued message, each prefixed with the local time (the queue is drained),
/// 2. a divider line,
/// 3. one status line per entry in `statuses`, ordered by id,
/// 4. an empty line followed by a summary with the summed speed and the file counters.
///
/// Before drawing, the cursor is moved up by `moved_lines` so the previous status area is
/// overwritten. The cursor is hidden while drawing and shown again at the end.
///
/// # Returns
///
/// The number of lines the cursor has to move up before the next redraw (status lines plus
/// divider, empty line and summary). Printed messages are not included, so they stay on
/// screen above the status area.
///
/// # Errors
///
/// Returns an error if writing to stdout fails.
fn print_accumulated_report(
    statuses: &HashMap<u32, InfoHolder>,
    msg_queue: &mut VecDeque<String>,
    moved_lines: u16,
    file_count_completed: i32,
    file_count_total: i32,
) -> Result<u16> {
    let mut dl_speed_sum = 0.0;

    // Jump back to the top of the previously drawn status area so it gets overwritten.
    execute!(
        stdout(),
        crossterm::cursor::Hide,
        MoveToPreviousLine(moved_lines)
    )?;

    for msg in msg_queue.drain(..) {
        let ct_now = chrono::Local::now();

        execute!(
            stdout(),
            Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)),
            Clear(ClearType::UntilNewLine),
            Print("\n")
        )?;
    }

    execute!(
        stdout(),
        Print("----------------------------------------"),
        Clear(ClearType::UntilNewLine),
        Print("\n")
    )?;

    // HashMap iteration order is arbitrary and may change when entries are added or removed;
    // sorting by id keeps every download on a stable row between redraws.
    let mut active: Vec<(&u32, &InfoHolder)> = statuses.iter().collect();
    active.sort_unstable_by_key(|(id, _)| **id);

    for (_, v) in active {
        // A total size of 0 would otherwise produce NaN or inf here.
        let percent_complete = percent_of(v.progress as f64, v.total_size as f64);

        execute!(
            stdout(),
            Print(format!(
                "Status: {:6.2} mb/s {:5.2}% completed '{}'",
                v.speed_mbps, percent_complete, v.filename
            )),
            Clear(ClearType::UntilNewLine),
            Print("\n")
        )?;

        dl_speed_sum += v.speed_mbps;
    }

    // With no files at all the ratio is undefined; report 0% instead of NaN.
    let file_percent_completed = percent_of(
        f64::from(file_count_completed),
        f64::from(file_count_total),
    );

    execute!(
        stdout(),
        Clear(ClearType::CurrentLine),
        Print("\n"),
        Print(format!(
            " =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%",
            dl_speed_sum, file_count_completed, file_count_total, file_percent_completed
        )),
        Clear(ClearType::UntilNewLine),
        Print("\n"),
        Clear(ClearType::FromCursorDown),
        crossterm::cursor::Show
    )?;

    // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated.
    // Clamp first so neither the cast nor the addition can overflow the u16.
    let status_lines = statuses.len().min(usize::from(u16::MAX - 3)) as u16;
    Ok(status_lines + 3)
}
/// Receives download reports until the channel is closed and keeps the terminal status
/// display up to date.
///
/// Handling per status:
/// - `Init` registers a status line for the download and redraws.
/// - `Update` stores the new progress and speed; the display is redrawn only if more than
///   500 ms have passed since the last throttled redraw. Updates for ids without a status
///   line are ignored.
/// - `Done` queues a "finished" message, removes the status line and counts the file as
///   completed. The message is shown with the next redraw.
/// - `DoneErr` queues an error message, removes the status line, counts the file as failed
///   and redraws.
/// - `Message` queues the text and redraws.
/// - `Skipped` counts the file as completed.
///
/// After the channel closes, the display is drawn one final time and replaced by an
/// "All done!" summary line.
///
/// `file_count_total` is only used for display (`x/total files`).
///
/// # Errors
///
/// Returns an error if writing to stdout fails.
pub async fn watch_and_print_reports(
    mut receiver: mpsc::UnboundedReceiver<DlReport>,
    file_count_total: i32,
) -> Result<()> {
    let mut statuses: HashMap<u32, InfoHolder> = HashMap::new();
    let mut moved_lines = 0;
    let mut msg_queue = VecDeque::new();

    let mut t_last = SystemTime::now();

    let mut file_count_completed = 0;
    let mut file_count_failed = 0;
    let mut file_count_done = 0;

    while let Some(update) = receiver.recv().await {
        // Each arm updates the bookkeeping first and then says whether to redraw right away,
        // so the redraw below always sees the current counters.
        let refresh = match update.status {
            DlStatus::Init {
                bytes_total,
                filename,
            } => {
                msg_queue.push_back(format!("Starting download for file '{}'", &filename));
                statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
                true
            }
            DlStatus::Update {
                speed_mbps,
                bytes_curr,
            } => {
                // An update for an unknown id (never initialised or already finished) has
                // nothing to display, so it is skipped instead of panicking.
                if let Some(info) = statuses.get_mut(&update.id) {
                    info.progress = bytes_curr;
                    info.speed_mbps = speed_mbps;
                }

                // `elapsed` fails if the system clock was set back; redraw in that case so
                // the throttle timestamp gets reset.
                t_last
                    .elapsed()
                    .map_or(true, |elapsed| elapsed.as_millis() > 500)
            }
            DlStatus::Done { duration_ms } => {
                let seconds = duration_ms as f32 / 1_000.0;

                match statuses.remove(&update.id) {
                    Some(info) => msg_queue.push_back(format!(
                        "Finished downloading '{}' with {:.2} mb in {:.2} seconds",
                        info.filename,
                        (info.total_size as f32 / 1_000_000.0),
                        seconds
                    )),
                    // No status line was registered for this id; still report the completion.
                    None => msg_queue.push_back(format!(
                        "Finished download #{} in {:.2} seconds",
                        update.id, seconds
                    )),
                }

                file_count_completed += 1;
                file_count_done += 1;
                false
            }
            DlStatus::DoneErr { filename } => {
                msg_queue.push_back(format!("Error: Download failed: '{}'", filename));

                // Don't care if it exists, just make sure it is gone
                statuses.remove(&update.id);

                file_count_failed += 1;
                file_count_done += 1;
                true
            }
            DlStatus::Message(msg) => {
                msg_queue.push_back(msg);
                true
            }
            DlStatus::Skipped => {
                file_count_completed += 1;
                file_count_done += 1;
                false
            }
        };

        if refresh {
            moved_lines = print_accumulated_report(
                &statuses,
                &mut msg_queue,
                moved_lines,
                file_count_done,
                file_count_total,
            )?;

            // Any redraw restarts the throttle window for progress updates.
            t_last = SystemTime::now();
        }
    }

    // Flush remaining messages and draw the final state before replacing it with the summary.
    print_accumulated_report(
        &statuses,
        &mut msg_queue,
        moved_lines,
        file_count_done,
        file_count_total,
    )?;

    execute!(
        stdout(),
        MoveToPreviousLine(2),
        Print(format!(
            "All done! {}/{} completed, {} failed\n",
            file_count_completed, file_count_total, file_count_failed
        )),
        Clear(ClearType::FromCursorDown)
    )?;

    Ok(())
}