Compare commits


No commits in common. "master" and "29475dd3bdcfef5263ddc24f320bd2ebb2b8f256" have entirely different histories.

14 changed files with 1269 additions and 1628 deletions

Cargo.lock (generated, 924 lines changed)

Diff suppressed because it is too large.

Cargo.toml

@@ -1,19 +1,15 @@
 [package]
 name = "ffdl"
-version = "0.1.8"
+version = "0.1.2"
 authors = ["daniel m <danielm@dnml.de>"]
-edition = "2021"
-description = "Download files fast"
+edition = "2018"

 [dependencies]
-tokio = { version = "1.24.1", features = [ "full" ] }
-reqwest = { version = "0.11.13", features = [ "stream" ] }
-futures = "0.3.25"
-percent-encoding = "2.2.0"
-regex = "1.7.1"
-clap = { version = "3.1.12", features = [ "derive" ] }
-chrono = "0.4.23"
-thiserror = "1.0.38"
-anyhow = "1.0.68"
-crossterm = "0.25.0"
-ducc = "0.1.5"
+tokio = { version = "1.2.0", features = [ "full" ] }
+reqwest = { version = "0.11.2", features = [ "stream" ] }
+futures = "0.3.12"
+percent-encoding = "2.1.0"
+regex = "1.4.3"
+crossterm = "0.19.0"
+clap = "2.33.3"
+chrono = "0.4"

README.md

@@ -1,86 +0,0 @@
# Fast File Downloader
A simple CLI application to download files over HTTP with the goal of maximum throughput by parallelizing downloads. Optimized for downloading large quantities of files from [zippyshare](https://zippyshare.com/) as fast as possible.
## Features
* Automatically download files from a URL list
* Automatically fetch actual download URLs for [zippyshare](https://zippyshare.com/) and [sendcm](https://send.cm/) links
* Download multiple files at the same time
* Use multiple connections to download non-overlapping chunks of the same file in parallel (see the sketch after this list)
  * This can be used to gain more download speed than a server would normally provide to a single connection
  * Works really well with zippyshare
  * This requires high random write performance for the local storage and will bottleneck on HDDs
  * Only works if the host supports the HTTP Range header and doesn't otherwise block it
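As a rough illustration of the chunked approach (a minimal sketch, not ffdl's actual code; the URL and byte range are made up), a single ranged request with reqwest looks like this:
```
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Hypothetical URL; any server that honors the Range header works
    let resp = reqwest::Client::new()
        .get("https://example.com/big-file.bin")
        // Ask only for the first 1000 bytes; ffdl issues several such
        // requests for disjoint ranges in parallel and writes each chunk
        // to its own offset in the output file
        .header(reqwest::header::RANGE, "bytes=0-999")
        .send()
        .await?;
    // 206 Partial Content means the server honored the range
    println!("HTTP status: {}", resp.status());
    Ok(())
}
```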
## Performance
On a 1000Mbit connection (actual speed ~800Mbit) I can download at about 90MB/s from zippyshare through a WireGuard VPN when using 5 files in parallel with 10 connections each (`-n 5 -c 10`).
When downloading the same content manually through a browser, I get about 5MB/s per file, dropping to 0.5-1MB/s after a few seconds. And of course I have to start every download by hand, which is quite annoying.
## Usage
Simply create a text file and enter the URLs, one per line. Comments using `#` and empty lines are allowed.
```
$ cat list.txt
# My encrypted documents backup
https://www71.zippyshare.com/v/XXXXXXX/file.html
https://www72.zippyshare.com/v/XXXXYYY/file.html
https://www73.zippyshare.com/v/XXXXZZZ/file.html
https://www74.zippyshare.com/v/Xagdgfh/file.html
# My encrypted video backup
https://www75.zippyshare.com/v/sdsgfds/file.html
https://www76.zippyshare.com/v/dfsdxfd/file.html
https://www75.zippyshare.com/v/dsgsdgf/file.html
https://www76.zippyshare.com/v/drtdrtd/file.html
https://www75.zippyshare.com/v/erdfghd/file.html
https://www76.zippyshare.com/v/87654rd/file.html
https://www75.zippyshare.com/v/dfghdfg/file.html
https://www76.zippyshare.com/v/hkjghjk/file.html
```
And then download the files using the desired levels of concurrency (e.g. 5 files at once with 10 connections each):
```
$ ffdl -l list.txt -o ~/Downloads/ -c 10 -n 5
```
## CLI Arguments
```
$ ffdl --help
FFDL - Fast File Downloader 0.1.2
Download files fast
USAGE:
ffdl.exe [OPTIONS]
OPTIONS:
-c, --connections <CONNECTIONS PER FILE>
The number of concurrent connections per file download. Increasing this number will
increase the download speed of individual files if supported by the server but setting
this number too high may cause the download to fail.
NOTE: This mode will cause random writes and for that reason won't work well on HDDs.
WARNING: Files started with multiple connections currently can't be continued. [default:
1]
-d, --download <URL>
Download only the one file from the specified url
-h, --help
Print help information
-l, --listfile <URL LISTFILE>
Download all files from the specified url list file
-n, --num-files <PARALLEL DOWNLOADS>
Specify the number of files that should be downloaded in parallel. Increasing this
number will increase the total download speed but won't improve the download speed for
individual files [default: 1]
-o, --outdir <OUTPUT DIR>
Set the output directory in which the downloads will be stored. The directory will be
created if it doesn't exist yet [default: ./]
-V, --version
Print version information
```

src/args.rs

@@ -1,62 +0,0 @@
use std::{num::NonZeroU32, path::PathBuf};
use clap::Parser;
#[derive(Parser, Clone, Debug)]
#[clap(
version,
about,
long_about = None,
name = "FFDL - Fast File Downloader",
)]
pub struct CLIArgs {
#[clap(
short = 'o',
long = "outdir",
value_name = "OUTPUT DIR",
default_value = "./",
help = "Set the output directory in which the downloads will be stored. \
The directory will be created if it doesn't exist yet",
)]
pub outdir: PathBuf,
#[clap(
short = 'n',
long = "num-files",
value_name = "PARALLEL DOWNLOADS",
default_value = "1",
help = "Specify the number of files from that should be downloaded in parallel. Increasing \
this number will increase the total download speed but won't improve the download speed \
for individual files",
)]
pub file_count: NonZeroU32,
#[clap(
short = 'c',
long = "connections",
value_name = "CONNECTIONS PER FILE",
default_value = "1",
help = "The number concurrent connections per file download. Increasing this number will \
increase the download speed of individual files if supported by the server but \
setting this number too high may cause the download to fail. \n\
NOTE: This mode will cause random writes and for that reason won't work well on HDDs. \n\
WARNING: Files started with multiple connections currently can't be continued.",
)]
pub conn_count: NonZeroU32,
#[clap(
short = 'l',
long = "listfile",
value_name = "URL LISTFILE",
help = "Download all files from the specified url list file",
)]
pub listfile: Vec<PathBuf>,
#[clap(
short = 'd',
long = "download",
value_name = "URL",
help = "Download only the one file from the specified url",
)]
pub download: Vec<String>,
}

src/clireporter.rs

@@ -1,208 +0,0 @@
use std::collections::{HashMap, VecDeque};
use std::io::stdout;
use std::time::SystemTime;
use anyhow::Result;
use crossterm::cursor::MoveToPreviousLine;
use crossterm::execute;
use crossterm::style::Print;
use crossterm::terminal::{Clear, ClearType};
use tokio::sync::mpsc;
use crate::dlreport::{DlReport, DlStatus, InfoHolder};
fn print_accumulated_report(
statuses: &HashMap<u32, InfoHolder>,
msg_queue: &mut VecDeque<String>,
moved_lines: u16,
file_count_completed: i32,
file_count_total: i32,
) -> Result<u16> {
let mut dl_speed_sum = 0.0;
execute!(
stdout(),
crossterm::cursor::Hide,
MoveToPreviousLine(moved_lines)
)?;
for msg in msg_queue.drain(..) {
let ct_now = chrono::Local::now();
execute!(
stdout(),
Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
}
execute!(
stdout(),
Print("----------------------------------------".to_string()),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
for v in statuses.values() {
let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0;
execute!(
stdout(),
Print(format!(
"Status: {:6.2} mb/s {:5.2}% completed '{}'",
v.speed_mbps, percent_complete, v.filename
)),
Clear(ClearType::UntilNewLine),
Print("\n")
)?;
dl_speed_sum += v.speed_mbps;
}
let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0;
execute!(
stdout(),
Clear(ClearType::CurrentLine),
Print("\n"),
Print(format!(
" =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%",
dl_speed_sum, file_count_completed, file_count_total, file_percent_completed
)),
Clear(ClearType::UntilNewLine),
Print("\n"),
Clear(ClearType::FromCursorDown),
crossterm::cursor::Show
)?;
// Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated
Ok(statuses.len() as u16 + 3)
}
/// Receive download reports from the provided receiver and print them to stdout using dynamic
/// refreshes of the terminal. This will block until all senders are closed.
pub async fn cli_print_reports(
mut receiver: mpsc::UnboundedReceiver<DlReport>,
file_count_total: i32,
) -> Result<()> {
let mut statuses: HashMap<u32, InfoHolder> = HashMap::new();
let mut moved_lines = 0;
let mut msg_queue = VecDeque::new();
let mut t_last = SystemTime::now();
let mut file_count_completed = 0;
let mut file_count_failed = 0;
let mut file_count_done = 0;
while let Some(update) = receiver.recv().await {
match update.status {
DlStatus::Init {
bytes_total,
filename,
} => {
msg_queue.push_back(format!("Starting download for file '{}'", &filename));
statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
}
DlStatus::Update {
speed_mbps,
bytes_curr,
} => {
// Scope the reference to prevent borrowing conflict later
{
let s = &mut statuses.get_mut(&update.id).unwrap();
s.progress = bytes_curr;
s.speed_mbps = speed_mbps;
}
if t_last.elapsed().unwrap().as_millis() > 500 {
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
t_last = SystemTime::now();
}
}
DlStatus::Done { duration_ms } => {
msg_queue.push_back(format!(
"Finished downloading '{}' with {:.2} mb in {:.2} seconds",
&statuses.get(&update.id).unwrap().filename,
(statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0),
(duration_ms as f32 / 1_000.0)
));
statuses.remove(&update.id);
file_count_completed += 1;
file_count_done += 1;
}
DlStatus::DoneErr { filename } => {
msg_queue.push_back(format!("Error: Download failed: '{}'", filename));
// Don't care if it exists, just make sure it is gone
statuses.remove(&update.id);
// Refresh display
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
t_last = SystemTime::now();
file_count_failed += 1;
file_count_done += 1;
}
DlStatus::Message(msg) => {
msg_queue.push_back(msg);
moved_lines = print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
t_last = SystemTime::now();
}
DlStatus::Skipped => {
file_count_completed += 1;
file_count_done += 1;
}
}
}
print_accumulated_report(
&statuses,
&mut msg_queue,
moved_lines,
file_count_done,
file_count_total,
)?;
execute!(
stdout(),
MoveToPreviousLine(2),
Print(format!(
"All done! {}/{} completed, {} failed\n",
file_count_completed, file_count_total, file_count_failed
)),
Clear(ClearType::FromCursorDown)
)?;
Ok(())
}

src/dlreport.rs

@@ -1,158 +1,251 @@
-use std::{collections::HashMap, time::SystemTime};
+use std::collections::{ HashMap, VecDeque };
+use std::time::SystemTime;
+use std::io::stdout;

-use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+use tokio::sync::mpsc;
+use crossterm::cursor::{ MoveToPreviousLine };
+use crossterm::execute;
+use crossterm::terminal::{ Clear, ClearType };
+use crossterm::style::{ Print };

-use crate::misc::RollingAverage;
+use crate::errors::*;

 #[derive(Clone, Debug)]
 pub enum DlStatus {
-    Init { bytes_total: u64, filename: String },
-    Update { speed_mbps: f32, bytes_curr: u64 },
-    Done { duration_ms: u64 },
-    DoneErr { filename: String },
+    Init {
+        bytes_total: u64,
+        filename: String
+    },
+    Update {
+        speed_mbps: f32,
+        bytes_curr: u64
+    },
+    Done {
+        duration_ms: u64
+    },
+    DoneErr {
+        filename: String
+    },
     Skipped,
-    Message(String),
+    Message(String)
 }

 #[derive(Clone, Debug)]
 pub struct DlReport {
     pub id: u32,
-    pub status: DlStatus,
+    pub status: DlStatus
 }

 #[derive(Clone)]
 pub struct DlReporter {
     id: u32,
-    transmitter: mpsc::UnboundedSender<DlReport>,
+    transmitter: mpsc::UnboundedSender<DlReport>
 }

 impl DlReporter {
     pub fn new(id: u32, transmitter: mpsc::UnboundedSender<DlReport>) -> DlReporter {
-        DlReporter { id, transmitter }
+        DlReporter {
+            id: id,
+            transmitter: transmitter
+        }
     }

     pub fn send(&self, status: DlStatus) {
         // This should not fail, so unwrap it here instead of propagating the error
-        self.transmitter
-            .send(DlReport {
-                id: self.id,
-                status,
-            })
-            .unwrap();
-    }
-
-    pub fn init(&self, bytes_total: u64, filename: String) {
-        self.send(DlStatus::Init {
-            bytes_total,
-            filename,
-        })
-    }
-
-    pub fn update(&self, speed_mbps: f32, bytes_curr: u64) {
-        self.send(DlStatus::Update {
-            speed_mbps,
-            bytes_curr,
-        })
-    }
-
-    pub fn done(&self, duration_ms: u64) {
-        self.send(DlStatus::Done { duration_ms })
-    }
-
-    pub fn done_err(&self, filename: String) {
-        self.send(DlStatus::DoneErr { filename })
-    }
-
-    pub fn skipped(&self) {
-        self.send(DlStatus::Skipped);
-    }
-
-    pub fn msg(&self, msg: String) {
-        self.send(DlStatus::Message(msg));
+        self.transmitter.send(
+            DlReport {
+                id: self.id,
+                status: status
+            }
+        ).unwrap();
     }
 }

-#[macro_export]
-macro_rules! report_msg {
-    ($rep:ident, $fmt:expr) => {
-        DlReporter::msg(&$rep, format!($fmt));
-    };
-    ($rep:ident, $fmt:expr, $($fmt2:expr),+) => {
-        DlReporter::msg(&$rep, format!($fmt, $($fmt2,)+));
-    };
-}
-
-pub struct InfoHolder {
-    pub filename: String,
-    pub total_size: u64,
-    pub progress: u64,
-    pub speed_mbps: f32,
+struct InfoHolder {
+    filename: String,
+    total_size: u64,
+    progress: u64,
+    speed_mbps: f32
 }

 impl InfoHolder {
-    pub fn new(filename: String, total_size: u64) -> InfoHolder {
+    fn new(filename: String, total_size: u64) -> InfoHolder {
         InfoHolder {
             filename,
             total_size,
             progress: 0,
-            speed_mbps: 0.0,
+            speed_mbps: 0.0
         }
     }
 }

-pub struct DlReportAccumulator {
-    parent: DlReporter,
-    rec: UnboundedReceiver<DlReport>,
-}
-
-impl DlReportAccumulator {
-    pub fn new(parent: DlReporter) -> (DlReportAccumulator, UnboundedSender<DlReport>) {
-        let (tx, rec) = mpsc::unbounded_channel();
-        (DlReportAccumulator { parent, rec }, tx)
-    }
-
-    pub async fn accumulate(mut self) {
-        let mut progresses: HashMap<u32, u64> = HashMap::new();
-        let mut progress_last: u64 = 0;
-        let mut t_last = SystemTime::now();
-        let mut average_speed = RollingAverage::new(10);
-        while let Some(update) = self.rec.recv().await {
-            match update.status {
-                DlStatus::Init {
-                    bytes_total: _,
-                    filename: _,
-                } => {}
-                DlStatus::Update {
-                    speed_mbps: _,
-                    bytes_curr,
-                } => {
-                    *progresses.entry(update.id).or_insert(0) = bytes_curr;
-                    let progress_curr = progresses.values().sum();
-                    let progress_delta = progress_curr - progress_last;
-                    let t_elapsed = t_last.elapsed().unwrap().as_secs_f64();
-                    let speed_mbps = average_speed.value() as f32;
-                    // currently executes always, but might change
-                    if progress_delta >= 5_000_000 {
-                        average_speed.add(((progress_delta as f64) / 1_000_000.0) / t_elapsed);
-                        progress_last = progress_curr;
-                        t_last = SystemTime::now();
-                    }
-                    self.parent.update(speed_mbps, progress_curr);
-                }
-                DlStatus::Done { duration_ms: _ } => {}
-                // Just forward everything else to the calling receiver
-                _ => self.parent.send(update.status),
-            }
-        }
-    }
-}
+fn print_accumulated_report(statuses: &HashMap<u32, InfoHolder>, msg_queue: &mut VecDeque<String>, moved_lines: u16, file_count_completed: i32, file_count_total: i32) -> ResBE<u16> {
+    let mut dl_speed_sum = 0.0;
+    execute!(
+        stdout(),
+        crossterm::cursor::Hide,
+        MoveToPreviousLine(moved_lines)
+    )?;
+    for msg in msg_queue.drain(..) {
+        let ct_now = chrono::Local::now();
+        execute!(
+            stdout(),
+            Print(format!("{} > {}", ct_now.format("%H:%M:%S"), msg)),
+            Clear(ClearType::UntilNewLine),
+            Print("\n")
+        )?;
+    }
+    execute!(
+        stdout(),
+        Print(format!("----------------------------------------")),
+        Clear(ClearType::UntilNewLine),
+        Print("\n")
+    )?;
+    for (_k, v) in statuses {
+        let percent_complete = v.progress as f64 / v.total_size as f64 * 100.0;
+        execute!(
+            stdout(),
+            Print(format!("Status: {:6.2} mb/s {:5.2}% completed '{}'", v.speed_mbps, percent_complete, v.filename)),
+            Clear(ClearType::UntilNewLine),
+            Print("\n")
+        )?;
+        dl_speed_sum += v.speed_mbps;
+    }
+    let file_percent_completed = file_count_completed as f32 / file_count_total as f32 * 100.0;
+    execute!(
+        stdout(),
+        Clear(ClearType::CurrentLine),
+        Print("\n"),
+        Print(format!(" =>> Accumulated download speed: {:6.2} mb/s {}/{} files, {:.0}%", dl_speed_sum, file_count_completed, file_count_total, file_percent_completed)),
+        Clear(ClearType::UntilNewLine),
+        Print("\n"),
+        Clear(ClearType::FromCursorDown),
+        crossterm::cursor::Show
+    )?;
+    // Next time go up 1 line for each printed status, +2 for divider & space, +1 for accumulated
+    Ok(statuses.len() as u16 + 3)
+}
+
+pub async fn watch_and_print_reports(mut receiver: mpsc::UnboundedReceiver<DlReport>, file_count_total: i32) -> ResBE<()> {
+    let mut statuses: HashMap<u32, InfoHolder> = HashMap::new();
+    let mut moved_lines = 0;
+    let mut msg_queue = VecDeque::new();
+    let mut t_last = SystemTime::now();
+    let mut file_count_completed = 0;
+    let mut file_count_failed = 0;
+    let mut file_count_done = 0;
+    while let Some(update) = receiver.recv().await {
+        match update.status {
+            DlStatus::Init {
+                bytes_total,
+                filename
+            } => {
+                msg_queue.push_back(format!("Starting download for file '{}'", &filename));
+                statuses.insert(update.id, InfoHolder::new(filename, bytes_total));
+                moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines, file_count_done, file_count_total)?;
+            },
+            DlStatus::Update {
+                speed_mbps,
+                bytes_curr
+            } => {
+                // Scope the reference to prevent borrowing conflict later
+                {
+                    let s = &mut statuses.get_mut(&update.id).unwrap();
+                    s.progress = bytes_curr;
+                    s.speed_mbps = speed_mbps;
+                }
+                if t_last.elapsed().unwrap().as_millis() > 500 {
+                    moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines, file_count_done, file_count_total)?;
+                    t_last = SystemTime::now();
+                }
+            },
+            DlStatus::Done {
+                duration_ms
+            } => {
+                msg_queue.push_back(format!(
+                    "Finished downloading '{}' with {:.2} mb in {:.2} seconds",
+                    &statuses.get(&update.id).unwrap().filename,
+                    (statuses.get(&update.id).unwrap().total_size as f32 / 1_000_000.0),
+                    (duration_ms as f32 / 1_000.0)
+                ));
+                statuses.remove(&update.id);
+                file_count_completed += 1;
+                file_count_done += 1;
+            },
+            DlStatus::DoneErr {
+                filename
+            } => {
+                msg_queue.push_back(format!(
+                    "Error: Download failed: '{}'", filename
+                ));
+                // Don't care if it exists, just make sure it is gone
+                statuses.remove(&update.id);
+                // Refresh display
+                moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines, file_count_done, file_count_total)?;
+                t_last = SystemTime::now();
+                file_count_failed += 1;
+                file_count_done += 1;
+            },
+            DlStatus::Message(msg) => {
+                msg_queue.push_back(msg);
+                moved_lines = print_accumulated_report(&statuses, &mut msg_queue, moved_lines, file_count_done, file_count_total)?;
+                t_last = SystemTime::now();
+            },
+            DlStatus::Skipped => {
+                file_count_completed += 1;
+                file_count_done += 1;
+            }
+        }
+    }
+    print_accumulated_report(&statuses, &mut msg_queue, moved_lines, file_count_done, file_count_total)?;
+    execute!(
+        stdout(),
+        MoveToPreviousLine(2),
+        Print(format!("All done! {}/{} completed, {} failed\n", file_count_completed, file_count_total, file_count_failed)),
+        Clear(ClearType::FromCursorDown)
+    )?;
+    Ok(())
+}

src/download.rs

@@ -1,57 +1,104 @@
-use std::io::SeekFrom;
 use std::path::Path;
+use tokio::io::{ AsyncWriteExt, AsyncSeekExt };
 use std::time::SystemTime;
+use percent_encoding::percent_decode_str;
-use anyhow::Result;
+use std::io::SeekFrom;
+use tokio::sync::mpsc;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
-use percent_encoding::percent_decode_str;
-use tokio::io::{AsyncSeekExt, AsyncWriteExt};

-use crate::dlreport::{DlReportAccumulator, DlReporter};
-use crate::errors::DlError;
-use crate::integrations::HttpRequestPreprocessor;
-use crate::misc::RollingAverage;
+use crate::errors::*;
+use crate::dlreport::*;
+
+struct RollingAverage {
+    index: usize,
+    data: Vec<f64>
+}
+
+impl RollingAverage {
+    fn new(size: usize) -> Self {
+        RollingAverage {
+            index: 0,
+            data: Vec::with_capacity(size)
+        }
+    }
+
+    fn value(&self) -> f64 {
+        if self.data.len() == 0 {
+            0.0
+        } else {
+            let mut max = self.data[0];
+            for v in self.data.iter() {
+                if *v > max {
+                    max = *v;
+                }
+            }
+            let mut sum: f64 = self.data.iter().sum();
+            let mut count = self.data.len();
+            if self.data.len() >= 3 {
+                sum -= max;
+                count -= 1;
+            }
+            sum / count as f64
+        }
+    }
+
+    fn add(&mut self, val: f64) {
+        if self.data.capacity() == self.data.len() {
+            self.data[self.index] = val;
+            self.index += 1;
+            if self.index >= self.data.capacity() {
+                self.index = 0;
+            }
+        } else {
+            self.data.push(val);
+        }
+    }
+}

 /// Get the filename at the end of the given URL. This will decode the URL Encoding.
 pub fn url_to_filename(url: &str) -> String {
-    let url_dec = percent_decode_str(url)
-        .decode_utf8_lossy()
-        .to_owned()
-        .to_string();
-    let file_name = std::path::Path::new(&url_dec)
-        .file_name()
-        .unwrap()
-        .to_str()
-        .unwrap();
+    let url_dec = percent_decode_str(&url).decode_utf8_lossy().to_owned().to_string();
+    let file_name = std::path::Path::new(&url_dec).file_name().unwrap().to_str().unwrap();
     // Split at ? and return the first part. If no ? is present, this just returns the full string
-    file_name.split('?').next().unwrap().to_string()
+    file_name.split("?").next().unwrap().to_string()
 }

-pub async fn download_feedback(
-    url: &str,
-    into_file: &Path,
-    rep: DlReporter,
-    content_length: u64,
-) -> Result<()> {
-    download_feedback_chunks(url, into_file, rep, None, content_length).await
+pub async fn download_feedback(url: &str, into_file: &str, rep: DlReporter, content_length: Option<u64>) -> ResBE<()> {
+    download_feedback_chunks(url, into_file, rep, None, false, content_length).await
 }

-pub async fn download_feedback_chunks(
-    url: &str,
-    into_file: &Path,
-    rep: DlReporter,
-    from_to: Option<(u64, u64)>,
-    mut content_length: u64,
-) -> Result<()> {
-    // Build the HTTP request to download the given link
-    let mut req = reqwest::Client::new().get(url);
-    req = req.integration_preprocess(url);
+pub async fn download_feedback_chunks(url: &str, into_file: &str, rep: DlReporter, from_to: Option<(u64, u64)>, seek_from: bool, content_length: Option<u64>) -> ResBE<()> {
+    let into_file = Path::new(into_file);
+
+    let mut content_length = match content_length {
+        Some(it) => it,
+        None => {
+            let (content_length, _) = http_get_filesize_and_range_support(url).await?;
+            content_length
+        }
+    };
+
+    // Send the HTTP request to download the given link
+    let mut req = reqwest::Client::new()
+        .get(url);

     // Add range header if needed
-    if let Some((from, to)) = from_to {
-        req = req.header(reqwest::header::RANGE, format!("bytes={}-{}", from, to));
-        content_length = to - from + 1;
+    if let Some(from_to) = from_to {
+        req = req.header(reqwest::header::RANGE, format!("bytes={}-{}", from_to.0, from_to.1));
+        content_length = from_to.1 - from_to.0 + 1;
     }

     // Actually send the request and get the response
@@ -63,22 +110,35 @@ pub async fn download_feedback_chunks(
     }

     // Open the local output file
-    let mut opts = tokio::fs::OpenOptions::new();
-    let mut ofile = opts
-        .create(true)
-        .write(true)
-        .truncate(from_to.is_none())
-        .open(into_file)
-        .await?;
-    if from_to.is_some() {
+    let mut ofile = tokio::fs::OpenOptions::new();
+    // Create the file if not existent
+    ofile.create(true)
+        // Open in write mode
+        .write(true);
+    // If seek_from is specified, the file can't be overwritten
+    if !seek_from {
+        // Delete and overwrite the file
+        ofile.truncate(true);
+    }
+    let mut ofile = ofile.open(into_file).await?;
+    if seek_from {
         ofile.seek(SeekFrom::Start(from_to.unwrap().0)).await?;
     }

     let filename = into_file.file_name().unwrap().to_str().unwrap();
     // Report the download start
-    rep.init(content_length, filename.to_string());
+    rep.send(
+        DlStatus::Init {
+            bytes_total: content_length,
+            filename: filename.to_string()
+        }
+    );

     let mut curr_progress = 0;
     let mut speed_mbps = 0.0;
@@ -94,6 +154,7 @@ pub async fn download_feedback_chunks(
     // Read data from server as long as new data is available
     while let Some(chunk) = resp.chunk().await? {
         let datalen = chunk.len() as u64;
         buff.extend(chunk);
@@ -103,6 +164,7 @@ pub async fn download_feedback_chunks(
             // io bottleneck that occurs on HDDs with many small writes in different
             // files and offsets at the same time
             if buff.len() >= 1_000_000 {
                 // Write the received data into the file
                 ofile.write_all(&buff).await?;
@@ -117,11 +179,14 @@ pub async fn download_feedback_chunks(
             let t_elapsed = t_last_speed.elapsed()?.as_secs_f64();
-            // Update the reported download speed after every 3MB or every second
+            // Update the reported download speed after every 5MB or every second
             // depending on what happens first
             if last_bytecount >= 3_000_000 || t_elapsed >= 0.8 {
                 // Update rolling average
-                average_speed.add(((last_bytecount as f64) / t_elapsed) / 1_000_000.0);
+                average_speed.add(
+                    ((last_bytecount as f64) / t_elapsed) / 1_000_000.0
+                );
                 speed_mbps = average_speed.value() as f32;
@@ -131,10 +196,15 @@ pub async fn download_feedback_chunks(
             }

             // Send status update report
-            rep.update(speed_mbps, curr_progress);
+            rep.send(
+                DlStatus::Update {
+                    speed_mbps: speed_mbps,
+                    bytes_curr: curr_progress
+                }
+            );
     }

-    if !buff.is_empty() {
+    if buff.len() > 0 {
         ofile.write_all(&buff).await?;
     }
@@ -148,20 +218,27 @@ pub async fn download_feedback_chunks(
     let duration_ms = t_start.elapsed()?.as_millis() as u64;
     // Send report that the download is finished
-    rep.done(duration_ms);
+    rep.send(
+        DlStatus::Done {
+            duration_ms: duration_ms
+        }
+    );

     Ok(())
 }

 // This will spin up multiple tasks and manage the status updates for them.
 // The combined status will be reported back to the caller
-pub async fn download_feedback_multi(
-    url: &str,
-    into_file: &Path,
-    rep: DlReporter,
-    conn_count: u32,
-    content_length: u64,
-) -> Result<()> {
+pub async fn download_feedback_multi(url: &str, into_file: &str, rep: DlReporter, conn_count: u32, content_length: Option<u64>) -> ResBE<()> {
+    let content_length = match content_length {
+        Some(it) => it,
+        None => {
+            let (content_length, _) = http_get_filesize_and_range_support(url).await?;
+            content_length
+        }
+    };
+
     // Create zeroed file with 1 byte too much. This will be truncated on download
     // completion and can indicate that the file is not suitable for continuation
     create_zeroed_file(into_file, content_length as usize + 1).await?;
@@ -171,20 +248,25 @@ pub async fn download_feedback_multi(
     let mut joiners = Vec::new();
-    let (rep_accum, tx) = DlReportAccumulator::new(rep.clone());
+    let (tx, mut rx) = mpsc::unbounded_channel::<DlReport>();
     let t_start = SystemTime::now();

     for index in 0 .. conn_count {
-        let url = url.to_owned();
-        let into_file = into_file.to_owned();
+        let url = url.clone().to_owned();
+        let into_file = into_file.clone().to_owned();
         let tx = tx.clone();
         joiners.push(tokio::spawn(async move {
             let rep = DlReporter::new(index, tx.clone());
-            let mut from_to = (index as u64 * chunksize, (index + 1) as u64 * chunksize - 1);
+            let mut from_to = (
+                index as u64 * chunksize,
+                (index+1) as u64 * chunksize - 1
+            );
             if index == conn_count - 1 {
                 from_to.1 += rest;
@@ -195,14 +277,8 @@ pub async fn download_feedback_multi(
             // Delay each chunk-download to reduce the number of simultaneous connection attempts
             tokio::time::sleep(tokio::time::Duration::from_millis(50 * index as u64)).await;

-            download_feedback_chunks(
-                &url,
-                &into_file,
-                rep,
-                Some(from_to),
-                specific_content_length,
-            )
-            .await
+            download_feedback_chunks(&url, &into_file, rep, Some(from_to), true, Some(specific_content_length)).await.map_err(|e| e.to_string())
         }))
     }
@@ -210,17 +286,93 @@ pub async fn download_feedback_multi(
     let filename = Path::new(into_file).file_name().unwrap().to_str().unwrap();
-    rep.init(content_length, filename.to_string());
-
-    let manager_handle = tokio::task::spawn(rep_accum.accumulate());
+    rep.send(DlStatus::Init {
+        bytes_total: content_length,
+        filename: filename.to_string()
+    });
+
+    let rep_task = rep.clone();
+    let mut t_last = t_start.clone();
+    let manager_handle = tokio::task::spawn(async move {
+        let rep = rep_task;
+        //let mut dl_speeds = vec![0.0_f32; conn_count as usize];
+        let mut progresses = vec![0; conn_count as usize];
+        let mut progress_last: u64 = 0;
+        let mut average_speed = RollingAverage::new(10);
+        while let Some(update) = rx.recv().await {
+            match update.status {
+                DlStatus::Init {
+                    bytes_total: _,
+                    filename: _
+                } => {
+                },
+                DlStatus::Update {
+                    speed_mbps: _,
+                    bytes_curr
+                } => {
+                    //dl_speeds[update.id as usize] = speed_mbps;
+                    progresses[update.id as usize] = bytes_curr;
+
+                    let progress_curr = progresses.iter().sum();
+                    let progress_delta = progress_curr - progress_last;
+                    let t_elapsed = t_last.elapsed().unwrap().as_secs_f64();
+                    let speed_mbps = average_speed.value() as f32;
+                    // currently executes always, but might change
+                    if progress_delta >= 5_000_000 {
+                        average_speed.add(
+                            ((progress_delta as f64) / 1_000_000.0) / t_elapsed
+                        );
+                        progress_last = progress_curr;
+                        t_last = SystemTime::now();
+                    }
+
+                    rep.send(DlStatus::Update {
+                        speed_mbps: speed_mbps,
+                        bytes_curr: progress_curr
+                    });
+                },
+                DlStatus::Done {
+                    duration_ms: _
+                } => {
+                    //dl_speeds[update.id as usize] = 0.0;
+                },
+                // Just forward everything else to the calling receiver
+                _ => rep.send(update.status)
+            }
+        }
+    });

     let mut joiners: FuturesUnordered<_> = joiners.into_iter().collect();
     // Validate if the tasks were successful. This will always grab the next completed
     // task, independent from the original order in the joiners list
     while let Some(output) = joiners.next().await {
         // If any of the download tasks fail, abort the rest and delete the file
         // since it is non-recoverable anyways
         if let Err(e) = output? {
             for handle in joiners.iter() {
                 handle.abort();
             }
@@ -229,7 +381,7 @@ pub async fn download_feedback_multi(
             tokio::fs::remove_file(&into_file).await?;
-            return Err(e);
+            return Err(e.into());
         }
     }
@@ -245,53 +397,113 @@ pub async fn download_feedback_multi(
     ofile.set_len(content_length).await?;

-    rep.done(t_start.elapsed()?.as_millis() as u64);
+    rep.send(DlStatus::Done {
+        duration_ms: t_start.elapsed()?.as_millis() as u64
+    });

     Ok(())
 }

-async fn create_zeroed_file(file: &Path, filesize: usize) -> Result<()> {
+async fn create_zeroed_file(file: &str, filesize: usize) -> ResBE<()> {
     let ofile = tokio::fs::OpenOptions::new()
         .create(true)
+        // Open in write mode
         .write(true)
+        // Delete and overwrite the file
         .truncate(true)
         .open(file)
         .await?;
     ofile.set_len(filesize as u64).await?;
     Ok(())
 }

-pub struct HttpFileInfo {
-    pub filesize: u64,
-    pub range_support: bool,
-    pub filename: String,
-}
-
-pub async fn http_file_info(url: &str) -> Result<HttpFileInfo> {
-    let mut req = reqwest::Client::new().head(url);
-    req = req.integration_preprocess(url);
-    let resp = req.send().await?;
-
-    let filesize = resp
-        .headers()
-        .get(reqwest::header::CONTENT_LENGTH)
-        .and_then(|it| it.to_str().unwrap().parse::<u64>().ok())
-        .ok_or(DlError::ContentLengthUnknown)?;
-
-    let range = resp
-        .headers()
-        .get(reqwest::header::ACCEPT_RANGES)
-        .and_then(|it| it.to_str().ok());
-    let range_support = matches!(range, Some("bytes"));
-
-    let filename = url_to_filename(url);
-
-    let info = HttpFileInfo {
-        filesize,
-        range_support,
-        filename,
-    };
-    Ok(info)
-}
+pub async fn http_get_filesize_and_range_support(url: &str) -> ResBE<(u64, bool)> {
+    let resp = reqwest::Client::new()
+        .head(url)
+        .send().await?;
+    if let Some(filesize) = resp.headers().get(reqwest::header::CONTENT_LENGTH) {
+        if let Ok(val_str) = filesize.to_str() {
+            if let Ok(val) = val_str.parse::<u64>() {
+                let mut range_supported = false;
+                if let Some(range) = resp.headers().get(reqwest::header::ACCEPT_RANGES) {
+                    if let Ok(range) = range.to_str() {
+                        if range == "bytes" {
+                            range_supported = true;
+                        }
+                    }
+                }
+                return Ok((val, range_supported));
+            }
+        }
+    }
+    Err(DlError::ContentLengthUnknown.into())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn rolling_average() {
+        let mut ra = RollingAverage::new(3);
+
+        assert_eq!(0, ra.data.len());
+        assert_eq!(3, ra.data.capacity());
+        assert_eq!(0.0, ra.value());
+
+        // 10 / 1 = 10
+        ra.add(10.0);
+        assert_eq!(1, ra.data.len());
+        assert_eq!(10.0, ra.value());
+
+        // (10 + 20) / 2 = 15
+        ra.add(20.0);
+        assert_eq!(2, ra.data.len());
+        assert_eq!(15.0, ra.value());
+
+        // (10 + 20 + 30) / 3 = 20
+        ra.add(30.0);
+        assert_eq!(3, ra.data.len());
+        assert_eq!(20.0, ra.value());
+        assert_eq!(10.0, ra.data[0]);
+        assert_eq!(20.0, ra.data[1]);
+        assert_eq!(30.0, ra.data[2]);
+
+        // This should replace the oldest value (index 0)
+        ra.add(40.0);
+        assert_eq!(3, ra.data.len());
+        assert_eq!(3, ra.data.capacity());
+        // (40 + 20 + 30) / 3 = 30
+        assert_eq!(30.0, ra.value());
+        assert_eq!(40.0, ra.data[0]);
+        assert_eq!(20.0, ra.data[1]);
+        assert_eq!(30.0, ra.data[2]);
+
+        ra.add(50.0);
+        ra.add(60.0);
+        ra.add(70.0);
+        assert_eq!(70.0, ra.data[0]);
+        assert_eq!(50.0, ra.data[1]);
+        assert_eq!(60.0, ra.data[2]);
+    }
+}

src/errors.rs

@@ -1,14 +1,29 @@
-use thiserror::Error;
+use std::error::Error;
+use std::fmt::{ self, Display, Formatter };
+
+/// Result Boxed Error
+pub type ResBE<T> = Result<T, Box<dyn Error>>;

 #[allow(unused)]
-#[derive(Error, Clone, Debug)]
+#[derive(Clone, Debug)]
 pub enum DlError {
-    #[error("Bad http response status")]
     BadHttpStatus,
-    #[error("Content-Length is unknown")]
     ContentLengthUnknown,
-    #[error("Http server sent no more data")]
     HttpNoData,
-    #[error("Unknown download error: '{0}'")]
-    Other(String),
+    Other(String)
+}
+
+impl Error for DlError {}
+
+impl Display for DlError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            DlError::BadHttpStatus => write!(f, "Bad http response status"),
+            DlError::ContentLengthUnknown => write!(f, "Content-Length is unknown"),
+            DlError::HttpNoData => write!(f, "Http server sent no more data"),
+            DlError::Other(s) => write!(f, "Unknown download error: '{}'", s)
+        }
+    }
 }

src/integrations/mod.rs

@@ -1,46 +0,0 @@
mod sendcm;
mod zippy;
use anyhow::Result;
pub enum IntegratedService {
ZippyShare,
SendCm,
}
pub fn is_integrated_url(url: &str) -> Option<IntegratedService> {
if zippy::is_zippyshare_url(url) {
Some(IntegratedService::ZippyShare)
} else if sendcm::is_sendcm_url(url) {
Some(IntegratedService::SendCm)
} else {
None
}
}
pub async fn resolve_integrated_url(url: &str, service: IntegratedService) -> Result<String> {
match service {
IntegratedService::ZippyShare => zippy::resolve_link(url).await,
IntegratedService::SendCm => sendcm::resolve_link(url).await,
}
}
pub trait HttpRequestPreprocessor {
/// Add required headers or other modifications to an HTTP request depending on the target
/// website
fn integration_preprocess(self, url: &str) -> Self;
}
impl HttpRequestPreprocessor for reqwest::RequestBuilder {
fn integration_preprocess(self, url: &str) -> Self {
// The sendcm CDN (sendit.cloud) requires a Referer from its own domain to allow access, otherwise it responds with 404
if url.contains("sendit.cloud") {
match url.split("/d/").next() {
Some(base_url) => self.header(reqwest::header::REFERER, format!("{}/", base_url)),
None => self,
}
} else {
self
}
}
}

src/integrations/sendcm.rs

@@ -1,84 +0,0 @@
use std::io::{Error, ErrorKind};
use anyhow::Result;
use regex::Regex;
pub fn is_sendcm_url(url: &str) -> bool {
Regex::new(r"^https?://send\.cm/(?:d/)?[0-9a-zA-Z]+$")
.unwrap()
.is_match(url)
}
/*
Updated: 01.04.2022
Link generation code:
- A post request is sent to the server using the form described below
- The id field is the value which is used to generate the link
- If the id is not found, the link is not generated
- The id is the same as the url suffix when NOT using a /d/ prefix url
- The response to the post request is a 302 redirect to the generated link
```
<form name="F1" method="POST" action="https://send.cm">
<input type="hidden" name="op" value="download2">
<input type="hidden" name="id" value="xxxxxxxxxx">
<input type="hidden" name="rand" value="">
<input type="hidden" name="referer" value="">
<input type="hidden" name="method_free" value="">
<input type="hidden" name="method_premium" value="">
......
```
*/
pub async fn resolve_link(url: &str) -> Result<String> {
let user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:77.0) Gecko/20100101 Firefox/77.0";
let accept =
"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8";
// Add a few extra headers to the request in order to be less suspicious
let body = reqwest::Client::new()
.get(url)
.header("User-Agent", user_agent)
.header("Accept", accept)
.send()
.await?
.text()
.await?;
let re_link = Regex::new(r#"<input type="hidden" name="id" value="([0-9a-zA-Z]+)">"#)?;
let cap_link = match re_link.captures(&body) {
Some(cap) => cap,
None => return Err(Error::new(ErrorKind::Other, "Link not found").into()),
};
let id = &match cap_link.get(1) {
Some(id) => id.as_str(),
None => return Err(Error::new(ErrorKind::Other, "Link not found").into()),
};
let resp = reqwest::ClientBuilder::new()
.redirect(reqwest::redirect::Policy::none())
.build()?
.post("https://send.cm")
.header("User-Agent", user_agent)
.header("Accept", accept)
.form(&[
("op", "download2"),
("id", id),
("rand", ""),
("referer", ""),
("method_free", ""),
("method_premium", ""),
])
.send()
.await?;
if resp.status().is_redirection() {
match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => Ok(location.to_str()?.to_string()),
None => Err(Error::new(ErrorKind::Other, "Location header not found").into()),
}
} else {
Err(Error::new(ErrorKind::Other, "Link not found").into())
}
}

src/integrations/zippy.rs

@@ -1,72 +0,0 @@
use std::io::{Error, ErrorKind};
use anyhow::Result;
use ducc::Ducc;
use regex::Regex;
pub fn is_zippyshare_url(url: &str) -> bool {
Regex::new(r#"^https?://(?:www\d*\.)?zippyshare\.com/v/[0-9a-zA-Z]+/file\.html$"#)
.unwrap()
.is_match(url)
}
pub async fn resolve_link(url: &str) -> Result<String> {
// Regex to check if the provided url is a zippyshare download url
let re = Regex::new(r#"^(https?://(?:www\d*\.)?zippyshare\.com)/v/[0-9a-zA-Z]+/file\.html$"#)?;
if !re.is_match(url) {
return Err(Error::new(ErrorKind::Other, "URL is not a zippyshare url").into());
}
// Extract the hostname (with https:// prefix) for later
let host = &re.captures(url).unwrap()[1];
// Download the html body for the download page
let body = reqwest::get(url).await?.text().await?;
let re_script =
Regex::new(r#"(?ms)<script.*?>(.*getElementById\('dlbutton'\).*?)</script>"#).unwrap();
let re_script_start = Regex::new(r#"(?ms)<script.*?>"#).unwrap();
// Extract the script. This will end at the correct script end, but has stuff before the start
let cap_tmp = match re_script.captures(&body) {
Some(cap) => cap,
None => return Err(Error::new(ErrorKind::Other, "Link not found").into()),
};
let temp = &cap_tmp[1];
// Find the correct script start
let pos_script_start = match re_script_start.find_iter(&temp).last() {
Some(cap) => cap,
None => return Err(Error::new(ErrorKind::Other, "Link not found").into()),
};
// Cut off the beginning to get only the script contents
let raw_script = &temp[pos_script_start.end()..];
// Preprocess the script
let script = preprocess_js(raw_script);
// Calculate the link
let link = eval_js_link_calculation(&script)
.map_err(|_| Error::new(ErrorKind::Other, "Link not found: JS eval error"))?;
let url = format!("{}{}", host, link);
Ok(url)
}
fn preprocess_js(js_src: &str) -> String {
let mut processed_src = js_src
.replace("document.getElementById('dlbutton').href", "href")
.replace("document.getElementById('fimage')", "false")
// Fix for antiscrape 24.07.2022
.replace("document.getElementById('omg').getAttribute('class')", "2")
// Fix for antiscrape 16.08.2022
.replace("document.getElementById('dlbutton').omg", "omg");
processed_src.push_str(";href");
processed_src
}
fn eval_js_link_calculation(js_src: &str) -> ducc::Result<String> {
let ducc = Ducc::new();
ducc.exec(js_src, None, Default::default())
}

src/main.rs

@@ -2,189 +2,363 @@ use std::collections::VecDeque;
 use std::path::Path;
 use std::process::exit;
 use std::sync::Arc;
-use std::time::SystemTime;
-
-use anyhow::Result;
-use clap::Parser;
-use futures::future::join_all;
-use tokio::fs::create_dir_all;
-use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
-use tokio::sync::Mutex;
-
-use crate::args::CLIArgs;
-use crate::clireporter::cli_print_reports;
-use crate::dlreport::{DlReport, DlReporter};
-use crate::download::{download_feedback, download_feedback_multi, http_file_info};
-use crate::integrations::{is_integrated_url, resolve_integrated_url};
-
-mod args;
-mod clireporter;
-mod dlreport;
-mod download;
-mod errors;
-mod integrations;
-mod misc;
-
-struct DlRequest {
-    id: usize,
-    url: String,
-}
-
-type SyncQueue = Arc<Mutex<VecDeque<DlRequest>>>;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let args = CLIArgs::parse();
-
-    // Combine all urls taken from files and the ones provided on the command line
-    let mut urls = args.download.clone();
-    for file in args.listfile.iter() {
-        match urls_from_listfile(file).await {
-            Ok(listfile_urls) => urls.extend(listfile_urls),
-            Err(_) => {
-                eprintln!("Failed to read urls from file: {}", file.display());
-                exit(1);
-            }
-        }
-    }
-
-    if urls.is_empty() {
-        eprintln!("No URLs provided");
-        return Ok(());
-    }
-
-    download_multiple(args, urls).await
-}
-
-/// Parse a listfile and return all urls found in it
-async fn urls_from_listfile(listfile: &Path) -> Result<Vec<String>> {
-    let text = tokio::fs::read_to_string(listfile).await?;
-    let urls = text
-        .lines()
-        .map(str::trim)
-        .filter(|line| !line.is_empty() && !line.starts_with('#'))
-        .map(str::to_string)
-        .collect();
-    Ok(urls)
-}
-
-// Download all files in parallel according to the provided CLI arguments
-async fn download_multiple(args: CLIArgs, raw_urls: Vec<String>) -> Result<()> {
-    let num_urls = raw_urls.len();
-    let urls: SyncQueue = Default::default();
-
-    let enumerated_urls = raw_urls
-        .into_iter()
-        .enumerate()
-        .map(|(id, url)| DlRequest { id, url });
-    urls.lock().await.extend(enumerated_urls);
-
-    if !args.outdir.exists() {
-        if let Err(_e) = create_dir_all(&args.outdir).await {
-            eprintln!(
-                "Error creating output directory '{}'",
-                args.outdir.display()
-            );
-            exit(1);
-        }
-    }
-
-    let (tx, rx) = unbounded_channel::<DlReport>();
-    let t_start = SystemTime::now();
-
-    let jobs = (0..args.file_count.get())
-        .map(|_| tokio::task::spawn(download_job(Arc::clone(&urls), tx.clone(), args.clone())))
-        .collect::<Vec<_>>();
-
-    drop(tx);
-    cli_print_reports(rx, num_urls as i32).await?;
-    join_all(jobs).await;
-
-    println!("Total time: {}s", t_start.elapsed()?.as_secs());
-    Ok(())
-}
-
-async fn download_job(urls: SyncQueue, reporter: UnboundedSender<DlReport>, cli_args: CLIArgs) {
-    // The mutex access must be in its own scope to ensure that the lock is dropped
-    while let Some(dlreq) = {
-        let mut urls = urls.lock().await;
-        urls.pop_front().take()
-    } {
-        let reporter = DlReporter::new(dlreq.id as u32, reporter.clone());
-        report_msg!(reporter, "Downloading {}", dlreq.url);
-
-        // Resolve the zippy url to the direct download url if necessary
-        let url = match is_integrated_url(&dlreq.url) {
-            Some(service) => match resolve_integrated_url(&dlreq.url, service).await {
-                Ok(url) => url,
-                Err(_e) => {
-                    report_msg!(
-                        reporter,
-                        "Zippyshare link could not be resolved, skipping: {}",
-                        dlreq.url
-                    );
-                    continue;
-                }
-            },
-            None => dlreq.url,
-        };
-
-        let info = match http_file_info(&url).await {
-            Ok(it) => it,
-            Err(_e) => {
-                report_msg!(reporter, "Error while querying metadata: {url}");
-                continue;
-            }
-        };
-
-        let into_file = cli_args.outdir.join(Path::new(&info.filename));
-
-        // If file with same name is present locally, check filesize
-        if into_file.exists() {
-            let local_filesize = std::fs::metadata(&into_file).unwrap().len();
-            if info.filesize == local_filesize {
-                report_msg!(
-                    reporter,
-                    "Skipping file '{}': already present",
-                    info.filename
-                );
-                reporter.skipped();
-                continue;
-            } else {
-                report_msg!(
-                    reporter,
-                    "Replacing file '{}': present but not completed",
-                    &info.filename
-                );
-            }
-        }
-
-        let dl_status = if cli_args.conn_count.get() == 1 {
-            download_feedback(&url, &into_file, reporter.clone(), info.filesize).await
-        } else if !info.range_support {
-            report_msg!(
-                reporter,
-                "Server does not support range headers. Downloading with single connection: {url}"
-            );
-            download_feedback(&url, &into_file, reporter.clone(), info.filesize).await
-        } else {
-            download_feedback_multi(
-                &url,
-                &into_file,
-                reporter.clone(),
-                cli_args.conn_count.get(),
-                info.filesize,
-            )
-            .await
-        };
-
-        if dl_status.is_err() {
-            reporter.done_err(info.filename);
-        }
-    }
-}
+use clap::{ App, Arg, ArgGroup, crate_version };
+use std::sync::Mutex;
+use tokio::sync::mpsc;
+use futures::future::join_all;
+use std::io::BufRead;
+use std::time::SystemTime;
+use dlreport::{ DlReport, DlStatus, DlReporter };
+use errors::ResBE;
+
+mod zippy;
+mod download;
+mod errors;
+mod dlreport;
+
+#[derive(Clone, Debug)]
+enum CLIAction {
+    DownloadUrl(String),
+    ResolveZippyUrl(String),
+    UrlList(String),
+    None
+}
+
+#[derive(Clone, Debug)]
+struct CLIArguments {
+    outdir: String,
+    into_file: Option<String>,
+    parallel_file_count: u32,
+    conn_count: u32,
+    zippy: bool,
+    action: CLIAction,
+    urls: Vec<String>
+}
+
+#[tokio::main]
+async fn main() -> ResBE<()> {
+    let arguments = App::new("FFDL - Fast File Downloader")
+        .version(crate_version!())
+        .about("Download files fast")
+        .arg(
+            Arg::with_name("outdir")
+                .short("o")
+                .long("outdir")
+                .value_name("OUTPUT DIR")
+                .takes_value(true)
+                .help("Set the output directory. The directory will be created \
+                       if it doesn't exist yet")
+        )
+        .arg(
+            Arg::with_name("into_file")
+                .short("i")
+                .long("into-file")
+                .value_name("FILENAME")
+                .takes_value(true)
+                .requires("download")
+                .help("Force filename. This only works for single file downloads")
+        )
+        .arg(
+            Arg::with_name("file_count")
+                .short("n")
+                .long("num-files")
+                .value_name("NUMBER OF CONCURRENT FILE DOWNLOADS")
+                .takes_value(true)
+                .help("Specify the number of concurrent file downloads")
+        )
+        .arg(
+            Arg::with_name("conn_count")
+                .short("c")
+                .long("connections")
+                .value_name("NUMBER OF CONNECTIONS")
+                .takes_value(true)
+                .help("The number of concurrent connections per file download. \
+                       Downloads might fail when the number of connections is \
+                       too high. Files started with multiple connections can't \
+                       be continued. NOTE: This will likely cause IO \
+                       bottlenecks on HDDs")
+        )
+        .arg(
+            Arg::with_name("zippyshare")
+                .short("z")
+                .long("zippy")
+                .takes_value(false)
+                .help("The provided URLs are zippyshare URLs and need to be resolved")
+        )
+        .group(
+            ArgGroup::with_name("action")
+                .required(true)
+        )
+        .arg(
+            Arg::with_name("listfile")
+                .short("l")
+                .long("listfile")
+                .value_name("URL LIST")
+                .takes_value(true)
+                .group("action")
+                .help("Download all files from the specified url list")
+        )
+        .arg(
+            Arg::with_name("download")
+                .short("d")
+                .long("download")
+                .value_name("URL")
+                .takes_value(true)
+                .group("action")
+                .help("Download only the specified URL")
+        )
+        .arg(
+            Arg::with_name("zippy-resolve")
+                .long("zippy-resolve")
+                .value_name("ZIPPYSHARE URL")
+                .takes_value(true)
+                .group("action")
+                .help("Resolve the zippyshare url to the real download url")
+        )
+        .get_matches();
+
+    let outdir = arguments.value_of("outdir").unwrap_or("./").to_string();
+    let into_file = arguments.value_of("into_file").map(String::from);
+
+    let file_count = arguments.value_of("file_count").unwrap_or("1");
+    let file_count: u32 = file_count.parse().unwrap_or_else(|_| {
+        eprintln!("Invalid value for num-files: {}", file_count);
+        exit(1);
+    });
+    if file_count <= 0 {
+        eprintln!("Invalid value for num-files: {}", file_count);
+        exit(1);
+    }
+
+    let conn_count = arguments.value_of("conn_count").unwrap_or("1");
+    let conn_count: u32 = conn_count.parse().unwrap_or_else(|_| {
+        eprintln!("Invalid value for connections: {}", conn_count);
+        exit(1);
+    });
+    if conn_count <= 0 {
+        eprintln!("Invalid value for connections: {}", conn_count);
+        exit(1);
+    }
+
+    let is_zippy = arguments.is_present("zippyshare");
+
+    let action =
+        if let Some(listfile) = arguments.value_of("listfile") {
+            CLIAction::UrlList(
+                listfile.to_string()
+            )
+        } else if let Some(download_url) = arguments.value_of("download") {
+            CLIAction::DownloadUrl(
+                download_url.to_string()
+            )
+        } else if let Some(resolve_url) = arguments.value_of("zippy-resolve") {
+            CLIAction::ResolveZippyUrl(
+                resolve_url.to_string()
+            )
+        } else {
+            CLIAction::None
+        };
+
+    let mut cli_args = CLIArguments {
+        outdir: outdir,
+        into_file: into_file,
+        parallel_file_count: file_count,
+        conn_count: conn_count,
+        zippy: is_zippy,
+        action: action,
+        urls: Vec::new()
+    };
+
+    // Evaluate and execute the requested action. The 3 different actions are
+    // mutually exclusive, so only one of them will be executed
+    match &cli_args.action {
+        CLIAction::UrlList(listfile) => {
+            let p_listfile = Path::new(listfile);
+            if !p_listfile.is_file() {
+                eprintln!("Listfile '{}' does not exist!", &listfile);
+                exit(1);
+            }
+            let ifile = std::fs::File::open(p_listfile)?;
+            cli_args.urls = std::io::BufReader::new(ifile)
+                .lines()
+                .map(|l| l.unwrap())
+                .filter(|url| url.len() > 0 && !url.starts_with("#"))
+                .collect();
+        },
+        CLIAction::DownloadUrl(url) => {
+            cli_args.urls = vec![url.clone()];
+        }
+        CLIAction::ResolveZippyUrl(url) => {
+            let resolved_url = zippy::resolve_link(url).await.unwrap_or_else(|_| {
+                println!("Zippyshare link could not be resolved");
+                exit(1);
+            });
+            println!("{}", resolved_url);
+            exit(0);
+        },
+        CLIAction::None => {
+            eprintln!("No action selected. This should not happen");
+            exit(1);
+        }
+    }
+
+    download_multiple(cli_args).await
+}
+
+async fn download_multiple(cli_args: CLIArguments) -> ResBE<()> {
+    let outdir = cli_args.outdir;
+    let outdir = Path::new(&outdir);
+    let parallel_file_count = cli_args.parallel_file_count;
+    let zippy = cli_args.zippy;
+    let conn_count = cli_args.conn_count;
+
+    let urls = Arc::new(Mutex::new(VecDeque::<(u32, String)>::new()));
+    cli_args.urls.iter().enumerate().for_each(|(i, url)| {
+        urls.lock().unwrap().push_back((i as u32, url.clone()));
+    });
+
+    if !outdir.exists() {
+        if let Err(_e) = std::fs::create_dir_all(outdir) {
+            eprintln!("Error creating output directory '{}'", outdir.to_str().unwrap());
+            exit(1);
+        }
+    }
+
+    let t_start = SystemTime::now();
+    let mut joiners = Vec::new();
+    let (tx, rx) = mpsc::unbounded_channel::<DlReport>();
+
+    for _offset in 0 .. parallel_file_count {
+        let tx = tx.clone();
+        let outdir = outdir.to_owned();
+        let arg_filename = cli_args.into_file.clone();
+        let urls = urls.clone();
+        joiners.push(tokio::task::spawn(async move {
+            loop {
+                let (global_url_index, url) = match urls.lock().unwrap().pop_front() {
+                    Some(it) => it,
+                    None => break
+                };
+                let tx = tx.clone();
+                let rep = DlReporter::new(global_url_index, tx);
+
+                let url = if zippy {
+                    match zippy::resolve_link(&url).await {
+                        Ok(url) => url,
+                        Err(_e) => {
+                            rep.send(
+                                DlStatus::Message(format!("Zippyshare link could not be resolved: {}", url))
+                            );
+                            continue;
+                        }
+                    }
+                } else {
+                    url.to_string()
+                };
+
+                let file_name = arg_filename.clone().unwrap_or_else(|| download::url_to_filename(&url));
+                let into_file = outdir.join(Path::new(&file_name))
+                    .to_str().unwrap().to_string();
+                let path_into_file = Path::new(&into_file);
+
+                let (filesize, range_supported) = match download::http_get_filesize_and_range_support(&url).await {
+                    Ok((filesize, range_supported)) => (filesize, range_supported),
+                    Err(_e) => {
+                        rep.send(
+                            DlStatus::Message(format!("Error while querying metadata: {}", url))
+                        );
+                        continue;
+                    }
+                };
+
+                // If file with same name is present locally, check filesize
+                if path_into_file.exists() {
+                    let local_filesize = std::fs::metadata(path_into_file).unwrap().len();
+                    if filesize == local_filesize {
+                        rep.send(DlStatus::Message(format!("Skipping file '{}': already present", &file_name)));
+                        rep.send(DlStatus::Skipped);
+                        continue;
+                    } else {
+                        rep.send(DlStatus::Message(format!("Replacing file '{}': present but not completed", &file_name)));
+                    }
+                }
+
+                if conn_count == 1 {
+                    if let Err(_e) = download::download_feedback(&url, &into_file, rep.clone(), Some(filesize)).await {
+                        rep.send(DlStatus::DoneErr {
+                            filename: file_name.to_string()
+                        });
+                    }
+                } else {
+                    if !range_supported {
+                        rep.send(
+                            DlStatus::Message(format!("Error: Server does not support range header: {}", url))
+                        );
+                        continue;
+                    }
+                    if let Err(_e) = download::download_feedback_multi(&url, &into_file, rep.clone(), conn_count, Some(filesize)).await {
+                        rep.send(DlStatus::DoneErr {
+                            filename: file_name.to_string()
+                        });
+                    }
+                };
+            }
+        }))
+    }
+
+    drop(tx);
+    dlreport::watch_and_print_reports(rx, cli_args.urls.len() as i32).await?;
+    join_all(joiners).await;
+
+    println!("Total time: {}s", t_start.elapsed()?.as_secs());
+    Ok(())
+}

src/misc.rs

@@ -1,106 +0,0 @@
pub struct RollingAverage {
index: usize,
data: Vec<f64>,
}
impl RollingAverage {
pub fn new(size: usize) -> Self {
RollingAverage {
index: 0,
data: Vec::with_capacity(size),
}
}
pub fn value(&self) -> f64 {
if self.data.is_empty() {
0.0
} else {
let mut max = self.data[0];
for v in self.data.iter() {
if *v > max {
max = *v;
}
}
let mut sum: f64 = self.data.iter().sum();
let mut count = self.data.len();
if self.data.len() >= 3 {
sum -= max;
count -= 1;
}
sum / count as f64
}
}
pub fn add(&mut self, val: f64) {
if self.data.capacity() == self.data.len() {
self.data[self.index] = val;
self.index += 1;
if self.index >= self.data.capacity() {
self.index = 0;
}
} else {
self.data.push(val);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rolling_average() {
let mut ra = RollingAverage::new(3);
assert_eq!(0, ra.data.len());
assert_eq!(3, ra.data.capacity());
assert_eq!(0.0, ra.value());
// 10 / 1 = 10
ra.add(10.0);
assert_eq!(1, ra.data.len());
assert_eq!(10.0, ra.value());
// (10 + 20) / 2 = 15
ra.add(20.0);
assert_eq!(2, ra.data.len());
assert_eq!(15.0, ra.value());
// (10 + 20 + 30) / 3 = 20
ra.add(30.0);
assert_eq!(3, ra.data.len());
assert_eq!(20.0, ra.value());
assert_eq!(10.0, ra.data[0]);
assert_eq!(20.0, ra.data[1]);
assert_eq!(30.0, ra.data[2]);
// This should replace the oldest value (index 0)
ra.add(40.0);
assert_eq!(3, ra.data.len());
assert_eq!(3, ra.data.capacity());
// (40 + 20 + 30) / 3 = 30
assert_eq!(30.0, ra.value());
assert_eq!(40.0, ra.data[0]);
assert_eq!(20.0, ra.data[1]);
assert_eq!(30.0, ra.data[2]);
ra.add(50.0);
ra.add(60.0);
ra.add(70.0);
assert_eq!(70.0, ra.data[0]);
assert_eq!(50.0, ra.data[1]);
assert_eq!(60.0, ra.data[2]);
}
}

src/zippy.rs (new file, 55 lines)

@@ -0,0 +1,55 @@
use regex::Regex;
use std::io::{ Error, ErrorKind };
use crate::errors::ResBE;
/*
Updated: 07.03.2022
Link generation code:
- `href = $1 + ($2 % $3 + $4 % $5) + $6`
- `$1` is always `/d/XXX` where XXX is dependent on the file
- `$2`, `$3`, `$4` and `$5` are dynamic and randomly generated on each reload
- `$2` is always the same as `$4`
- `$6` is dependent on the file
- The numbers in the calculation part (`$2`, `$3`, `$4` and `$5`) are hard-coded
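- For the hard-coded example below, the middle term evaluates to 186549 % 51245 + 186549 % 913 = 32814 + 297 = 33111, so the generated href is "/d/0Ky7p1C6/33111/some-file-name.part1.rar" (worked out here for illustration)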
```
document.getElementById('dlbutton').href = "/d/0Ky7p1C6/" + (186549 % 51245 + 186549 % 913) + "/some-file-name.part1.rar";
```
*/
pub async fn resolve_link(url: &str) -> ResBE<String> {
// Regex to check if the provided url is a zippyshare download url
let re = Regex::new(r"(https://www\d*\.zippyshare\.com)")?;
if !re.is_match(&url) {
return Err(Error::new(ErrorKind::Other, "URL is not a zippyshare url").into());
}
// Extract the hostname (with https:// prefix) for later
let base_host = &re.captures(&url).unwrap()[0];
// Download the html body for the download page
let body = reqwest::get(url).await?
.text().await?;
// Regex to match the javascript part of the html that generates the real download link
let re_link = Regex::new(r#"document\.getElementById\('dlbutton'\)\.href = "(/d/.+/)" \+ \((\d+) % (\d+) \+ \d+ % (\d+)\) \+ "(.+)";"#)?;
let cap_link = match re_link.captures(&body) {
Some(cap) => cap,
None => return Err(Error::new(ErrorKind::Other, "Link not found").into())
};
let url_start = &cap_link[1];
let url_end = &cap_link[5];
let n2: i32 = i32::from_str_radix(&cap_link[2], 10)?;
let n3: i32 = i32::from_str_radix(&cap_link[3], 10)?;
let n4 = n2;
let n5: i32 = i32::from_str_radix(&cap_link[4], 10)?;
let mixed = n2 % n3 + n4 % n5;
let dl_url = format!("{}{}{}{}", &base_host, url_start, mixed, url_end);
Ok(dl_url)
}