Refactor & fall back to a single conn if range is not supported

Daniel M 2022-03-30 01:11:59 +02:00
parent 565ba5984a
commit 2e10b54f32
2 changed files with 97 additions and 102 deletions
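
The headline change: when a server does not advertise HTTP range support, the downloader now logs a notice and falls back to a single-connection download instead of sending an error and skipping the file. Range support is probed by download::http_get_filesize_and_range_support; that helper is not part of this diff, so the following is only a rough sketch of what such a probe can look like, assuming a reqwest-based client and an invented function name:

use reqwest::header::{ACCEPT_RANGES, CONTENT_LENGTH};

// Rough sketch only: the crate's real helper lives in its `download` module
// and is not shown in this diff; reqwest and the signature are assumptions.
async fn probe_filesize_and_range_support(url: &str) -> Result<(u64, bool), reqwest::Error> {
    // HEAD fetches the response headers without downloading the body
    let resp = reqwest::Client::new().head(url).send().await?;

    // Content-Length is the size of the full download in bytes
    let filesize = resp
        .headers()
        .get(CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(0);

    // "Accept-Ranges: bytes" advertises support for partial (Range:) requests,
    // the prerequisite for splitting a download across several connections
    let range_supported = resp
        .headers()
        .get(ACCEPT_RANGES)
        .and_then(|v| v.to_str().ok())
        .map(|v| v.eq_ignore_ascii_case("bytes"))
        .unwrap_or(false);

    Ok((filesize, range_supported))
}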

View File

@@ -1,4 +1,4 @@
-use std::num::NonZeroU32;
+use std::{num::NonZeroU32, path::PathBuf};
 
 use clap::Parser;
 #[derive(Parser, Clone, Debug)]
@@ -16,7 +16,7 @@ pub struct CLIArgs {
         default_value = "./",
         help = "Set the output directory. The directory will be created if it doesn't exit yet",
     )]
-    pub outdir: String,
+    pub outdir: PathBuf,
 
     #[clap(
         short = 'i',
@@ -24,7 +24,7 @@ pub struct CLIArgs {
         value_name = "FILENAME",
         help = "Force filename. This only works for single file downloads",
     )]
-    pub into_file: Option<String>,
+    pub into_file: Option<PathBuf>,
 
     #[clap(
         short = 'n',
@@ -61,7 +61,7 @@ pub struct CLIArgs {
         value_name = "URL LISTFILE",
         help = "Download all files from the specified url list file",
     )]
-    pub listfile: Vec<String>,
+    pub listfile: Vec<PathBuf>,
 
     #[clap(
         short = 'd',
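
This file's change swaps the path-typed CLI fields from String to PathBuf. clap's derive handles this out of the box because PathBuf implements FromStr, so no custom parser is needed. A minimal sketch of the pattern (a hypothetical, trimmed-down struct, not the crate's full CLIArgs):

use std::path::PathBuf;

use clap::Parser;

// Hypothetical, shortened stand-in for CLIArgs to show the PathBuf fields
#[derive(Parser, Clone, Debug)]
struct Args {
    // Parsed straight into a PathBuf, so callers get .join(), .exists()
    // and .display() without converting from String by hand
    #[clap(short = 'o', default_value = "./")]
    outdir: PathBuf,

    #[clap(short = 'i', value_name = "FILENAME")]
    into_file: Option<PathBuf>,
}

fn main() {
    let args = Args::parse();
    println!("output directory: {}", args.outdir.display());
}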

View File

@@ -1,18 +1,19 @@
-use std::{
-    collections::VecDeque,
-    path::{Path, PathBuf},
-    process::exit,
-    sync::Arc,
-    time::SystemTime,
-};
+use std::{collections::VecDeque, path::Path, process::exit, sync::Arc, time::SystemTime};
 
 use clap::Parser;
+use download::{download_feedback, download_feedback_multi, http_get_filesize_and_range_support};
 use futures::future::join_all;
-use tokio::sync::{mpsc, Mutex};
+use tokio::{
+    fs::create_dir_all,
+    sync::{
+        mpsc::{unbounded_channel, UnboundedSender},
+        Mutex,
+    },
+};
 
 use crate::{
     args::CLIArgs,
-    dlreport::{DlReport, DlReporter, DlStatus},
+    dlreport::{watch_and_print_reports, DlReport, DlReporter, DlStatus},
     errors::ResBE,
 };
@@ -22,25 +23,39 @@ mod download;
 mod errors;
 mod zippy;
 
+struct DlRequest {
+    id: usize,
+    url: String,
+}
+
+type SyncQueue = Arc<Mutex<VecDeque<DlRequest>>>;
+
 #[tokio::main]
 async fn main() -> ResBE<()> {
     let args = CLIArgs::parse();
 
+    // Combine all urls taken from files and the ones provided on the command line
     let mut urls = args.download.clone();
     for file in args.listfile.iter() {
-        match read_urls_from_listfile(file).await {
+        match urls_from_listfile(file).await {
             Ok(listfile_urls) => urls.extend(listfile_urls),
             Err(_) => {
-                eprintln!("Failed to read urls from file: {}", file);
+                eprintln!("Failed to read urls from file: {}", file.display());
                 exit(1);
             }
         }
     }
 
+    if urls.is_empty() {
+        eprintln!("No URLs provided");
+        return Ok(());
+    }
+
     download_multiple(args, urls).await
 }
 
-async fn read_urls_from_listfile(listfile: &str) -> ResBE<Vec<String>> {
+/// Parse a listfile and return all urls found in it
+async fn urls_from_listfile(listfile: &Path) -> ResBE<Vec<String>> {
     let text = tokio::fs::read_to_string(listfile).await?;
     let urls = text
         .lines()
@@ -51,102 +66,89 @@ async fn read_urls_from_listfile(listfile: &str) -> ResBE<Vec<String>> {
     Ok(urls)
 }
 
-async fn download_multiple(cli_args: CLIArgs, raw_urls: Vec<String>) -> ResBE<()> {
-    let outdir = Path::new(&cli_args.outdir);
+// Download all files in parallel according to the provided CLI arguments
+async fn download_multiple(args: CLIArgs, raw_urls: Vec<String>) -> ResBE<()> {
     let num_urls = raw_urls.len();
-    let parallel_file_count = cli_args.file_count.get();
-    let conn_count = cli_args.conn_count.get();
-    let zippy = cli_args.zippy;
+    let urls: SyncQueue = Default::default();
 
-    let urls = Arc::new(Mutex::new(VecDeque::<(usize, String)>::new()));
-    urls.lock().await.extend(raw_urls.into_iter().enumerate());
+    let enumerated_urls = raw_urls
+        .into_iter()
+        .enumerate()
+        .map(|(id, url)| DlRequest { id, url });
+    urls.lock().await.extend(enumerated_urls);
 
-    if !outdir.exists() {
-        if let Err(_e) = tokio::fs::create_dir_all(outdir).await {
-            eprintln!("Error creating output directory '{}'", outdir.display());
+    if !args.outdir.exists() {
+        if let Err(_e) = create_dir_all(&args.outdir).await {
+            eprintln!(
+                "Error creating output directory '{}'",
+                args.outdir.display()
+            );
             exit(1);
         }
     }
 
-    let (tx, rx) = mpsc::unbounded_channel::<DlReport>();
+    let (tx, rx) = unbounded_channel::<DlReport>();
     let t_start = SystemTime::now();
 
-    let joiners = (0..parallel_file_count)
-        .map(|_| {
-            tokio::task::spawn(download_job(
-                urls.clone(),
-                tx.clone(),
-                conn_count,
-                zippy,
-                outdir.to_owned(),
-                cli_args.into_file.clone(),
-            ))
-        })
+    let jobs = (0..args.file_count.get())
+        .map(|_| tokio::task::spawn(download_job(urls.clone(), tx.clone(), args.clone())))
        .collect::<Vec<_>>();
     drop(tx);
 
-    dlreport::watch_and_print_reports(rx, num_urls as i32).await?;
-    join_all(joiners).await;
+    watch_and_print_reports(rx, num_urls as i32).await?;
+    join_all(jobs).await;
 
     println!("Total time: {}s", t_start.elapsed()?.as_secs());
     Ok(())
 }
-async fn download_job(
-    urls: Arc<Mutex<VecDeque<(usize, String)>>>,
-    tx: mpsc::UnboundedSender<DlReport>,
-    conn_count: u32,
-    zippy: bool,
-    outdir: PathBuf,
-    arg_filename: Option<String>,
-) {
+async fn download_job(urls: SyncQueue, reporter: UnboundedSender<DlReport>, cli_args: CLIArgs) {
     loop {
-        let (global_url_index, url) = match urls.lock().await.pop_front() {
+        // Get the next url to download or break if there are no more urls
+        let dlreq = match urls.lock().await.pop_front() {
             Some(it) => it,
             None => break,
         };
-        let tx = tx.clone();
-        let rep = DlReporter::new(global_url_index as u32, tx);
+        let reporter = DlReporter::new(dlreq.id as u32, reporter.clone());
 
-        let url = if zippy {
-            match zippy::resolve_link(&url).await {
+        // Resolve the zippy url to the direct download url if necessary
+        let url = if cli_args.zippy {
+            match zippy::resolve_link(&dlreq.url).await {
                 Ok(url) => url,
                 Err(_e) => {
-                    rep.send(DlStatus::Message(format!(
+                    reporter.send(DlStatus::Message(format!(
                         "Zippyshare link could not be resolved: {}",
-                        url
+                        dlreq.url
                     )));
                     continue;
                 }
             }
         } else {
-            url.to_string()
+            dlreq.url.to_string()
        };
 
-        let file_name = arg_filename
+        let file_name = cli_args
+            .into_file
             .clone()
-            .unwrap_or_else(|| download::url_to_filename(&url));
-        let into_file = outdir
+            .unwrap_or_else(|| download::url_to_filename(&url).into());
+        let into_file = cli_args
+            .outdir
             .join(Path::new(&file_name))
             .to_str()
             .unwrap()
             .to_string();
         let path_into_file = Path::new(&into_file);
 
-        let (filesize, range_supported) =
-            match download::http_get_filesize_and_range_support(&url).await {
-                Ok((filesize, range_supported)) => (filesize, range_supported),
-                Err(_e) => {
-                    rep.send(DlStatus::Message(format!(
-                        "Error while querying metadata: {}",
-                        url
-                    )));
+        let (filesize, range_supported) = match http_get_filesize_and_range_support(&url).await {
+            Ok((filesize, range_supported)) => (filesize, range_supported),
+            Err(_e) => {
+                reporter.send(DlStatus::Message(format!(
+                    "Error while querying metadata: {}",
+                    url
+                )));
@@ -159,50 +161,43 @@ async fn download_job(
             let local_filesize = std::fs::metadata(path_into_file).unwrap().len();
             if filesize == local_filesize {
-                rep.send(DlStatus::Message(format!(
+                reporter.send(DlStatus::Message(format!(
                     "Skipping file '{}': already present",
-                    &file_name
+                    file_name.display()
                 )));
-                rep.send(DlStatus::Skipped);
+                reporter.send(DlStatus::Skipped);
                 continue;
             } else {
-                rep.send(DlStatus::Message(format!(
+                reporter.send(DlStatus::Message(format!(
                     "Replacing file '{}': present but not completed",
-                    &file_name
+                    &file_name.display()
                 )));
             }
         }
 
-        if conn_count == 1 {
-            if let Err(_e) =
-                download::download_feedback(&url, &into_file, rep.clone(), Some(filesize)).await
-            {
-                rep.send(DlStatus::DoneErr {
-                    filename: file_name.to_string(),
-                });
-            }
-        } else {
-            if !range_supported {
-                rep.send(DlStatus::Message(format!(
-                    "Error Server does not support range header: {}",
-                    url
-                )));
-                continue;
-            }
-
-            if let Err(_e) = download::download_feedback_multi(
-                &url,
-                &into_file,
-                rep.clone(),
-                conn_count,
-                Some(filesize),
-            )
-            .await
-            {
-                rep.send(DlStatus::DoneErr {
-                    filename: file_name.to_string(),
-                });
-            }
-        }
+        let dl_status = if cli_args.conn_count.get() == 1 {
+            download_feedback(&url, &into_file, reporter.clone(), Some(filesize)).await
+        } else if !range_supported {
+            reporter.send(DlStatus::Message(format!(
+                "Server does not support range headers. Downloading with single connection: {}",
+                url
+            )));
+            download_feedback(&url, &into_file, reporter.clone(), Some(filesize)).await
+        } else {
+            download_feedback_multi(
+                &url,
+                &into_file,
+                reporter.clone(),
+                cli_args.conn_count.get(),
+                Some(filesize),
+            )
+            .await
+        };
+
+        if dl_status.is_err() {
+            reporter.send(DlStatus::DoneErr {
+                filename: file_name.to_str().unwrap().to_string(),
+            });
+        }
     }
 }
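
The refactor's other theme shows in download_job's shrunken signature: instead of six loose parameters, each worker now receives the shared queue, the report channel, and a clone of the whole CLIArgs. The queue pattern itself, N workers draining an Arc<Mutex<VecDeque<…>>> until it is empty, is small enough to show standalone. A minimal runnable sketch under the same tokio/futures dependencies (DlRequest mirrors the struct added above; the worker body is illustrative):

use std::{collections::VecDeque, sync::Arc};

use tokio::sync::Mutex;

// Mirrors the DlRequest/SyncQueue pair introduced by this commit
struct DlRequest {
    id: usize,
    url: String,
}
type SyncQueue = Arc<Mutex<VecDeque<DlRequest>>>;

#[tokio::main]
async fn main() {
    let queue: SyncQueue = Default::default();
    queue.lock().await.extend(
        ["https://example.com/a.bin", "https://example.com/b.bin"]
            .into_iter()
            .enumerate()
            .map(|(id, url)| DlRequest { id, url: url.to_string() }),
    );

    // Spawn a fixed number of workers; each pops the next request and
    // exits once the queue is drained, the same shape as download_job
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let queue = queue.clone();
            tokio::spawn(async move {
                loop {
                    let dlreq = match queue.lock().await.pop_front() {
                        Some(it) => it,
                        None => break,
                    };
                    println!("worker got #{}: {}", dlreq.id, dlreq.url);
                }
            })
        })
        .collect();

    futures::future::join_all(workers).await;
}

Each lock is held only long enough to pop one request, so workers contend briefly on the queue but run their downloads concurrently.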