Browse Source

Initial commit

master
blank X 5 months ago
commit
fe4e09ef5a
Signed by: blankie GPG Key ID: CC15FC822C7F61F5
  1. 2
      .gitignore
  2. 1477
      Cargo.lock
  3. 23
      Cargo.toml
  4. 21
      LICENSE
  5. 25
      src/config.rs
  6. 337
      src/main.rs
  7. 98
      src/structs.rs
  8. 289
      src/utils.rs
  9. 725
      src/workers.rs

2
.gitignore

@ -0,0 +1,2 @@
/target
autoytarchivers.session

1477
Cargo.lock

File diff suppressed because it is too large

23
Cargo.toml

@ -0,0 +1,23 @@
[package]
name = "autoytarchivers"
version = "0.1.0"
authors = ["blank X <theblankx@protonmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[profile.release]
lto = true
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = "0.11"
grammers-client = { git = "https://github.com/Lonami/grammers" }
grammers-session = { git = "https://github.com/Lonami/grammers" }
grammers-tl-types = { git = "https://github.com/Lonami/grammers" }
quick-xml = "0.22"
rand = "0.8"
regex = "1.5"
url = { version = "2.2", features = ["serde"] }
tokio = { version = "1.5", features = ["rt-multi-thread", "process", "fs", "sync", "time", "io-util"] }

21
LICENSE

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 blank X
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

25
src/config.rs

@ -0,0 +1,25 @@
use std::time::Duration;
pub const API_ID: i32 = 0;
pub const API_HASH: &str = "https://my.telegram.org";
pub const BOT_TOKEN: &str = "https://telegram.org/BotFather";
// To obtain the packed chat, set to None and send a message to the chat
pub const STORAGE_CHAT: Option<&[u8]> = None;
pub const STORAGE_MESSAGE_ID: i32 = 7;
pub const WAIT_DURATION: Duration = Duration::from_secs(30 * 60);
pub const VIDEO_WORKERS: usize = 2;
pub const UPLOAD_WORKERS: usize = 5;
pub const CHANNEL_IDS: [&str; 2] = [
"UCL_qhgtOy0dy1Agp8vkySQg",
"UCHsx4Hqa-1ORjQTh9TYDhww",
];
pub const INVIDIOUS_INSTANCES: Option<[&str; 6]> = Some([
"https://tube.connect.cafe",
"https://invidious.zapashcanon.fr",
"https://invidious.site",
"https://invidious.048596.xyz",
"https://vid.puffyan.us",
"https://invidious.silkky.cloud",
]);

337
src/main.rs

@ -0,0 +1,337 @@
mod config;
mod structs;
mod utils;
mod workers;
use grammers_client::{types::chat::PackedChat, Client, Config, InputMessage, Update};
use grammers_session::Session;
use grammers_tl_types::functions::Ping;
use rand::{random, thread_rng, Rng};
use regex::Regex;
use reqwest::ClientBuilder;
use std::collections::{HashSet, VecDeque};
use std::env::args;
use std::io::Cursor;
use std::process::exit;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Instant};
extern crate tokio;
async fn async_main() {
let nodl = match args().nth(1) {
Some(i) => {
if i == "nodl" {
true
} else {
eprintln!("Unknown argument: {}", i);
return;
}
}
None => false,
};
let rclient = ClientBuilder::new()
.timeout(Duration::from_secs(60))
.build()
.expect("Failed to build reqwest::Client");
println!("Connecting to Telegram...");
let mut tclient = Client::connect(Config {
session: Session::load_file_or_create("autoytarchivers.session")
.expect("Failed to Session::load_file_or_create"),
api_id: config::API_ID,
api_hash: config::API_HASH.to_string(),
params: Default::default(),
})
.await
.expect("Failed to connect to Telegram");
println!("Connected to Telegram");
if !tclient
.is_authorized()
.await
.expect("Failed to check if client is authorized")
{
println!("Signing in...");
tclient
.bot_sign_in(config::BOT_TOKEN, config::API_ID, config::API_HASH)
.await
.expect("Failed to sign in");
println!("Signed in");
}
{
let tclient = tclient.clone();
tokio::task::spawn(async move {
loop {
let ping_id = random();
if let Err(err) = tclient.invoke(&Ping { ping_id }).await {
eprintln!("Failed to ping Telegram: {:?}", err);
}
sleep(Duration::from_secs(60)).await;
}
});
}
if config::STORAGE_CHAT.is_none() {
while let Some(update) = tclient
.next_update()
.await
.expect("Failed client.next_updates()")
{
if let Update::NewMessage(message) = update {
println!(
"Received a message in {} ({}), packed chat: {:?}",
message.chat().id(),
message.chat().name(),
message.chat().pack().to_bytes()
);
}
}
return;
}
let mut seen_videos: Option<Vec<String>> = None;
let chat = PackedChat::from_bytes(config::STORAGE_CHAT.unwrap())
.expect("Failed to unpack chat")
.unpack();
match tclient
.get_messages_by_id(&chat, &[config::STORAGE_MESSAGE_ID])
.await
{
Ok(mut messages) => {
if let Some(message) = messages.pop().expect("Telegram returned 0 messages") {
if let Some(media) = message.media() {
let mut data = vec![];
let mut download = tclient.iter_download(&media);
loop {
match download.next().await {
Ok(Some(chunk)) => data.extend(chunk),
Ok(None) => break,
Err(err) => {
eprintln!("Failed to iter_download: {:?}", err);
data.clear();
break;
}
};
}
if !data.is_empty() {
match serde_json::from_slice(&data) {
Ok(i) => seen_videos = Some(i),
Err(err) => eprintln!("Failed to parse seen videos json: {:?}", err),
};
}
} else {
eprintln!("Seen videos message has no media");
}
} else {
eprintln!("Seen videos message does not exist");
}
}
Err(err) => eprintln!("Failed to get seen videos message: {:?}", err),
};
let seen_videos = Arc::new(RwLock::new(seen_videos.unwrap_or_default()));
let tmp_handled = Arc::new(Mutex::new(HashSet::new()));
let video_semaphore = Arc::new(Semaphore::new(0));
let video_mutex = Arc::new(Mutex::new(VecDeque::new()));
let upload_semaphore = Arc::new(Semaphore::new(0));
let upload_mutex = Arc::new(Mutex::new(VecDeque::new()));
let query_lock = Arc::new(tokio::sync::Mutex::new(()));
if !nodl {
let date_regex = Arc::new(Regex::new(r#" *\d{4}-\d{2}-\d{2} \d{2}:\d{2}$"#).unwrap());
for _ in 0..config::VIDEO_WORKERS {
tokio::task::spawn(workers::video_worker(
rclient.clone(),
tclient.clone(),
chat.clone(),
date_regex.clone(),
video_semaphore.clone(),
video_mutex.clone(),
upload_semaphore.clone(),
upload_mutex.clone(),
));
}
for _ in 0..config::UPLOAD_WORKERS {
tokio::task::spawn(workers::upload_worker(
tclient.clone(),
chat.clone(),
upload_semaphore.clone(),
upload_mutex.clone(),
seen_videos.clone(),
tmp_handled.clone(),
));
}
}
loop {
for i in &config::CHANNEL_IDS {
println!("Checking channel {}", i);
match utils::get_videos(&rclient, i).await {
Ok(videos) => {
for j in videos {
{
if tmp_handled.lock().unwrap().contains(&j) {
continue;
}
}
if nodl {
let mut seen_videos = seen_videos.write().unwrap();
if !seen_videos.contains(&j) {
seen_videos.push(j);
}
} else {
{
let seen_videos = seen_videos.read().unwrap();
if seen_videos.contains(&j) {
continue;
}
}
let mutex = video_mutex.clone();
let semaphore = video_semaphore.clone();
let query_lock = query_lock.clone();
let tclient = tclient.clone();
let chat = chat.clone();
tokio::task::spawn(async move {
let mut waited = false;
let mut i = 1;
loop {
let guard = query_lock.lock().await;
match utils::get_video(&j).await {
Ok(Some(i)) => {
let first_try_live =
i.is_live.unwrap_or_default() && !waited;
mutex.lock().unwrap().push_back((
i,
Instant::now(),
first_try_live,
));
semaphore.add_permits(1);
break;
}
Ok(None) => break,
Err(err) => {
eprintln!("Failed to get video data: {:?}", err);
waited = true;
if let structs::Error::YoutubeDL(ref err) = err {
if err.output.contains("429")
|| err
.output
.to_lowercase()
.contains("too many request")
{
sleep(Duration::from_secs(i * 60 * 60)).await;
i += 1;
continue;
} else if err.output.starts_with("autoytarchivers:")
{
drop(guard);
let time: u64 = err
.output
.splitn(3, &[':', ' '][..])
.nth(1)
.unwrap()
.parse()
.unwrap();
sleep(Duration::from_secs(time + 30)).await;
continue;
}
}
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
"failed-get-video-data.log".to_string(),
)
.await
{
Ok(uploaded) => {
let message = InputMessage::text(
"Failed to get video data",
)
.mime_type("text/plain")
.file(uploaded);
if let Err(err) =
tclient.send_message(&chat, message).await
{
eprintln!(
"Failed to send message about failing to get video data: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get video data, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to get video data: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to get video data: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get video data, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to get video data: {:?}", err);
}
}
};
}
};
let tmp = thread_rng().gen_range(30..=10 * 60);
sleep(Duration::from_secs(tmp)).await;
}
});
}
}
}
Err(err) => {
eprintln!("Failed to get video list: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-get-video-list.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to get video list")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to get video list: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get video list, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to get video list: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to get video list: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get video list, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to get video list: {:?}", err);
}
}
};
}
};
sleep(Duration::from_secs(thread_rng().gen_range(30..=60))).await;
}
if nodl {
if utils::update_seen_videos(&mut tclient, &chat, seen_videos.read().unwrap().clone())
.await
{
exit(0);
}
return;
}
sleep(config::WAIT_DURATION).await;
}
}
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to build tokio runtime")
.block_on(async_main());
exit(1);
}

