Better split URLs between worker tasks

- Worker tasks now pull URLs from a shared, synchronized queue instead of each working through a pre-assigned slice
- Bump version to 0.1.2
Daniel M 2021-06-11 19:23:22 +02:00
parent ea73355eef
commit 33f559ad4f
3 changed files with 23 additions and 18 deletions
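
In short: the old scheme pre-assigned every Nth URL to worker N, while after this change all workers drain one shared Arc<Mutex<VecDeque>>, so a worker that finishes early simply picks up the next pending URL. Below is a minimal, dependency-free sketch of that pattern; plain OS threads stand in for the project's tokio tasks, and worker_count and the example URLs are illustrative, not taken from the codebase.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Illustrative inputs; the real queue holds (index, url) pairs
    // built from the CLI arguments.
    let urls: Vec<String> = (0..10)
        .map(|i| format!("https://example.com/file{}", i))
        .collect();

    // Shared work queue: every worker pops from the front until empty.
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    for (i, url) in urls.into_iter().enumerate() {
        queue.lock().unwrap().push_back((i as u32, url));
    }

    let worker_count = 3; // plays the role of parallel_file_count
    let mut handles = Vec::new();
    for worker in 0..worker_count {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || loop {
            // Pop under the lock, then release it before the slow work,
            // so other workers are never blocked while one "downloads".
            let item = queue.lock().unwrap().pop_front();
            let (index, url) = match item {
                Some(it) => it,
                None => break, // queue drained: this worker is done
            };
            println!("worker {} handles #{}: {}", worker, index, url);
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}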

Cargo.lock (generated)

@@ -151,7 +151,7 @@ dependencies = [
 [[package]]
 name = "ffdl"
-version = "0.1.1"
+version = "0.1.2"
 dependencies = [
  "chrono",
  "clap",

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "ffdl"
-version = "0.1.1"
+version = "0.1.2"
 authors = ["daniel m <danielm@dnml.de>"]
 edition = "2018"

src/main.rs

@@ -1,6 +1,9 @@
+use std::collections::VecDeque;
 use std::path::Path;
 use std::process::exit;
+use std::sync::Arc;
 use clap::{ App, Arg, ArgGroup, crate_version };
+use std::sync::Mutex;
 use tokio::sync::mpsc;
 use futures::future::join_all;
 use std::io::BufRead;
@@ -27,7 +30,7 @@ enum CLIAction {
 struct CLIArguments {
     outdir: String,
     into_file: Option<String>,
-    file_count: u32,
+    parallel_file_count: u32,
     conn_count: u32,
     zippy: bool,
     action: CLIAction,
@@ -172,7 +175,7 @@ async fn main() -> ResBE<()> {
     let mut cli_args = CLIArguments {
         outdir: outdir,
         into_file: into_file,
-        file_count: file_count,
+        parallel_file_count: file_count,
         conn_count: conn_count,
         zippy: is_zippy,
         action: action,
@@ -231,12 +234,19 @@ async fn download_multiple(cli_args: CLIArguments) -> ResBE<()> {
     let outdir = cli_args.outdir;
     let outdir = Path::new(&outdir);
-    let file_count = cli_args.file_count;
+    let parallel_file_count = cli_args.parallel_file_count;
     let zippy = cli_args.zippy;
     let conn_count = cli_args.conn_count;
+    let urls = Arc::new(Mutex::new(VecDeque::<(u32, String)>::new()));
+    cli_args.urls.iter().enumerate().for_each(|(i, url)| {
+        urls.lock().unwrap().push_back((i as u32, url.clone()));
+    });
     if !outdir.exists() {
         if let Err(_e) = std::fs::create_dir_all(outdir) {
             eprintln!("Error creating output directory '{}'", outdir.to_str().unwrap());
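
Note that the queue uses a blocking std::sync::Mutex rather than tokio's async mutex. That is reasonable here: as the next hunk shows, the guard is only held for the brief pop_front call and is never held across an .await point, which is exactly the case where a std mutex inside async tasks is fine.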

@@ -250,29 +260,24 @@ async fn download_multiple(cli_args: CLIArguments) -> ResBE<()> {
     let (tx, rx) = mpsc::unbounded_channel::<DlReport>();
-    for offset in 0 .. file_count {
-        let urls: Vec<String> = cli_args.urls
-            .iter()
-            .enumerate()
-            .filter(|(index, _)| (index) % file_count as usize == offset as usize)
-            .map(|(_, v)| v.to_owned())
-            .collect();
+    for _offset in 0 .. parallel_file_count {
         let tx = tx.clone();
         let outdir = outdir.to_owned();
-        let offset = offset;
         let arg_filename = cli_args.into_file.clone();
+        let urls = urls.clone();
         joiners.push(tokio::task::spawn(async move {
-            for (i, url) in urls.iter().enumerate() {
+            loop {
+                let (global_url_index, url) = match urls.lock().unwrap().pop_front() {
+                    Some(it) => it,
+                    None => break
+                };
                 let tx = tx.clone();
-                // Recalculated index in the main url vector, used as id
-                let global_url_index = i as u32 * file_count + offset;
                 let rep = DlReporter::new(global_url_index, tx);
                 let url = if zippy {
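
The removed modulo split (index % file_count == offset) fixed each worker's share of URLs up front, so a worker that happened to draw the largest files kept running long after the others had finished. With the shared queue, whichever worker becomes idle first pops the next URL, so the workers' finishing times should differ by at most roughly one file. The global URL index, previously recomputed arithmetically, now travels with each queue entry, so the DlReporter id is unchanged.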