338 lines
16 KiB
Rust
338 lines
16 KiB
Rust
mod config;
|
|
mod structs;
|
|
mod utils;
|
|
mod workers;
|
|
use grammers_client::{types::chat::PackedChat, Client, Config, InputMessage, Update};
|
|
use grammers_session::Session;
|
|
use grammers_tl_types::functions::Ping;
|
|
use rand::{random, thread_rng, Rng};
|
|
use regex::Regex;
|
|
use reqwest::ClientBuilder;
|
|
use std::collections::{HashSet, VecDeque};
|
|
use std::env::args;
|
|
use std::io::Cursor;
|
|
use std::process::exit;
|
|
use std::sync::{Arc, Mutex, RwLock};
|
|
use std::time::Duration;
|
|
use tokio::sync::Semaphore;
|
|
use tokio::time::{sleep, Instant};
|
|
extern crate tokio;
|
|
|
|
async fn async_main() {
|
|
let nodl = match args().nth(1) {
|
|
Some(i) => {
|
|
if i == "nodl" {
|
|
true
|
|
} else {
|
|
eprintln!("Unknown argument: {}", i);
|
|
return;
|
|
}
|
|
}
|
|
None => false,
|
|
};
|
|
let rclient = ClientBuilder::new()
|
|
.timeout(Duration::from_secs(60))
|
|
.build()
|
|
.expect("Failed to build reqwest::Client");
|
|
println!("Connecting to Telegram...");
|
|
let mut tclient = Client::connect(Config {
|
|
session: Session::load_file_or_create("autoytarchivers.session")
|
|
.expect("Failed to Session::load_file_or_create"),
|
|
api_id: config::API_ID,
|
|
api_hash: config::API_HASH.to_string(),
|
|
params: Default::default(),
|
|
})
|
|
.await
|
|
.expect("Failed to connect to Telegram");
|
|
println!("Connected to Telegram");
|
|
if !tclient
|
|
.is_authorized()
|
|
.await
|
|
.expect("Failed to check if client is authorized")
|
|
{
|
|
println!("Signing in...");
|
|
tclient
|
|
.bot_sign_in(config::BOT_TOKEN, config::API_ID, config::API_HASH)
|
|
.await
|
|
.expect("Failed to sign in");
|
|
println!("Signed in");
|
|
}
|
|
{
|
|
let tclient = tclient.clone();
|
|
tokio::task::spawn(async move {
|
|
loop {
|
|
let ping_id = random();
|
|
if let Err(err) = tclient.invoke(&Ping { ping_id }).await {
|
|
eprintln!("Failed to ping Telegram: {:?}", err);
|
|
}
|
|
sleep(Duration::from_secs(60)).await;
|
|
}
|
|
});
|
|
}
|
|
if config::STORAGE_CHAT.is_none() {
|
|
while let Some(update) = tclient
|
|
.next_update()
|
|
.await
|
|
.expect("Failed client.next_updates()")
|
|
{
|
|
if let Update::NewMessage(message) = update {
|
|
println!(
|
|
"Received a message in {} ({}), packed chat: {:?}",
|
|
message.chat().id(),
|
|
message.chat().name(),
|
|
message.chat().pack().to_bytes()
|
|
);
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
let mut seen_videos: Option<Vec<String>> = None;
|
|
let chat = PackedChat::from_bytes(config::STORAGE_CHAT.unwrap())
|
|
.expect("Failed to unpack chat")
|
|
.unpack();
|
|
match tclient
|
|
.get_messages_by_id(&chat, &[config::STORAGE_MESSAGE_ID])
|
|
.await
|
|
{
|
|
Ok(mut messages) => {
|
|
if let Some(message) = messages.pop().expect("Telegram returned 0 messages") {
|
|
if let Some(media) = message.media() {
|
|
let mut data = vec![];
|
|
let mut download = tclient.iter_download(&media);
|
|
loop {
|
|
match download.next().await {
|
|
Ok(Some(chunk)) => data.extend(chunk),
|
|
Ok(None) => break,
|
|
Err(err) => {
|
|
eprintln!("Failed to iter_download: {:?}", err);
|
|
data.clear();
|
|
break;
|
|
}
|
|
};
|
|
}
|
|
if !data.is_empty() {
|
|
match serde_json::from_slice(&data) {
|
|
Ok(i) => seen_videos = Some(i),
|
|
Err(err) => eprintln!("Failed to parse seen videos json: {:?}", err),
|
|
};
|
|
}
|
|
} else {
|
|
eprintln!("Seen videos message has no media");
|
|
}
|
|
} else {
|
|
eprintln!("Seen videos message does not exist");
|
|
}
|
|
}
|
|
Err(err) => eprintln!("Failed to get seen videos message: {:?}", err),
|
|
};
|
|
let seen_videos = Arc::new(RwLock::new(seen_videos.unwrap_or_default()));
|
|
let tmp_handled = Arc::new(Mutex::new(HashSet::new()));
|
|
let video_semaphore = Arc::new(Semaphore::new(0));
|
|
let video_mutex = Arc::new(Mutex::new(VecDeque::new()));
|
|
let upload_semaphore = Arc::new(Semaphore::new(0));
|
|
let upload_mutex = Arc::new(Mutex::new(VecDeque::new()));
|
|
let query_lock = Arc::new(tokio::sync::Mutex::new(()));
|
|
if !nodl {
|
|
let date_regex = Arc::new(Regex::new(r#" *\d{4}-\d{2}-\d{2} \d{2}:\d{2}$"#).unwrap());
|
|
for _ in 0..config::VIDEO_WORKERS {
|
|
tokio::task::spawn(workers::video_worker(
|
|
rclient.clone(),
|
|
tclient.clone(),
|
|
chat.clone(),
|
|
date_regex.clone(),
|
|
video_semaphore.clone(),
|
|
video_mutex.clone(),
|
|
upload_semaphore.clone(),
|
|
upload_mutex.clone(),
|
|
));
|
|
}
|
|
for _ in 0..config::UPLOAD_WORKERS {
|
|
tokio::task::spawn(workers::upload_worker(
|
|
tclient.clone(),
|
|
chat.clone(),
|
|
upload_semaphore.clone(),
|
|
upload_mutex.clone(),
|
|
seen_videos.clone(),
|
|
tmp_handled.clone(),
|
|
));
|
|
}
|
|
}
|
|
loop {
|
|
for i in &config::CHANNEL_IDS {
|
|
println!("Checking channel {}", i);
|
|
match utils::get_videos(&rclient, i).await {
|
|
Ok(videos) => {
|
|
for j in videos {
|
|
if nodl {
|
|
let mut seen_videos = seen_videos.write().unwrap();
|
|
if !seen_videos.contains(&j) {
|
|
seen_videos.push(j);
|
|
}
|
|
} else {
|
|
{
|
|
if tmp_handled.lock().unwrap().contains(&j) {
|
|
continue;
|
|
}
|
|
let seen_videos = seen_videos.read().unwrap();
|
|
if seen_videos.contains(&j) {
|
|
continue;
|
|
}
|
|
}
|
|
let mutex = video_mutex.clone();
|
|
let semaphore = video_semaphore.clone();
|
|
let query_lock = query_lock.clone();
|
|
let tclient = tclient.clone();
|
|
let chat = chat.clone();
|
|
tokio::task::spawn(async move {
|
|
let mut waited = false;
|
|
let mut i = 1;
|
|
loop {
|
|
let guard = query_lock.lock().await;
|
|
match utils::get_video(&j).await {
|
|
Ok(Some(i)) => {
|
|
let first_try_live =
|
|
i.is_live.unwrap_or_default() && !waited;
|
|
mutex.lock().unwrap().push_back((
|
|
i,
|
|
Instant::now(),
|
|
first_try_live,
|
|
));
|
|
semaphore.add_permits(1);
|
|
break;
|
|
}
|
|
Ok(None) => break,
|
|
Err(err) => {
|
|
eprintln!("Failed to get video data: {:?}", err);
|
|
waited = true;
|
|
if let structs::Error::YoutubeDL(ref err) = err {
|
|
if err.output.contains("429")
|
|
|| err
|
|
.output
|
|
.to_lowercase()
|
|
.contains("too many request")
|
|
{
|
|
sleep(Duration::from_secs(i * 60 * 60)).await;
|
|
i += 1;
|
|
continue;
|
|
} else if err
|
|
.output
|
|
.starts_with("ERROR: autoytarchivers:")
|
|
{
|
|
drop(guard);
|
|
let time: u64 = err
|
|
.output
|
|
.splitn(5, &[':', ' '][..])
|
|
.nth(3)
|
|
.unwrap()
|
|
.parse()
|
|
.unwrap();
|
|
sleep(Duration::from_secs(time + 30)).await;
|
|
continue;
|
|
}
|
|
}
|
|
let text = format!("{:#?}", err);
|
|
let size = text.len();
|
|
let mut stream = Cursor::new(text.into_bytes());
|
|
match tclient
|
|
.upload_stream(
|
|
&mut stream,
|
|
size,
|
|
"failed-get-video-data.log".to_string(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(uploaded) => {
|
|
let message = InputMessage::text(
|
|
"Failed to get video data",
|
|
)
|
|
.mime_type("text/plain")
|
|
.file(uploaded);
|
|
if let Err(err) =
|
|
tclient.send_message(&chat, message).await
|
|
{
|
|
eprintln!(
|
|
"Failed to send message about failing to get video data: {:?}",
|
|
err
|
|
);
|
|
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get video data, see logs")).await {
|
|
eprintln!("Failed to send message about failing to send message about failing to get video data: {:?}", err);
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
eprintln!(
|
|
"Failed to upload logs about failing to get video data: {:?}",
|
|
err
|
|
);
|
|
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get video data, see logs")).await {
|
|
eprintln!("Failed to send message about failing to upload logs about failing to get video data: {:?}", err);
|
|
}
|
|
}
|
|
};
|
|
}
|
|
};
|
|
let tmp = thread_rng().gen_range(30..=10 * 60);
|
|
sleep(Duration::from_secs(tmp)).await;
|
|
}
|
|
});
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
eprintln!("Failed to get video list: {:?}", err);
|
|
let text = format!("{:#?}", err);
|
|
let size = text.len();
|
|
let mut stream = Cursor::new(text.into_bytes());
|
|
match tclient
|
|
.upload_stream(&mut stream, size, "failed-get-video-list.log".to_string())
|
|
.await
|
|
{
|
|
Ok(uploaded) => {
|
|
let message = InputMessage::text("Failed to get video list")
|
|
.mime_type("text/plain")
|
|
.file(uploaded);
|
|
if let Err(err) = tclient.send_message(&chat, message).await {
|
|
eprintln!(
|
|
"Failed to send message about failing to get video list: {:?}",
|
|
err
|
|
);
|
|
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get video list, see logs")).await {
|
|
eprintln!("Failed to send message about failing to send message about failing to get video list: {:?}", err);
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
eprintln!(
|
|
"Failed to upload logs about failing to get video list: {:?}",
|
|
err
|
|
);
|
|
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get video list, see logs")).await {
|
|
eprintln!("Failed to send message about failing to upload logs about failing to get video list: {:?}", err);
|
|
}
|
|
}
|
|
};
|
|
}
|
|
};
|
|
sleep(Duration::from_secs(thread_rng().gen_range(30..=60))).await;
|
|
}
|
|
if nodl {
|
|
if utils::update_seen_videos(&mut tclient, &chat, seen_videos.read().unwrap().clone())
|
|
.await
|
|
{
|
|
exit(0);
|
|
}
|
|
return;
|
|
}
|
|
sleep(config::WAIT_DURATION).await;
|
|
}
|
|
}
|
|
|
|
fn main() {
|
|
tokio::runtime::Builder::new_multi_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("Failed to build tokio runtime")
|
|
.block_on(async_main());
|
|
exit(1);
|
|
}
|