98
src/structs.rs

@ -0,0 +1,98 @@
use serde::Deserialize;
use std::io;
use std::process::ExitStatus;
use std::string::FromUtf8Error;
use tokio::task::JoinError;
use url::Url;
extern crate quick_xml;
extern crate reqwest;
extern crate serde_json;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct InvidiousVideo {
pub video_id: String,
}
#[derive(Deserialize, Debug)]
pub struct VideoURL {
pub url: Url,
}
#[derive(Deserialize, Debug)]
pub struct VideoData {
#[serde(default)]
pub requested_formats: Vec<VideoURL>,
#[serde(default)]
pub url: Option<Url>,
pub duration: u64,
pub id: String,
pub title: String,
#[serde(default)]
pub thumbnail: Option<Url>,
#[serde(default)]
pub is_live: Option<bool>,
#[serde(skip)]
pub json: String,
}
#[derive(Debug)]
pub struct YoutubeDLError {
pub status: ExitStatus,
pub output: String,
}
#[derive(Debug)]
pub enum Error {
IO(io::Error),
JoinError(JoinError),
Reqwest(reqwest::Error),
QuickXML(quick_xml::Error),
SerdeJSON(serde_json::Error),
FromUtf8Error(FromUtf8Error),
YoutubeDL(YoutubeDLError),
}
impl From<io::Error> for Error {
#[inline]
fn from(error: io::Error) -> Error {
Error::IO(error)
}
}
impl From<JoinError> for Error {
#[inline]
fn from(error: JoinError) -> Error {
Error::JoinError(error)
}
}
impl From<reqwest::Error> for Error {
#[inline]
fn from(error: reqwest::Error) -> Error {
Error::Reqwest(error)
}
}
impl From<quick_xml::Error> for Error {
#[inline]
fn from(error: quick_xml::Error) -> Error {
Error::QuickXML(error)
}
}
impl From<serde_json::Error> for Error {
#[inline]
fn from(error: serde_json::Error) -> Error {
Error::SerdeJSON(error)
}
}
impl From<FromUtf8Error> for Error {
#[inline]
fn from(error: FromUtf8Error) -> Error {
Error::FromUtf8Error(error)
}
}

