Slightly improved error recoverability

This commit is contained in:
Daniel M 2021-03-31 01:17:07 +02:00
parent 9edb2daf86
commit 052c3a358a
3 changed files with 102 additions and 58 deletions

View File

@ -25,6 +25,9 @@ pub enum DlStatus {
Done {
duration_ms: u64
},
DoneErr {
filename: String
},
Message(String),
/// Like Message but triggers a display refresh
MessageNow(String)
@ -36,6 +39,7 @@ pub struct DlReport {
pub status: DlStatus
}
#[derive(Clone)]
pub struct DlReporter {
id: i32,
transmitter: mpsc::UnboundedSender<DlReport>
@ -191,6 +195,23 @@ pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver<DlRep
statuses.remove(&update.id);
},
DlStatus::DoneErr {
filename
} => {
msg_queue.push_back(format!(
"Error: Download failed: '{}'", filename
));
// Don't care if it exists, just make sure it is gone
statuses.remove(&update.id);
// Refresh display
moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines)?;
t_last = SystemTime::now();
},
DlStatus::Message(msg) => {
msg_queue.push_back(msg);
@ -209,7 +230,7 @@ pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver<DlRep
execute!(
stdout(),
MoveToPreviousLine(2),
Print("All done!"),
Print("All done!\n"),
Clear(ClearType::FromCursorDown)
)?;

View File

@ -4,7 +4,8 @@ use std::time::SystemTime;
use percent_encoding::percent_decode_str;
use std::io::SeekFrom;
use tokio::sync::mpsc;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use crate::errors::*;
use crate::dlreport::*;
@ -244,7 +245,7 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter
from_to.1 += rest;
}
download_feedback_chunks(&url, &into_file, rep, Some(from_to), true).await.unwrap();
download_feedback_chunks(&url, &into_file, rep, Some(from_to), true).await.map_err(|e| e.to_string())
}))
}
@ -256,60 +257,85 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter
filename: into_file.to_string()
})?;
let mut update_counter = 0;
let mut dl_speeds = vec![0.0_f32; numparal as usize];
let mut progresses = vec![0; numparal as usize];
let rep_task = rep.clone();
while let Some(update) = rx.recv().await {
match update.status {
joiners.push(tokio::task::spawn(async move {
DlStatus::Init {
bytes_total: _,
filename: _
} => {
let rep = rep_task;
let mut update_counter: i32 = 0;
let mut dl_speeds = vec![0.0_f32; numparal as usize];
let mut progresses = vec![0; numparal as usize];
},
DlStatus::Update {
speed_mbps,
bytes_curr
} => {
while let Some(update) = rx.recv().await {
match update.status {
DlStatus::Init {
bytes_total: _,
filename: _
} => {
},
DlStatus::Update {
speed_mbps,
bytes_curr
} => {
dl_speeds[update.id as usize] = speed_mbps;
progresses[update.id as usize] = bytes_curr;
if update_counter >= 0 {
update_counter = 0;
let speed = dl_speeds.iter().sum();
let curr = progresses.iter().sum();
rep.send(DlStatus::Update {
speed_mbps: speed,
bytes_curr: curr
}).unwrap();
} else {
update_counter += 1;
}
},
DlStatus::Done {
duration_ms: _
} => {
dl_speeds[update.id as usize] = 0.0;
},
// Just forward everything else to the calling receiver
_ => rep.send(update.status).unwrap()
}
}
dl_speeds[update.id as usize] = speed_mbps;
progresses[update.id as usize] = bytes_curr;
Ok(())
}));
if update_counter >= 0 {
update_counter = 0;
let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect();
// Validate if the tasks were successful. This will always grab the next completed
// task, independent from the original order in the joiners list
while let Some(output) = joiners.next().await {
let speed = dl_speeds.iter().sum();
let curr = progresses.iter().sum();
// If any of the download tasks fail, abort the rest and delete the file
// since it is non-recoverable anyways
if let Err(e) = output? {
rep.send(DlStatus::Update {
speed_mbps: speed,
bytes_curr: curr
})?;
for handle in joiners.iter() {
handle.abort();
}
} else {
update_counter += 1;
}
},
DlStatus::Done {
duration_ms: _
} => {
dl_speeds[update.id as usize] = 0.0;
},
// Just forward everything else to the calling receiver
_ => rep.send(update.status)?
tokio::fs::remove_file(&into_file).await?;
return Err(e.into());
}
}
join_all(joiners).await;
// Remove the additional byte at the file end
let ofile = tokio::fs::OpenOptions::new()
.create(false)
@ -319,9 +345,7 @@ pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter
.await?;
ofile.set_len(content_length).await?;
rep.send(DlStatus::Done {
duration_ms: t_start.elapsed()?.as_millis() as u64
})?;

View File

@ -212,13 +212,10 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32, boost
let url = if is_zippy {
match zippy::resolve_link(&url).await {
Ok(url) => url,
Err(e) => {
Err(_e) => {
rep.send(
DlStatus::MessageNow(format!("Zippyshare link could not be resolved: {}", url))
).unwrap();
rep.send(
DlStatus::MessageNow(format!("{}", e))
).unwrap();
continue;
}
@ -246,16 +243,18 @@ async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32, boost
}
if boost == 1 {
if let Err(e) = download::download_feedback(&url, &into_file, rep).await {
eprintln!("Error while downloading '{}'", file_name);
eprintln!("{}", e);
if let Err(_e) = download::download_feedback(&url, &into_file, rep.clone()).await {
rep.send(DlStatus::DoneErr {
filename: into_file.to_string()
}).unwrap();
}
} else {
if let Err(e) = download::download_feedback_multi(&url, &into_file, rep, boost).await {
eprintln!("Error while downloading '{}'", file_name);
eprintln!("{}", e);
if let Err(_e) = download::download_feedback_multi(&url, &into_file, rep.clone(), boost).await {
rep.send(DlStatus::DoneErr {
filename: into_file.to_string()
}).unwrap();
}
}
};
}
}))