use std::path::Path;
use std::process::exit;
use clap::{App, Arg, ArgGroup, crate_version};
use tokio::sync::mpsc;
use futures::future::join_all;
use std::time::SystemTime;
use std::io::BufRead;
use std::collections::HashMap;
use dlreport::{DlReport, DlStatus, DlReporter};
use errors::ResBE;

mod zippy;
mod download;
mod errors;
mod dlreport;

#[tokio::main]
async fn main() -> ResBE<()> {
    let arguments = App::new("FDL - Fast/File Downloader")
        .version(crate_version!())
        .about("Download files fast")
        .arg(
            Arg::with_name("outdir")
                .short("o")
                .long("outdir")
                .value_name("OUTPUT DIR")
                .takes_value(true)
                .help("Set the output directory")
        )
        .arg(
            Arg::with_name("numdl")
                .short("n")
                .long("numdl")
                .value_name("NUMBER OF CONCURRENT DOWNLOADS")
                .takes_value(true)
                .help("Specify the number of concurrent downloads")
        )
        .arg(
            Arg::with_name("zippyshare")
                .short("z")
                .long("zippy")
                .takes_value(false)
                .help("The provided URLs are Zippyshare URLs and need to be resolved")
        )
        .group(
            ArgGroup::with_name("action")
                .required(true)
        )
        .arg(
            Arg::with_name("listfile")
                .short("l")
                .long("listfile")
                .value_name("URL LIST")
                .takes_value(true)
                .group("action")
                .help("Download all files from the specified URL list")
        )
        .arg(
            Arg::with_name("download")
                .short("d")
                .long("download")
                .value_name("URL")
                .takes_value(true)
                .group("action")
                .help("Download only the specified URL")
        )
        .arg(
            Arg::with_name("resolve")
                .short("r")
                .long("resolve")
                .value_name("URL")
                .takes_value(true)
                .group("action")
                .help("Resolve the Zippyshare URL to the real download URL")
        )
        .get_matches();

    let outdir = arguments.value_of("outdir").unwrap_or("./");
    let numparal = arguments.value_of("numdl").unwrap_or("1");
    let numparal: i32 = match numparal.parse() {
        Ok(it) => it,
        Err(_) => {
            eprintln!("Invalid value for numdl: {}", numparal);
            exit(1);
        }
    };
    let is_zippy = arguments.is_present("zippyshare");

    if arguments.is_present("listfile") {
        let listfile = arguments.value_of("listfile").unwrap();
        let ifile = std::fs::File::open(listfile)?;
        // Read all non-empty, non-comment lines from the list file
        let mut urls: Vec<String> = std::io::BufReader::new(ifile)
            .lines()
            .map(|l| l.unwrap())
            .filter(|url| !url.is_empty() && !url.starts_with('#'))
            .collect();
        if is_zippy {
            let mut zippy_urls = Vec::new();
            for url in urls {
                zippy_urls.push(
                    match zippy::resolve_link(&url).await {
                        Ok(url) => url,
                        Err(e) => {
                            println!("Zippyshare link could not be resolved");
                            eprintln!("{}", e);
                            exit(1);
                        }
                    }
                )
            }
            urls = zippy_urls;
        }
        download_multiple(urls, outdir, numparal).await?;
    } else if arguments.is_present("download") {
        let url = arguments.value_of("download").unwrap();
        let url = if is_zippy {
            match zippy::resolve_link(&url).await {
                Ok(url) => url,
                Err(e) => {
                    println!("Zippyshare link could not be resolved");
                    eprintln!("{}", e);
                    exit(1);
                }
            }
        } else {
            url.to_string()
        };
        download_one(&url, outdir).await?;
    } else if arguments.is_present("resolve") {
        let url = arguments.value_of("resolve").unwrap();
        match zippy::resolve_link(&url).await {
            Ok(resolved_url) => {
                println!("{}", resolved_url);
            },
            Err(e) => {
                println!("Zippyshare link could not be resolved");
                eprintln!("{}", e);
                exit(1);
            }
        }
    } else {
        println!("Something went very wrong...");
    }

    Ok(())
}

async fn download_one(url: &str, outdir: &str) -> ResBE<()> {
    let outdir = Path::new(outdir);
    if !outdir.exists() {
        std::fs::create_dir_all(outdir)?;
    }

    let file_name = download::url_to_filename(url);
    let into_file = outdir.join(Path::new(&file_name));
    let into_file = into_file.to_str().unwrap().to_string();
    let path_into_file = Path::new(&into_file);

    // If a file with the same name is present locally, check its filesize
    if path_into_file.exists() {
        let (filesize, _) = download::http_get_filesize_and_range_support(&url).await?;
        let local_filesize = std::fs::metadata(path_into_file)?.len();
        if filesize == local_filesize {
            println!("Skipping file '{}': already present", &file_name);
            return Ok(());
        } else {
            println!("Replacing file '{}': present but not completed", &file_name);
        }
    }

    // Create a channel to receive feedback on the download progress
    let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>();

    // Start the download without blocking
    let url = url.to_string();
    let jh_download = tokio::spawn(async move {
        // Create a reporter with id 0 since there is only one download anyway
        let rep = DlReporter::new(0, tx);
        if let Err(e) = download::download_feedback(&url, &into_file, rep).await {
            eprintln!("Error while downloading");
            eprintln!("{}", e);
        }
    });

    let mut t_last = SystemTime::UNIX_EPOCH;
    let mut filesize = 0;

    // Handle download status updates until all transmitters are closed,
    // which happens when the download is completed
    while let Some(update) = rx.recv().await {
        match update.status {
            DlStatus::Init { bytes_total, filename } => {
                println!("Starting download for file '{}'", &filename);
                filesize = bytes_total;
            },
            DlStatus::Update { speed_mbps, bytes_curr } => {
                // Print an update at most once per second, otherwise ignore it
                if t_last.elapsed()?.as_millis() > 1000 {
                    let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0;
                    println!("Status: {:6.2} mb/s {:5.2}% completed", speed_mbps, percent_complete);
                    t_last = SystemTime::now();
                }
            },
            DlStatus::Done { duration_ms } => {
                println!("Status: 100% completed");
                println!("Download took {} seconds", duration_ms / 1000);
            }
        }
    }

    // Await the download task just to make sure it has finished
    jh_download.await?;
    Ok(())
}

async fn download_multiple(urls: Vec<String>, outdir: &str, numparal: i32) -> ResBE<()> {
    let outdir = Path::new(outdir);
    if !outdir.exists() {
        std::fs::create_dir_all(outdir)?;
    }

    let mut joiners = Vec::new();
    let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>();

    for offset in 0..numparal {
        // Partition the URLs round-robin: worker `offset` takes every
        // `numparal`-th URL, starting at index `offset`
        let urls: Vec<String> = urls
            .iter()
            .enumerate()
            .filter(|(index, _)| index % numparal as usize == offset as usize)
            .map(|(_, v)| v.to_owned())
            .collect();
        let tx = tx.clone();
        let outdir = outdir.to_owned();
        joiners.push(tokio::task::spawn(async move {
            for (i, url) in urls.iter().enumerate() {
                // Recalculate the index in the main URL vector, used as id
                let global_url_index = i as i32 * numparal + offset;
                let file_name = download::url_to_filename(&url);
                let into_file = outdir.join(Path::new(&file_name));
                let into_file = into_file.to_str().unwrap().to_string();
                let path_into_file = Path::new(&into_file);

                // If a file with the same name is present locally, check its filesize
                if path_into_file.exists() {
                    let (filesize, _) = download::http_get_filesize_and_range_support(&url).await.unwrap();
                    let local_filesize = std::fs::metadata(path_into_file).unwrap().len();
                    if filesize == local_filesize {
                        println!("Skipping file '{}': already present", &file_name);
                        continue;
                    } else {
                        println!("Replacing file '{}': present but not completed", &file_name);
                    }
                }

                let rep = DlReporter::new(global_url_index, tx.clone());
                if let Err(e) = download::download_feedback(&url, &into_file, rep).await {
                    eprintln!("Error while downloading '{}'", file_name);
                    eprintln!("{}", e);
                }
            }
        }))
    }
    // Drop the original transmitter so `rx` closes once all workers are done
    drop(tx);

    // id -> (filename, total size bytes, current size bytes, download speed mbps)
    let mut statuses: HashMap<i32, (String, u64, u64, f64)> = HashMap::new();
    let mut t_last = SystemTime::now();

    while let Some(update) = rx.recv().await {
        match update.status {
            DlStatus::Init { bytes_total, filename } => {
                println!("Starting download for file '{}'", &filename);
                statuses.insert(update.id, (filename, bytes_total, 0, 0.0));
            },
            DlStatus::Update { speed_mbps, bytes_curr } => {
                // Scope the mutable borrow so `statuses` can be read below
                {
                    let s = statuses.get_mut(&update.id).unwrap();
                    s.2 = bytes_curr;
                    s.3 = speed_mbps;
                }
                // Print a combined status report at most every two seconds
                if t_last.elapsed().unwrap().as_millis() > 2000 {
                    let mut dl_speed_sum = 0.0;
                    for (_k, v) in &statuses {
                        let filename = &v.0;
                        let filesize = v.1;
                        let bytes_curr = v.2;
                        let speed_mbps = v.3;
                        let percent_complete = bytes_curr as f64 / filesize as f64 * 100.0;
                        println!("Status: {:6.2} mb/s {:5.2}% completed '{}'", speed_mbps, percent_complete, filename);
                        dl_speed_sum += speed_mbps;
                    }
                    println!("Accumulated download speed: {:6.2} mb/s\n", dl_speed_sum);
                    t_last = SystemTime::now();
                }
            },
            DlStatus::Done { duration_ms } => {
                println!(
                    "Status: 100% completed '{}'\nDownload took {} seconds",
                    &statuses.get(&update.id).unwrap().0,
                    duration_ms / 1000
                );
                statuses.remove(&update.id);
            }
        }
    }

    join_all(joiners).await;
    Ok(())
}
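
// For reference, a minimal sketch of the API the `dlreport` module is assumed
// to expose, inferred only from how it is used above; the real definitions
// live in dlreport.rs and may differ in detail (e.g. the exact integer types):
//
//     pub enum DlStatus {
//         Init { bytes_total: u64, filename: String },
//         Update { speed_mbps: f64, bytes_curr: u64 },
//         Done { duration_ms: u64 },
//     }
//
//     pub struct DlReport {
//         pub id: i32,
//         pub status: DlStatus,
//     }
//
//     // Wraps an id plus an UnboundedSender<DlReport>; `download_feedback`
//     // presumably uses it to emit the Init/Update/Done reports handled above.
//     pub struct DlReporter { /* ... */ }
//
//     impl DlReporter {
//         pub fn new(id: i32, tx: mpsc::UnboundedSender<DlReport>) -> Self { /* ... */ }
//     }
//
// Example invocations, assuming the compiled binary is named `fdl`:
//
//     fdl -l urls.txt -o ./downloads -n 4    # download a URL list, 4 at a time
//     fdl -z -d <zippyshare-url>             # resolve a Zippyshare link, then download it
//     fdl -r <zippyshare-url>                # only print the resolved direct URL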