289
src/utils.rs

@ -0,0 +1,289 @@
use crate::config::{INVIDIOUS_INSTANCES, STORAGE_MESSAGE_ID};
use crate::structs::{Error, InvidiousVideo, Result, VideoData, YoutubeDLError};
use grammers_client::{types::Chat, InputMessage};
use quick_xml::{events::Event, Reader};
use rand::{thread_rng, Rng};
use reqwest::Client;
use std::io::Cursor;
use std::process::Stdio;
use tokio::io::{copy_buf, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::time::{sleep, Duration};
extern crate grammers_client;
extern crate serde_json;
extern crate tokio;
const PYTHON_INPUT: &[u8] = br#"import sys
import json
try:
import yt_dlp as youtube_dl
except ImportError:
import youtube_dl
_try_get = youtube_dl.extractor.youtube.try_get
def traverse_dict(src):
for (key, value) in src.items():
if key == 'scheduledStartTime':
return value
if isinstance(value, dict):
if value := traverse_dict(value):
return value
return None
def try_get(src, getter, expected_type=None):
if isinstance(src, dict):
if reason := src.get('reason'):
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
if t := _try_get(src, traverse_dict, str):
src['reason'] = f'autoytarchivers:{t} {reason}'
return _try_get(src, getter, expected_type)
youtube_dl.extractor.youtube.try_get = try_get
ytdl = youtube_dl.YoutubeDL({"skip_download": True, "no_color": True, "quiet": True})
try:
print(json.dumps(ytdl.extract_info("https://www.youtube.com/watch?v=" + sys.argv[1]), indent=4), file=sys.stderr)
except Exception as e:
sys.exit(str(e))"#;
pub async fn get_videos(client: &Client, channel_id: &str) -> Result<Vec<String>> {
let mut video_ids = vec![];
if let Some(invidious_instances) = INVIDIOUS_INSTANCES {
for i in &invidious_instances {
let resp = match client
.get(&format!("{}/api/v1/channels/{}/latest", i, channel_id))
.query(&[("fields", "videoId")])
.header("Cache-Control", "no-store, max-age=0")
.send()
.await
{
Ok(i) => i,
Err(err) => {
eprintln!("Failed to connect to {}: {:?}", i, err);
continue;
}
};
if resp.status() != 200 {
eprintln!("Got {} from {}", resp.status(), i);
continue;
}
let resp = match resp.bytes().await {
Ok(i) => i,
Err(err) => {
eprintln!("Failed to get data from {}: {:?}", i, err);
continue;
}
};
let resp: Vec<InvidiousVideo> = match serde_json::from_slice(&resp) {
Ok(i) => i,
Err(err) => {
eprintln!("Failed to parse data from {}: {:?}", i, err);
continue;
}
};
video_ids.extend(resp.into_iter().take(15).map(|i| i.video_id));
if !video_ids.is_empty() {
return Ok(video_ids);
}
}
}
let resp = client
.get("https://www.youtube.com/feeds/videos.xml")
.query(&[("channel_id", channel_id)])
.header("Cache-Control", "no-store, max-age=0")
.send()
.await?
.error_for_status()?
.text()
.await?;
let mut reader = Reader::from_str(&resp);
let mut buf = vec![];
let mut inside = false;
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref e)) if e.name() == b"yt:videoId" => inside = true,
Ok(Event::Text(e)) if inside => {
video_ids.push(e.unescape_and_decode(&reader)?);
if video_ids.len() >= 15 {
break;
}
inside = false;
}
Ok(Event::Eof) => break,
Err(err) => Err(err)?,
_ => (),
};
buf.clear();
}
Ok(video_ids)
}
pub async fn get_video(video_id: &str) -> Result<Option<VideoData>> {
let mut command = Command::new("python3");
let mut process = command
.args(&["-", video_id])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let mut stdin = process.stdin.take().unwrap();
tokio::spawn(async move {
if let Err(err) = stdin.write_all(PYTHON_INPUT).await {
eprintln!("Failed to write PYTHON_INPUT: {:?}", err);
}
drop(stdin)
});
let stderr = process.stderr.take().unwrap();
let task: tokio::task::JoinHandle<std::result::Result<_, std::io::Error>> =
tokio::spawn(async move {
let mut stderr = BufReader::new(stderr);
let mut writer = vec![];
copy_buf(&mut stderr, &mut writer).await?;
Ok(writer)
});
let status = process.wait().await?;
let stderr = String::from_utf8(task.await??)?;
if status.success() {
let mut data: VideoData = serde_json::from_str(&stderr)?;
data.json = stderr;
Ok(Some(data))
} else {
let stderr_lowercase = stderr.to_lowercase();
if stderr_lowercase.contains("private video")
|| stderr_lowercase.contains("unavailable")
|| stderr_lowercase.contains("not available")
{
Ok(None)
} else {
Err(Error::YoutubeDL(YoutubeDLError {
status,
output: stderr,
}))
}
}
}
pub async fn get_video_retry(
tclient: &mut grammers_client::Client,
chat: &Chat,
video_data: VideoData,
) -> Option<VideoData> {
for i in 1..=5 {
match get_video(&video_data.id).await {
Ok(i) => return i,
Err(err) => {
eprintln!("Failed to get video data: {:?}", err);
if let Error::YoutubeDL(ref err) = err {
if err.output.contains("429")
|| err.output.to_lowercase().contains("too many requests")
{
sleep(Duration::from_secs(i * 60 * 60)).await;
continue;
}
}
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-get-video-data.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to get video data")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to get video data: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get video data, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to get video data: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to get video data: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get video data, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to get video data: {:?}", err);
}
}
};
}
};
let tmp = thread_rng().gen_range(30..=10 * 60);
sleep(Duration::from_secs(tmp)).await;
}
Some(video_data)
}
pub fn is_manifest(video_data: &VideoData) -> bool {
if video_data.requested_formats.is_empty() {
video_data.url.as_ref().unwrap().domain() == Some("manifest.googlevideo.com")
} else {
video_data
.requested_formats
.iter()
.any(|i| i.url.domain() == Some("manifest.googlevideo.com"))
}
}
pub fn extension(text: &str) -> Option<&str> {
text.trim_start_matches(".").splitn(2, ".").nth(1)
}
pub async fn update_seen_videos(
tclient: &mut grammers_client::Client,
chat: &Chat,
seen_videos: Vec<String>,
) -> bool {
let bytes = serde_json::to_vec(&serde_json::json!(seen_videos)).unwrap();
let size = bytes.len();
let mut stream = Cursor::new(bytes);
match tclient
.upload_stream(&mut stream, size, "autoytarchivers.json".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("")
.mime_type("application/json")
.file(uploaded);
if let Err(err) = tclient
.edit_message(&chat, STORAGE_MESSAGE_ID, message)
.await
{
eprintln!("Failed to edit seen videos: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to edit seen videos, see logs"),
)
.await
{
eprintln!(
"Failed to send message about failing to edit seen videos: {:?}",
err
);
}
false
} else {
true
}
}
Err(err) => {
eprintln!("Failed to upload seen videos: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to upload seen videos, see logs"),
)
.await
{
eprintln!(
"Failed to send message about failing to upload seen videos: {:?}",
err
);
}
false
}
}
}

