diff --git a/src/dlreport.rs b/src/dlreport.rs index c6205d4..9c321bc 100644 --- a/src/dlreport.rs +++ b/src/dlreport.rs @@ -25,6 +25,9 @@ pub enum DlStatus { Done { duration_ms: u64 }, + DoneErr { + filename: String + }, Message(String), /// Like Message but triggers a display refresh MessageNow(String) @@ -36,6 +39,7 @@ pub struct DlReport { pub status: DlStatus } +#[derive(Clone)] pub struct DlReporter { id: i32, transmitter: mpsc::UnboundedSender @@ -191,6 +195,23 @@ pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver { + + msg_queue.push_back(format!( + "Error: Download failed: '{}'", filename + )); + + // Don't care if it exists, just make sure it is gone + statuses.remove(&update.id); + + + // Refresh display + moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?; + t_last = SystemTime::now(); + }, DlStatus::Message(msg) => { msg_queue.push_back(msg); @@ -209,7 +230,7 @@ pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver { + let rep = rep_task; + let mut update_counter: i32 = 0; + let mut dl_speeds = vec![0.0_f32; numparal as usize]; + let mut progresses = vec![0; numparal as usize]; - }, - DlStatus::Update { - speed_mbps, - bytes_curr - } => { + while let Some(update) = rx.recv().await { + match update.status { + + DlStatus::Init { + bytes_total: _, + filename: _ + } => { + + }, + DlStatus::Update { + speed_mbps, + bytes_curr + } => { + + dl_speeds[update.id as usize] = speed_mbps; + progresses[update.id as usize] = bytes_curr; + + if update_counter >= 0 { + update_counter = 0; + + let speed = dl_speeds.iter().sum(); + let curr = progresses.iter().sum(); + + rep.send(DlStatus::Update { + speed_mbps: speed, + bytes_curr: curr + }).unwrap(); + + } else { + update_counter += 1; + } + + }, + DlStatus::Done { + duration_ms: _ + } => { + + dl_speeds[update.id as usize] = 0.0; + + }, + + // Just forwared everything else to the calling receiver + _ => rep.send(update.status).unwrap() + + } + } - dl_speeds[update.id as usize] = speed_mbps; - progresses[update.id as usize] = bytes_curr; + Ok(()) + })); + - if update_counter >= 0 { - update_counter = 0; + let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect(); + // Validate if the tasks were successful. This will always grab the next completed + // task, independent from the original order in the joiners list + while let Some(output) = joiners.next().await { - let speed = dl_speeds.iter().sum(); - let curr = progresses.iter().sum(); + // If any of the download tasks fail, abort the rest and delete the file + // since it is non-recoverable anyways + if let Err(e) = output? { - rep.send(DlStatus::Update { - speed_mbps: speed, - bytes_curr: curr - })?; + for handle in joiners.iter() { + handle.abort(); + } - } else { - update_counter += 1; - } - - }, - DlStatus::Done { - duration_ms: _ - } => { - - dl_speeds[update.id as usize] = 0.0; - - }, - - // Just forwared everything else to the calling receiver - _ => rep.send(update.status)? + tokio::fs::remove_file(&into_file).await?; + return Err(e.into()); } } - join_all(joiners).await; - - // Remove the additional byte at the file end let ofile = tokio::fs::OpenOptions::new() .create(false) @@ -319,9 +345,7 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter .await?; ofile.set_len(content_length).await?; - - rep.send(DlStatus::Done { duration_ms: t_start.elapsed()?.as_millis() as u64 })?; diff --git a/src/main.rs b/src/main.rs index 6b9edca..21ac233 100644 --- a/src/main.rs +++ b/src/main.rs @@ -212,13 +212,10 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32, boost let url = if is_zippy { match zippy::resolve_link(&url).await { Ok(url) => url, - Err(e) => { + Err(_e) => { rep.send( DlStatus::MessageNow(format!("Zippyshare link could not be resolved: {}", url)) ).unwrap(); - rep.send( - DlStatus::MessageNow(format!("{}", e)) - ).unwrap(); continue; } @@ -246,16 +243,18 @@ async fn download_multiple(urls: Vec, outdir: &str, numparal: i32, boost } if boost == 1 { - if let Err(e) = download::download_feedback(&url, &into_file, rep).await { - eprintln!("Error while downloading '{}'", file_name); - eprintln!("{}", e); + if let Err(_e) = download::download_feedback(&url, &into_file, rep.clone()).await { + rep.send(DlStatus::DoneErr { + filename: into_file.to_string() + }).unwrap(); } } else { - if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, boost).await { - eprintln!("Error while downloading '{}'", file_name); - eprintln!("{}", e); + if let Err(_e) = download::download_feedback_multi(&url, &into_file, rep.clone(), boost).await { + rep.send(DlStatus::DoneErr { + filename: into_file.to_string() + }).unwrap(); } - } + }; } }))