autoytarchivers/src/workers.rs

829 lines
38 KiB
Rust

use crate::structs::VideoData;
use crate::utils;
use grammers_client::types::input_message::InputMessage;
use grammers_client::types::Chat;
use regex::Regex;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::fs::{remove_file, File, OpenOptions};
use std::io::{Cursor, Seek, SeekFrom};
use std::process::Stdio;
use std::sync::{Arc, Mutex, RwLock};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::Semaphore;
use tokio::task;
use tokio::time::{sleep, Duration, Instant};
extern crate grammers_client;
extern crate reqwest;
pub async fn video_worker(
rclient: reqwest::Client,
mut tclient: grammers_client::Client,
chat: Chat,
date_regex: Arc<Regex>,
semaphore: Arc<Semaphore>,
mutex: Arc<Mutex<VecDeque<(VideoData, Instant, bool)>>>,
upload_semaphore: Arc<Semaphore>,
upload_mutex: Arc<Mutex<VecDeque<String>>>,
) {
loop {
semaphore.acquire().await.unwrap().forget();
let (mut video_data, start_time, first_try_live) =
mutex.lock().unwrap().pop_front().unwrap();
let late_to_queue =
first_try_live || Instant::now().duration_since(start_time).as_secs() > 5;
if late_to_queue {
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => continue,
};
}
if utils::is_manifest(&video_data) {
sleep(Duration::from_secs(video_data.duration + 30)).await;
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => continue,
};
}
let video_filename = format!("{}.mkv", &video_data.id);
let file = match OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(format!("{}.log", &video_data.id))
{
Ok(i) => i,
Err(err) => {
eprintln!("Failed to open video log file: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-open-video-log.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to open video log file")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to open video log file: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to open video log file, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to open video log file: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to open video log filr: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to open video log file, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to open video log file: {:?}", err);
}
}
};
continue;
}
};
let mut child = match start_ffmpeg(
&video_data,
&video_filename,
file.try_clone().expect("Failed to clone file"),
&mut tclient,
&chat,
)
.await
{
Some(i) => i,
None => continue,
};
let mut text = "New video".to_string();
if late_to_queue {
text.push_str(" (is late)");
}
text.push_str(": ");
let title = date_regex.replace(&video_data.title, "");
text.push_str(&format!(
"{}\nhttps://www.youtube.com/watch?v={}",
title, &video_data.id
));
let mut stream = Cursor::new(video_data.json.as_bytes());
match tclient
.upload_stream(
&mut stream,
video_data.json.len(),
format!("{}.json", &video_data.id),
)
.await
{
Ok(uploaded) => {
let message = InputMessage::text(&text)
.mime_type("application/json")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send message of video json: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to send message of video json, see logs"),
)
.await
{
eprintln!("Failed to send message about failing to send message of video json: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload video json: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text("Failed to upload video json, see logs"),
)
.await
{
eprintln!(
"Failed to send message about failing to upload video json: {:?}",
err
);
}
}
};
if let Some(ref thumbnail) = video_data.thumbnail {
match rclient.get(thumbnail.as_str()).send().await {
Ok(resp) => {
let mut filename = video_data.id.clone();
if let Some(path) = resp.url().path_segments() {
if let Some(name) = path.last() {
if let Some(extension) = utils::extension(name) {
filename.push('.');
filename.push_str(extension);
}
}
}
match resp.bytes().await {
Ok(bytes) => {
let size = bytes.len();
let mut stream = Cursor::new(bytes);
match tclient.upload_stream(&mut stream, size, filename).await {
Ok(uploaded) => {
let message = InputMessage::text(&text).file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send thumbnail: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to send thumbnail, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to send thumbnail: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload thumbnail: {:?}", err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload thumbnail, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload thumbnail: {:?}", err);
}
}
};
}
Err(err) => {
eprintln!("Failed to get thumbnail bytes: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
"failed-get-thumbnail-bytes.log".to_string(),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text("Failed to get thumbnail bytes")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to get thumbnail bytes: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to get thumbnail bytes, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to get thumbnail bytes: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload logs about failing to get thumbnail bytes: {:?}", err);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to get thumbnail bytes, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to get thumbnail bytes: {:?}", err);
}
}
};
}
};
}
Err(err) => {
eprintln!("Failed to connect to thumbnail server: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
"failed-connect-thumbnail-server.log".to_string(),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text("Failed to connect to thumbnail server")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to connect to thumbnail server: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to connect to thumbnail server, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to connect to thumbnail server: {:?}", err);
}
}
}
Err(err) => {
eprintln!("Failed to upload logs about failing to connect to thumbnail server: {:?}", err);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to connect to thumbnail server, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to connect to thumbnail server: {:?}", err);
}
}
};
}
};
}
for _ in 0..50u8 {
let task = task::spawn(auto_kill(
child.stdin.take().unwrap(),
file.try_clone().expect("Failed to clone file"),
));
let is_ok = match child.wait().await {
Ok(i) => i.success(),
Err(err) => {
eprintln!("Failed to wait for ffmpeg: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-wait-ffmpeg.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to wait for ffmpeg")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to wait for ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to wait for ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to wait for ffmpeg: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to wait for ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about failing to wait for ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about failing to wait for ffmpeg: {:?}", err);
}
}
};
false
}
};
task.abort();
match task.await {
Ok(()) => (),
Err(err) => {
if !err.is_cancelled() {
eprintln!("auto_kill panicked: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "auto-kill-panic.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("auto_kill panicked")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about auto_kill panicking: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about auto_kill panicking, see logs")).await {
eprintln!("Failed to send message about failing to send message about auto_kill panicking: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about auto_kill panicking: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to upload logs about auto_kill panicking, see logs")).await {
eprintln!("Failed to send message about failing to upload logs about auto_kill panicking: {:?}", err);
}
}
};
}
}
};
if is_ok {
upload_mutex.lock().unwrap().push_back(video_data.id);
upload_semaphore.add_permits(1);
break;
}
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(&format!(
"Failed to download video {}, see logs",
&video_data.id
)),
)
.await
{
eprintln!(
"Failed to send message about failing to download video {}: {:?}",
&video_data.id, err
);
}
if let Err(err) = remove_file(&video_filename) {
eprintln!("Failed to delete {}: {:?}", &video_filename, err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
format!("failed-delete-{}.log", &video_filename),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text(format!("Failed to delete {}", &video_filename))
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text(format!("Failed to send message about failing to delete {}, see logs", &video_filename))).await {
eprintln!("Failed to send message about failing to send message about failing to delete {}: {:?}", &video_filename, err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(format!(
"Failed to upload logs about failing to delete {}, see logs",
&video_filename
)),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to delete {}: {:?}", &video_filename, err);
}
}
};
}
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => break,
};
if utils::is_manifest(&video_data) {
sleep(Duration::from_secs(video_data.duration + 30)).await;
match utils::get_video_retry(&mut tclient, &chat, video_data).await {
Some(i) => video_data = i,
None => break,
};
}
child = match start_ffmpeg(
&video_data,
&video_filename,
file.try_clone().expect("Failed to clone file"),
&mut tclient,
&chat,
)
.await
{
Some(i) => i,
None => break,
};
}
}
}
async fn start_ffmpeg(
video_data: &VideoData,
filename: &str,
file: File,
tclient: &mut grammers_client::Client,
chat: &Chat,
) -> Option<Child> {
let mut command = Command::new("ffmpeg");
let mut command = command
.stdin(Stdio::piped())
.stdout(file.try_clone().expect("Failed to clone file"))
.stderr(file)
.arg("-y");
if video_data.requested_formats.is_empty() {
command = command
.arg("-i")
.arg(video_data.url.as_ref().unwrap().as_str());
} else {
for i in &video_data.requested_formats {
command = command.arg("-i").arg(i.url.as_str());
}
}
match command.args(&["-c", "copy", "--", filename]).spawn() {
Ok(i) => Some(i),
Err(err) => {
eprintln!("Failed to spawn ffmpeg: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-spawn-ffmpeg.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to spawn ffmpeg")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to spawn ffmpeg: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to spawn ffmpeg, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to spawn ffmpeg: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to spawn ffmpeg: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to spawn ffmpeg, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to spawn ffmpeg: {:?}", err);
}
}
};
None
}
}
}
async fn auto_kill(mut stdin: ChildStdin, mut file: File) {
let mut last_tell = file
.stream_position()
.expect("Failed to get stream position");
loop {
sleep(Duration::from_secs(5 * 30)).await;
let current_tell = file
.stream_position()
.expect("Failed to get stream position");
if current_tell == last_tell {
break;
}
last_tell = current_tell;
}
stdin
.write_all(b"q")
.await
.expect("Failed to write to ffmpeg stdin");
drop(stdin);
}
pub async fn upload_worker(
mut tclient: grammers_client::Client,
chat: Chat,
semaphore: Arc<Semaphore>,
mutex: Arc<Mutex<VecDeque<String>>>,
seen_videos: Arc<RwLock<Vec<String>>>,
tmp_handled: Arc<Mutex<HashSet<String>>>,
) {
loop {
semaphore.acquire().await.unwrap().forget();
let video_id = mutex.lock().unwrap().pop_front().unwrap();
let video_filename = format!("{}.mkv", &video_id);
let mut success = true;
let mut file = match tokio::fs::File::open(&video_filename).await {
Ok(i) => i,
Err(err) => {
eprintln!("Failed to open video: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-open-video.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to open video")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to open video: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to open video, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to open video: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to open video: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to open video, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to open video: {:?}", err);
}
}
};
continue;
}
};
let total_size = file.seek(SeekFrom::End(0)).await.unwrap();
file.seek(SeekFrom::Start(0)).await.unwrap();
let mut sent_size = 0;
let split = total_size > 2000 * 1024 * 1024;
for i in 0.. {
let filename = if split {
format!("{}.part{:02}", &video_filename, i)
} else {
video_filename.clone()
};
let message = tclient
.send_message(
&chat,
InputMessage::text(&format!("Uploading {}...", &video_filename)),
)
.await;
if let Err(ref err) = message {
eprintln!("Failed to send uploading message: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-send-uploading.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to send uploading message")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to send uploading message: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to send uploading message, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to send uploading message: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to send uploading message: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to send uploading message, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to send uploading message: {:?}", err);
}
}
};
}
let mut size = total_size - sent_size;
if size > 2000 * 1024 * 1024 {
size = 2000 * 1024 * 1024;
}
match tclient
.upload_stream(&mut file, usize::try_from(size).unwrap(), filename.clone())
.await
{
Ok(uploaded) => {
let message = InputMessage::text(&filename)
.mime_type("video/x-matroska")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!("Failed to send video {}: {:?}", &video_id, err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(&format!(
"Failed to send video {}, see logs",
&video_id
)),
)
.await
{
eprintln!(
"Failed to send message about failing to send video {}: {:?}",
&video_id, err
);
}
}
}
Err(err) => {
success = false;
eprintln!("Failed to upload video {}: {:?}", &video_id, err);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(&format!(
"Failed to upload video {}, see logs",
&video_id
)),
)
.await
{
eprintln!(
"Failed to send message about failing to upload video {}: {:?}",
&video_id, err
);
}
}
};
sent_size = file.stream_position().await.unwrap();
if let Ok(mut message) = message {
if let Err(err) = message.delete().await {
eprintln!("Failed to delete uploading message: {:?}", err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(&mut stream, size, "failed-delete-uploading.log".to_string())
.await
{
Ok(uploaded) => {
let message = InputMessage::text("Failed to delete uploading message")
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to delete uploading message: {:?}",
err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text("Failed to send message about failing to delete uploading message, see logs")).await {
eprintln!("Failed to send message about failing to send message about failing to delete uploading message: {:?}", err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to delete uploading message: {:?}",
err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(
"Failed to upload logs about failing to delete uploading message, see logs",
),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to delete uploading message: {:?}", err);
}
}
};
}
}
if sent_size >= total_size {
break;
}
}
drop(file);
if success {
if let Err(err) = remove_file(&video_filename) {
eprintln!("Failed to delete {}: {:?}", &video_filename, err);
let text = format!("{:#?}", err);
let size = text.len();
let mut stream = Cursor::new(text.into_bytes());
match tclient
.upload_stream(
&mut stream,
size,
format!("failed-delete-{}.log", &video_filename),
)
.await
{
Ok(uploaded) => {
let message =
InputMessage::text(format!("Failed to delete {}", &video_filename))
.mime_type("text/plain")
.file(uploaded);
if let Err(err) = tclient.send_message(&chat, message).await {
eprintln!(
"Failed to send message about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient.send_message(&chat, InputMessage::text(format!("Failed to send message about failing to delete {}, see logs", &video_filename))).await {
eprintln!("Failed to send message about failing to send message about failing to delete {}: {:?}", &video_filename, err);
}
}
}
Err(err) => {
eprintln!(
"Failed to upload logs about failing to delete {}: {:?}",
&video_filename, err
);
if let Err(err) = tclient
.send_message(
&chat,
InputMessage::text(format!(
"Failed to upload logs about failing to delete {}, see logs",
&video_filename
)),
)
.await
{
eprintln!("Failed to send message about failing to upload logs about failing to delete {}: {:?}", &video_filename, err);
}
}
};
}
{
tmp_handled.lock().unwrap().remove(&video_id);
seen_videos.write().unwrap().push(video_id);
}
let tmp = seen_videos.read().unwrap().clone();
utils::update_seen_videos(&mut tclient, &chat, tmp).await;
}
}
}