725
src/workers.rs

@ -0,0 +1,725 @@
use crate::structs::VideoData;
use crate::utils;
use grammers_client::types::input_message::InputMessage;
use grammers_client::types::Chat;
use regex::Regex;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::fs::{remove_file, File, OpenOptions};
use std::io::{Cursor, Seek, SeekFrom};
use std::process::Stdio;
use std::sync::{Arc, Mutex, RwLock};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::Semaphore;
use tokio::task;
use tokio::time::{sleep, Duration, Instant};
extern crate grammers_client;
extern crate reqwest;
pub async fn video_worker(
rclient: reqwest::Client,
mut tclient: grammers_client::Client,
chat: Chat,
date_regex: Arc<Regex>,
semaphore: Arc<Semaphore>,
mutex: Arc<Mutex<VecDeque<(VideoData, Instant, bool)>>>,
upload_semaphore: Arc<Semaphore>,
upload_mutex: Arc<Mutex<VecDeque<String>>>,
) {
loop {
semaphore.acquire().await.unwrap().forget();
let (mut video_data, start_time, first_try_live) =
mutex.lock().unwrap().pop_front().unwrap();
let late_to_queue =
first_try_live || Instant::now().duration_since(start_time).as_secs() > 5;
if late_to_queue {
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => continue,
};
}
if utils::is_manifest(&video_data) {
sleep(Duration::from_secs(video_data.duration + 30)).await;
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => continue,
};
}
let video_filename = format!("{}.mkv", &video_data.id);
let file = match OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(format!("{}.log", &video_data.id))
{
Ok(i) => i,
Err(err) => {
eprintln!("Failed to open video log file: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-open-video-log.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to open video log file")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to open video log file: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to open video log file, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to open video log file: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to open video log filr: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to open video log file, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to open video log file: {:?}", err);
}
}
};
continue;
}
};
let mut child = match start_ffmpeg(
&video_data,
&video_filename,
file.try_clone().expect("Failed to clone file"),
&mut tclient,
&chat,
)
.await
{
Some(i) => i,
None => continue,
};
let mut text = "New video".to_string();
if late_to_queue {
text.push_str(" (is late)");
}
text.push_str(": ");
let title = date_regex.replace(&video_data.title, "");
text.push_str(&format!(
"{}\nhttps://www.youtube.com/watch?v={}",
title, &video_data.id
));
let mut stream = Cursor::new(video_data.json.as_bytes());
match tclient
.upload_stream(
&mut stream,
video_data.json.len(),
format!("{}.json", &video_data.id),
)
.await
{
Ok(uploaded) => {
let message = InputMessage::text(&text)
.mime_type("application/json")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send message of video json: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to send message of video json, see logs"),
)
.await
{
eprintln!("Failed to send message about failing to send message of video json: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload video json: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to upload video json, see logs"),
)
.await
{
eprintln!(
"Failed to send message about failing to upload video json: {:?}",
err
);
}
}
};
if let Some(ref thumbnail) = video_data.thumbnail {
match rclient.get(thumbnail.as_str()).send().await {
Ok(resp) => {
let mut filename = video_data.id.clone();
if let Some(path) = resp.url().path_segments() {
if let Some(name) = path.last() {
if let Some(extension) = utils::extension(name) {
filename.push('.');
filename.push_str(extension);
}
}
}
match resp.bytes().await {
Ok(bytes) => {
let size = bytes.len();
let mut stream = Cursor::new(bytes);
match tclient.upload_stream(&mut stream, size, filename).await {
Ok(uploaded) => {
let message = InputMessage::text(&text).file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send thumbnail: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to send thumbnail, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to send thumbnail: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload thumbnail: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload thumbnail, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload thumbnail: {:?}", err);
}
}
};
}
Err(err) => {
eprintln!("Failed to get thumbnail bytes: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
"failed-get-thumbnail-bytes.log".to_string(),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text("Failed to get thumbnail bytes")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to get thumbnail bytes: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get thumbnail bytes, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to get thumbnail bytes: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload logs about failing to get thumbnail bytes: {:?}", err);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get thumbnail bytes, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to get thumbnail bytes: {:?}", err);
}
}
};
}
};
}
Err(err) => {
eprintln!("Failed to connect to thumbnail server: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
"failed-connect-thumbnail-server.log".to_string(),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text("Failed to connect to thumbnail server")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to connect to thumbnail server: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to connect to thumbnail server, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to connect to thumbnail server: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload logs about failing to connect to thumbnail server: {:?}", err);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to connect to thumbnail server, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to connect to thumbnail server: {:?}", err);
}
}
};
}
};
}
for _ in 0..50u8 {
let task = task::spawn(auto_kill(
child.stdin.take().unwrap(),
file.try_clone().expect("Failed to clone file"),
));
let is_ok = match child.wait().await {
Ok(i) => i.success(),
Err(err) => {
eprintln!("Failed to wait for ffmpeg: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-wait-ffmpeg.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to wait for ffmpeg")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to wait for ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to wait for ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to wait for ffmpeg: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to wait for ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to wait for ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to wait for ffmpeg: {:?}", err);
}
}
};
false
}
};
task.abort();
match task.await {
Ok(()) => (),
Err(err) => {
if !err.is_cancelled() {
eprintln!("auto_kill panicked: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "auto-kill-panic.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("auto_kill panicked")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about auto_kill panicking: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about auto_kill panicking, see logs")).await {
eprintln!("Failed to send message about failing to send message about auto_kill panicking: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about auto_kill panicking: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about auto_kill panicking, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about auto_kill panicking: {:?}", err);
}
}
};
}
}
};
if is_ok {
upload_mutex.lock().unwrap().push_back(video_data.id);
upload_semaphore.add_permits(1);
break;
}
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(&format!(
"Failed to download video {}, see logs",
&video_data.id
)),
)
.await
{
eprintln!(
"Failed to send message about failing to download video {}: {:?}",
&video_data.id, err
);
}
if let Err(err) = remove_file(&video_filename) {
eprintln!("Failed to delete {}: {:?}", &video_filename, err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
format!("failed-delete-{}.log", &video_filename),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text(format!("Failed to delete {}", &video_filename))
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text(format!("Failed to send message about failing to delete {}, see logs", &video_filename))).await {
eprintln!("Failed to send message about failing to send message about failing to delete {}: {:?}", &video_filename, err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(format!(
"Failed to upload logs about failing to delete {}, see logs",
&video_filename
)),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to delete {}: {:?}", &video_filename, err);
}
}
};
}
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => break,
};
if utils::is_manifest(&video_data) {
sleep(Duration::from_secs(video_data.duration + 30)).await;
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => break,
};
}
child = match start_ffmpeg(
&video_data,
&video_filename,
file.try_clone().expect("Failed to clone file"),
&mut tclient,
&chat,
)
.await
{
Some(i) => i,
None => break,
};
}
}
}
async fn start_ffmpeg(
video_data: &VideoData,
filename: &str,
file: File,
tclient: &mut grammers_client::Client,
chat: &Chat,
) -> Option<Child> {
let mut command = Command::new("ffmpeg");
let mut command = command
.stdin(Stdio::piped())
.stdout(file.try_clone().expect("Failed to clone file"))
.stderr(file)
.arg("-y");
if video_data.requested_formats.is_empty() {
command = command
.arg("-i")
.arg(video_data.url.as_ref().unwrap().as_str());
} else {
for i in &video_data.requested_formats {
command = command.arg("-i").arg(i.url.as_str());
}
}
match command.args(&["-c", "copy", filename]).spawn() {
Ok(i) => Some(i),
Err(err) => {
eprintln!("Failed to spawn ffmpeg: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-spawn-ffmpeg.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to spawn ffmpeg")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to spawn ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to spawn ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to spawn ffmpeg: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to spawn ffmpeg: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to spawn ffmpeg, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to spawn ffmpeg: {:?}", err);
}
}
};
None
}
}
}
async fn auto_kill(mut stdin: ChildStdin, mut file: File) {
let mut last_tell = file
.stream_position()
.expect("Failed to get stream position");
loop {
sleep(Duration::from_secs(5 * 30)).await;
let current_tell = file
.stream_position()
.expect("Failed to get stream position");
if current_tell == last_tell {
break;
}
last_tell = current_tell;
}
stdin
.write_all(b"q")
.await
.expect("Failed to write to ffmpeg stdin");
drop(stdin);
}
pub async fn upload_worker(
mut tclient: grammers_client::Client,
chat: Chat,
semaphore: Arc<Semaphore>,
mutex: Arc<Mutex<VecDeque<String>>>,
seen_videos: Arc<RwLock<Vec<String>>>,
tmp_handled: Arc<Mutex<HashSet<String>>>,
) {
loop {
semaphore.acquire().await.unwrap().forget();
let video_id = mutex.lock().unwrap().pop_front().unwrap();
let video_filename = format!("{}.mkv", &video_id);
let mut success = true;
let mut file = match tokio::fs::File::open(&video_filename).await {
Ok(i) => i,
Err(err) => {
eprintln!("Failed to open video: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-open-video.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to open video")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to open video: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to open video, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to open video: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to open video: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to open video, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to open video: {:?}", err);
}
}
};
continue;
}
};
let total_size = file.seek(SeekFrom::End(0)).await.unwrap();
file.seek(SeekFrom::Start(0)).await.unwrap();
let parts = (total_size as f64 / (2000.0 * 1024.0 * 1024.0)).ceil() as usize;
let current_position = 0;
for i in 0..parts {
let filename = if parts == 1 {
video_filename.clone()
} else {
format!("{}.part{:02}", &video_filename, i)
};
let mut size = total_size - current_position;
if size > 2000 * 1024 * 1024 {
size = 2000 * 1024 * 1024;
}
match tclient
.upload_stream(&mut file, usize::try_from(size).unwrap(), filename.clone())
.await
{
Ok(uploaded) => {
let message = InputMessage::text(&filename)
.mime_type("video/x-matroska")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send video: {:?}", err);
if let Err(err) = tclient
.send_message(</