From 33f559ad4f0ae41f646341fc37a673929bc21f2c Mon Sep 17 00:00:00 2001
From: Daniel M
Date: Fri, 11 Jun 2021 19:23:22 +0200
Subject: [PATCH] Better split URLs between worker tasks

- URLs are now extracted from a synchronized queue by each task
- Bump version to 0.1.2
---
 Cargo.lock  |  2 +-
 Cargo.toml  |  2 +-
 src/main.rs | 37 +++++++++++++++++++++----------------
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8e27e41..154c088 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -151,7 +151,7 @@ dependencies = [
 
 [[package]]
 name = "ffdl"
-version = "0.1.1"
+version = "0.1.2"
 dependencies = [
  "chrono",
  "clap",
diff --git a/Cargo.toml b/Cargo.toml
index 0ee7781..81d4f4f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ffdl"
-version = "0.1.1"
+version = "0.1.2"
 authors = ["daniel m "]
 edition = "2018"
 
diff --git a/src/main.rs b/src/main.rs
index 796f26f..2bd9477 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,9 @@
+use std::collections::VecDeque;
 use std::path::Path;
 use std::process::exit;
+use std::sync::Arc;
 use clap::{ App, Arg, ArgGroup, crate_version };
+use std::sync::Mutex;
 use tokio::sync::mpsc;
 use futures::future::join_all;
 use std::io::BufRead;
@@ -27,7 +30,7 @@ enum CLIAction {
 struct CLIArguments {
     outdir: String,
     into_file: Option<String>,
-    file_count: u32,
+    parallel_file_count: u32,
     conn_count: u32,
     zippy: bool,
     action: CLIAction,
@@ -172,7 +175,7 @@ async fn main() -> ResBE<()> {
     let mut cli_args = CLIArguments {
         outdir: outdir,
         into_file: into_file,
-        file_count: file_count,
+        parallel_file_count: file_count,
         conn_count: conn_count,
         zippy: is_zippy,
         action: action,
@@ -231,12 +234,19 @@ async fn download_multiple(cli_args: CLIArguments) -> ResBE<()> {
     let outdir = cli_args.outdir;
     let outdir = Path::new(&outdir);
 
-    let file_count = cli_args.file_count;
+    let parallel_file_count = cli_args.parallel_file_count;
     let zippy = cli_args.zippy;
     let conn_count = cli_args.conn_count;
 
+    let urls = Arc::new(Mutex::new(VecDeque::<(u32, String)>::new()));
+
+    cli_args.urls.iter().enumerate().for_each(|(i, url)| {
+        urls.lock().unwrap().push_back((i as u32, url.clone()));
+    });
+
+
     if !outdir.exists() {
         if let Err(_e) = std::fs::create_dir_all(outdir) {
             eprintln!("Error creating output directory '{}'", outdir.to_str().unwrap());
@@ -250,29 +260,24 @@ async fn download_multiple(cli_args: CLIArguments) -> ResBE<()> {
 
     let (tx, rx) = mpsc::unbounded_channel::();
 
-    for offset in 0 .. file_count {
-
-        let urls: Vec<String> = cli_args.urls
-            .iter()
-            .enumerate()
-            .filter(|(index, _)| (index) % file_count as usize == offset as usize)
-            .map(|(_, v)| v.to_owned())
-            .collect();
+    for _offset in 0 .. parallel_file_count {
 
         let tx = tx.clone();
         let outdir = outdir.to_owned();
-        let offset = offset;
         let arg_filename = cli_args.into_file.clone();
 
+        let urls = urls.clone();
+
         joiners.push(tokio::task::spawn(async move {
 
-            for (i, url) in urls.iter().enumerate() {
+            loop {
+                let (global_url_index, url) = match urls.lock().unwrap().pop_front() {
+                    Some(it) => it,
+                    None => break
+                };
 
                 let tx = tx.clone();
 
-                // Recalculated index in the main url vector, used as id
-                let global_url_index = i as u32 * file_count + offset;
-
                 let rep = DlReporter::new(global_url_index, tx);
 
                 let url = if zippy {
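
For reference, a minimal, self-contained sketch of the shared-queue pattern this patch adopts: every spawned task pops (index, url) pairs from one Arc<Mutex<VecDeque>> until the queue is empty. The worker_count value, the example.com URLs and the println! stand-in for the real download are illustrative assumptions, not code from ffdl; the sketch only assumes the tokio (with rt and macros features) and futures crates the project already depends on.

// Sketch only: worker tasks drain one shared queue of (index, url) pairs.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    // Illustrative URLs; ffdl fills the queue from its CLI arguments instead.
    let urls: Vec<String> = (0..10).map(|i| format!("https://example.com/{}", i)).collect();

    // One queue shared by all workers; each entry keeps its global index as an id.
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    for (i, url) in urls.into_iter().enumerate() {
        queue.lock().unwrap().push_back((i as u32, url));
    }

    let worker_count = 3; // stands in for parallel_file_count
    let mut joiners = Vec::new();

    for _ in 0..worker_count {
        let queue = queue.clone();
        joiners.push(tokio::task::spawn(async move {
            loop {
                // The MutexGuard is dropped when the match expression ends, so the
                // lock is never held while a URL is being processed.
                let (global_url_index, url) = match queue.lock().unwrap().pop_front() {
                    Some(it) => it,
                    None => break,
                };
                // Stand-in for the actual download.
                println!("worker handling #{}: {}", global_url_index, url);
            }
        }));
    }

    futures::future::join_all(joiners).await;
}

Compared with the previous modulo-based split, a slow URL no longer stalls the fixed slice of the list assigned to one task; idle workers simply keep pulling the next URL from the front of the queue.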