ffdl/src/dlreport.rs

159 lines
4.3 KiB
Rust

use std::{collections::HashMap, time::SystemTime};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::misc::RollingAverage;
#[derive(Clone, Debug)]
pub enum DlStatus {
Init { bytes_total: u64, filename: String },
Update { speed_mbps: f32, bytes_curr: u64 },
Done { duration_ms: u64 },
DoneErr { filename: String },
Skipped,
Message(String),
}
#[derive(Clone, Debug)]
pub struct DlReport {
pub id: u32,
pub status: DlStatus,
}
#[derive(Clone)]
pub struct DlReporter {
id: u32,
transmitter: mpsc::UnboundedSender<DlReport>,
}
impl DlReporter {
pub fn new(id: u32, transmitter: mpsc::UnboundedSender<DlReport>) -> DlReporter {
DlReporter { id, transmitter }
}
pub fn send(&self, status: DlStatus) {
// This should not fail, so unwrap it here instead propagating the error
self.transmitter
.send(DlReport {
id: self.id,
status,
})
.unwrap();
}
pub fn init(&self, bytes_total: u64, filename: String) {
self.send(DlStatus::Init {
bytes_total,
filename,
})
}
pub fn update(&self, speed_mbps: f32, bytes_curr: u64) {
self.send(DlStatus::Update {
speed_mbps,
bytes_curr,
})
}
pub fn done(&self, duration_ms: u64) {
self.send(DlStatus::Done { duration_ms })
}
pub fn done_err(&self, filename: String) {
self.send(DlStatus::DoneErr { filename })
}
pub fn skipped(&self) {
self.send(DlStatus::Skipped);
}
pub fn msg(&self, msg: String) {
self.send(DlStatus::Message(msg));
}
}
#[macro_export]
macro_rules! report_msg {
($rep:ident, $fmt:expr) => {
DlReporter::msg(&$rep, format!($fmt));
};
($rep:ident, $fmt:expr, $($fmt2:expr),+) => {
DlReporter::msg(&$rep, format!($fmt, $($fmt2,)+));
};
}
pub struct InfoHolder {
pub filename: String,
pub total_size: u64,
pub progress: u64,
pub speed_mbps: f32,
}
impl InfoHolder {
pub fn new(filename: String, total_size: u64) -> InfoHolder {
InfoHolder {
filename,
total_size,
progress: 0,
speed_mbps: 0.0,
}
}
}
pub struct DlReportAccumulator {
parent: DlReporter,
rec: UnboundedReceiver<DlReport>,
}
impl DlReportAccumulator {
pub fn new(parent: DlReporter) -> (DlReportAccumulator, UnboundedSender<DlReport>) {
let (tx, rec) = mpsc::unbounded_channel();
(DlReportAccumulator { parent, rec }, tx)
}
pub async fn accumulate(mut self) {
let mut progresses: HashMap<u32, u64> = HashMap::new();
let mut progress_last: u64 = 0;
let mut t_last = SystemTime::now();
let mut average_speed = RollingAverage::new(10);
while let Some(update) = self.rec.recv().await {
match update.status {
DlStatus::Init {
bytes_total: _,
filename: _,
} => {}
DlStatus::Update {
speed_mbps: _,
bytes_curr,
} => {
*progresses.entry(update.id).or_insert(0) = bytes_curr;
let progress_curr = progresses.values().sum();
let progress_delta = progress_curr - progress_last;
let t_elapsed = t_last.elapsed().unwrap().as_secs_f64();
let speed_mbps = average_speed.value() as f32;
// currently executes always, but might change
if progress_delta >= 5_000_000 {
average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed);
progress_last = progress_curr;
t_last = SystemTime::now();
}
self.parent.update(speed_mbps, progress_curr);
}
DlStatus::Done { duration_ms: _ } => {}
// Just forwared everything else to the calling receiver
_ => self.parent.send(update.status),
}
}
}
}