logmeow/logcat_thread.cpp

422 lines
13 KiB
C++

#include <cstring>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/wait.h>
#include <system_error>
#ifdef USE_EPOLL
#include <sys/epoll.h>
#else
#include <poll.h>
#endif
#include "log.h"
#include "misc.h"
#include "config.h"
#include "logcat_thread.h"
#include "logcat_entry.h"
static void mark_nonblock(int fd) {
int flags = fcntl(fd, F_GETFL);
if (flags < 0) {
throw_system_error("fcntl(fd, F_GETFL)");
}
if (!(flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
throw_system_error("fcntl(fd, F_SETFL)");
}
}
static inline void look_for_newlines(char buf[MAX_LOGCAT_LINE_SIZE], size_t* used, size_t previous_used,
void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
size_t search_offset = previous_used;
size_t real_offset = 0;
while (*used > search_offset) {
char* newline_ptr = static_cast<char*>(memchr(&buf[search_offset], '\n', (*used - search_offset) * sizeof(char)));
if (!newline_ptr) {
break;
}
size_t newline_length = (reinterpret_cast<size_t>(newline_ptr) - reinterpret_cast<size_t>(&buf[real_offset])) / sizeof(char);
(logcat_thread->*handle_line)(&buf[real_offset], newline_length, is_stdout);
real_offset += newline_length + 1;
search_offset = real_offset;
}
if (real_offset) {
if (*used > search_offset) {
memcpy(buf, &buf[real_offset], (*used - real_offset) * sizeof(char));
*used -= real_offset;
} else {
*used = 0;
}
}
}
static inline void handle_fd(int fd, char* buf, size_t* used,
void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
while (true) {
ssize_t read_size_u = read(fd, &buf[*used], (MAX_LOGCAT_LINE_SIZE - *used) * sizeof(char));
if (read_size_u == 0) {
return;
} else if (read_size_u < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
return;
} else if (read_size_u < 0) {
throw_system_error("read()");
}
size_t read_size = static_cast<size_t>(read_size_u) / sizeof(char);
if (*used + read_size > MAX_LOGCAT_LINE_SIZE) {
throw std::runtime_error("Received line longer than 512k");
}
size_t previous_used = *used;
*used += read_size;
look_for_newlines(buf, used, previous_used, handle_line, logcat_thread, is_stdout);
}
}
LogcatThread::LogcatThread(const std::string* logcat_command) : _logcat_command(logcat_command) {
int fds[2];
if (pipe(fds)) {
int errsv = errno;
this->~LogcatThread();
throw_system_error(errsv, "pipe() for stdout");
}
this->_stdout_read_fd = fds[0];
this->_stdout_write_fd = fds[1];
try {
mark_nonblock(this->_stdout_read_fd);
} catch (const std::exception& e) {
this->~LogcatThread();
throw;
}
if (pipe(fds)) {
int errsv = errno;
this->~LogcatThread();
throw_system_error(errsv, "pipe() for stderr");
}
this->_stderr_read_fd = fds[0];
this->_stderr_write_fd = fds[1];
try {
mark_nonblock(this->_stderr_read_fd);
} catch (const std::exception& e) {
this->~LogcatThread();
throw;
}
#ifdef USE_EPOLL
struct epoll_event event = {.events = EPOLLIN | EPOLLET};
this->_epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (this->_epoll_fd == -1) {
int errsv = errno;
this->~LogcatThread();
throw_system_error(errsv, "epoll_create1()");
}
event.data.fd = this->_stdout_read_fd;
if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stdout_read_fd, &event)) {
int errsv = errno;
this->~LogcatThread();
throw_system_error(errsv, "epoll_ctl() for stdout");
}
event.data.fd = this->_stderr_read_fd;
if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stderr_read_fd, &event)) {
int errsv = errno;
this->~LogcatThread();
throw_system_error(errsv, "epoll_ctl() for stderr");
}
#endif
this->_thread = std::thread(&LogcatThread::_run, this, this->_stop_source.get_token());
}
LogcatThread::~LogcatThread() {
#ifdef USE_EPOLL
if (this->_epoll_fd != -1 && close(this->_epoll_fd)) {
log(std::string("Failed to close epoll file descriptor: close(): ") + strerror(errno));
}
#endif
if (this->_stdout_read_fd != -1 && close(this->_stdout_read_fd)) {
log(std::string("Failed to close stdout read pipe: close(): ") + strerror(errno));
}
if (this->_stdout_write_fd != -1 && close(this->_stdout_write_fd)) {
log(std::string("Failed to close stdout write pipe: close(): ") + strerror(errno));
}
if (this->_stderr_read_fd != -1 && close(this->_stderr_read_fd)) {
log(std::string("Failed to close stderr read pipe: close(): ") + strerror(errno));
}
if (this->_stderr_write_fd != -1 && close(this->_stderr_write_fd)) {
log(std::string("Failed to close stderr write pipe: close(): ") + strerror(errno));
}
}
void LogcatThread::request_stop() {
this->_stop_source.request_stop();
}
void LogcatThread::join() {
this->_thread.join();
}
void LogcatThread::_put_if_not_stopped(LogcatThreadItem item) {
while (true) {
if (this->atomic_ring_buffer.try_put_and_increment_write(item)) {
break;
}
if (this->_stop_source.stop_requested()) {
break;
}
}
}
void LogcatThread::_try_log(std::string message) {
LogEntry log_entry = {time(nullptr), std::move(message)};
print_log(log_entry);
this->_put_if_not_stopped(std::move(log_entry));
}
void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
if (!is_stdout) {
this->_try_log(std::string("Received from logcat stderr: ") + std::string(buf, length));
return;
}
try {
std::optional<LogcatEntry> logcat_entry = try_parse_logcat_entry(buf, length, this->_current_buffer);
if (logcat_entry) {
this->_put_if_not_stopped(std::move(*logcat_entry));
return;
}
} catch (const std::exception& e) {
this->_try_log(std::string("Failed to parse logcat entry: ") + e.what());
}
try {
std::optional<Buffer> new_buffer = try_parse_buffer(buf, length);
if (new_buffer) {
this->_current_buffer = *new_buffer;
return;
}
} catch (const std::exception& e) {
this->_try_log(std::string("Failed to parse buffer line: ") + e.what());
}
this->_try_log(std::string("Cannot parse logcat stdout: ") + std::string(buf, length));
}
void LogcatThread::_run_read_round() {
auto handle_ready_fd = [&](int fd) {
const bool is_stdout = fd == this->_stdout_read_fd;
char* buf = is_stdout ? this->_stdout_buf : this->_stderr_buf;
size_t* used = is_stdout ? &this->_stdout_buf_used : &this->_stderr_buf_used;
try {
handle_fd(fd, buf, used, &LogcatThread::_handle_line, this, is_stdout);
} catch (const std::exception& e) {
this->_try_log(std::string("Failed to handle std") + (is_stdout ? "out: " : "err: ") + e.what());
}
};
#ifdef USE_EPOLL
struct epoll_event events[2];
int ready_fds = epoll_wait(this->_epoll_fd, events, 2, 1000);
if (ready_fds == -1) {
if (errno == EINTR) {
return;
}
try {
throw_system_error("epoll_wait()");
} catch (const std::exception& e) {
this->_try_log(e.what());
}
return;
}
for (int i=0; i < ready_fds; i++) {
handle_ready_fd(events[i].data.fd);
}
#else
struct pollfd fds[2] = {
{
.fd = this->_stdout_read_fd,
.events = POLLIN | POLLPRI,
},
{
.fd = this->_stderr_read_fd,
.events = POLLIN | POLLPRI,
},
};
int ready_fds = poll(fds, 2, 1000);
if (ready_fds < 0) {
try {
throw_system_error("poll()");
} catch (const std::exception& e) {
this->_try_log(e.what());
}
return;
} else if (ready_fds == 0) {
return;
}
if (fds[0].events & POLLIN || fds[0].events & POLLPRI) {
handle_ready_fd(fds[0].fd);
}
// ignore errors 'cause we'll keep the pipes open until thread termination
if (fds[1].events & POLLIN || fds[1].events & POLLPRI) {
handle_ready_fd(fds[1].fd);
}
// ignore errors 'cause we'll keep the pipes open until thread termination
#endif
}
void LogcatThread::_try_reap(bool has_request) {
int wstatus;
int res = waitpid(this->_logcat_pid, &wstatus, WNOHANG);
if (res == -1) {
try {
throw_system_error("waitpid()");
} catch (const std::exception& e) {
this->_try_log(e.what());
}
return;
} else if (res != this->_logcat_pid) {
return;
}
this->_logcat_pid = -1;
this->_logcat_process_kill_attempts = 0;
// just in case if the process was terminated mid-write
this->_stdout_buf_used = 0;
this->_stderr_buf_used = 0;
this->logcat_process_running.clear();
if (WIFEXITED(wstatus)) {
if (WEXITSTATUS(wstatus) && !has_request) {
this->_try_log(std::string("Logcat exited with ") + std::to_string(WEXITSTATUS(wstatus)));
}
} else if (WIFSIGNALED(wstatus)) {
if (!has_request) {
this->_try_log(std::string("Logcat exited with -") + std::to_string(WTERMSIG(wstatus)));
}
} else {
this->_try_log(std::string("Logcat disappeared (wstatus=") + std::to_string(wstatus) + ')');
}
}
bool LogcatThread::_handle_stop_request() {
if (this->_logcat_pid == -1) {
return true;
}
int signal = this->_logcat_process_kill_attempts++ < 3 ? SIGTERM : SIGKILL;
if (!kill(this->_logcat_pid, signal)) {
return false;
}
try {
throw_system_error("kill()");
} catch (const std::exception& e) {
this->_try_log(e.what());
}
return true;
}
bool LogcatThread::_handle_start_request() {
if (this->_logcat_pid != -1) {
this->_handle_stop_request();
return false;
}
auto dup2_or_die = [](int oldfd, int newfd) {
if (dup2(oldfd, newfd) == newfd) {
return;
}
try {
throw_system_error(std::string("dup2(") + std::to_string(oldfd) + ", " + std::to_string(newfd) + ')');
} catch (const std::exception& e) {
print_log({time(nullptr), e.what()});
}
_exit(1);
};
auto close_or_warn = [](int fd) {
if (!close(fd)) {
return;
}
try {
throw_system_error(std::string("close(") + std::to_string(fd) + ')');
} catch (const std::exception& e) {
print_log({time(nullptr), e.what()});
}
};
// TODO what if *this->_logcat_command was mutated during copying?
std::string logcat_command = *this->_logcat_command;
if (logcat_command.empty()) {
logcat_command = default_logcat_command;
}
this->_logcat_pid = fork();
if (this->_logcat_pid == -1) {
try {
throw_system_error("fork()");
} catch (const std::exception& e) {
this->_try_log(e.what());
}
return true;
} else if (this->_logcat_pid == 0) {
dup2_or_die(this->_stderr_write_fd, 2);
dup2_or_die(this->_stdout_write_fd, 1);
close_or_warn(this->_stdout_write_fd);
close_or_warn(this->_stderr_write_fd);
close_or_warn(this->_stdout_read_fd);
close_or_warn(this->_stderr_read_fd);
execlp("sh", "sh", "-c", logcat_command.c_str(), nullptr);
try {
throw_system_error("execlp()");
} catch (const std::exception& e) {
print_log({time(nullptr), e.what()});
}
_exit(1);
} else {
this->logcat_process_running.test_and_set();
return true;
}
}
bool LogcatThread::_run_process_round(LogcatProcessRequest request) {
bool task_done;
switch (request) {
case LogcatProcessRequest::None: task_done = false; break;
case LogcatProcessRequest::Start: task_done = this->_handle_start_request(); break;
case LogcatProcessRequest::Stop: task_done = this->_handle_stop_request(); break;
};
if (this->_logcat_pid != -1) {
this->_try_reap(request != LogcatProcessRequest::None);
}
return task_done;
}
void LogcatThread::_run(std::stop_token stoken) {
while (!stoken.stop_requested() || this->_logcat_pid != -1) {
#ifndef NDEBUG
if (this->debug_log_request.test()) {
this->_try_log("A log entry from the logcat thread :D");
this->debug_log_request.clear();
}
#endif
this->_run_read_round();
if (stoken.stop_requested() && this->_logcat_pid != -1) {
this->_run_process_round(LogcatProcessRequest::Stop);
} else if (this->_run_process_round(this->logcat_process_request.load())) {
this->logcat_process_request.store(LogcatProcessRequest::None);
}
}
}