230 lines
8.1 KiB
C++
230 lines
8.1 KiB
C++
#include <cstring>
|
|
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <sys/epoll.h>
|
|
#include <system_error>
|
|
|
|
#include "log.h"
|
|
#include "misc.h"
|
|
#include "logcat_thread.h"
|
|
#include "logcat_entry.h"
|
|
|
|
// Capacity of the event array handed to epoll_wait() in LogcatThread::_run(),
// i.e. the most events a single epoll_wait() call can report.
#define EPOLL_MAX_EVENTS 10
|
|
|
|
static void mark_nonblock(int fd) {
|
|
int flags = fcntl(fd, F_GETFL);
|
|
if (flags < 0) {
|
|
throw_system_error("fcntl(fd, F_GETFL)");
|
|
}
|
|
if (!(flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
|
|
throw_system_error("fcntl(fd, F_SETFL)");
|
|
}
|
|
}
|
|
|
|
/**
 * Extracts every complete line from a line buffer and passes it to
 * `handle_line`, then moves the unterminated remainder to the buffer start.
 *
 * @param buf            Line buffer; its first `*used` bytes are valid.
 * @param used           In/out: number of valid bytes in `buf`. On return it
 *                       counts only the bytes after the last newline found
 *                       (unchanged if no newline was found).
 * @param previous_used  Offset at which the newline search starts; the caller
 *                       passes the value `*used` had before its latest read,
 *                       so older bytes are not searched again.
 * @param handle_line    Member function invoked once per complete line with
 *                       a pointer to the line, its length (newline excluded)
 *                       and `is_stdout`. The line is not NUL-terminated.
 * @param logcat_thread  Object on which `handle_line` is invoked.
 * @param is_stdout      Forwarded unchanged to `handle_line`.
 */
static inline void look_for_newlines(char buf[NEWLINE_BUF_SIZE], size_t* used, size_t previous_used,
        void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
    size_t line_start = 0;                 // start of the line currently being assembled
    size_t search_offset = previous_used;  // first byte not yet checked for '\n'

    while (*used > search_offset) {
        char* newline_ptr = static_cast<char*>(memchr(&buf[search_offset], '\n', *used - search_offset));
        if (!newline_ptr) {
            break;
        }

        const size_t line_length = static_cast<size_t>(newline_ptr - &buf[line_start]);
        (logcat_thread->*handle_line)(&buf[line_start], line_length, is_stdout);

        // Skip the line and its terminating newline.
        line_start += line_length + 1;
        search_offset = line_start;
    }

    // No complete line was consumed: the buffer content stays where it is.
    if (line_start == 0) {
        return;
    }

    const size_t remaining = *used - line_start;
    if (remaining > 0) {
        // Source and destination overlap whenever the remainder is longer
        // than the consumed prefix, so memmove() is required; memcpy() on
        // overlapping ranges is undefined behaviour.
        memmove(buf, &buf[line_start], remaining);
    }
    *used = remaining;
}
|
|
|
|
static inline void handle_fd(int fd, char* buf, size_t* used,
|
|
void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
|
|
while (true) {
|
|
ssize_t read_size_u = read(fd, &buf[*used], (NEWLINE_BUF_SIZE - *used) * sizeof(char));
|
|
if (read_size_u == 0) {
|
|
return;
|
|
} else if (read_size_u < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
return;
|
|
} else if (read_size_u < 0) {
|
|
throw_system_error("read()");
|
|
}
|
|
size_t read_size = static_cast<size_t>(read_size_u) / sizeof(char);
|
|
if (*used + read_size > NEWLINE_BUF_SIZE) {
|
|
throw std::runtime_error("Received line longer than 512k");
|
|
}
|
|
size_t previous_used = *used;
|
|
*used += read_size;
|
|
|
|
look_for_newlines(buf, used, previous_used, handle_line, logcat_thread, is_stdout);
|
|
}
|
|
}
|
|
|
|
|
|
/**
 * Sets up everything the worker thread needs and starts it:
 *   1. a pipe for stdout and a pipe for stderr, read ends made non-blocking;
 *   2. an epoll instance watching both read ends (EPOLLIN, edge-triggered);
 *   3. a std::thread running _run() with a token from _stop_source.
 *
 * If any step fails, every descriptor opened so far is closed and the
 * exception propagates to the caller.
 */
LogcatThread::LogcatThread() {
    // Closes `fd` if it is open, logging a close() failure, and marks it closed.
    // Assumes the descriptor members start out as -1 (presumably via default
    // member initializers in the class definition; the destructor relies on
    // the same sentinel).
    const auto close_if_open = [](int& fd, const char* failure_prefix) {
        if (fd == -1) {
            return;
        }
        if (close(fd)) {
            log(std::string(failure_prefix) + strerror(errno));
        }
        fd = -1;
    };

    try {
        int fds[2];

        if (pipe(fds)) {
            int errsv = errno;
            throw_system_error(errsv, "pipe() for stdout");
        }
        this->_stdout_read_fd = fds[0];
        this->_stdout_write_fd = fds[1];
        mark_nonblock(this->_stdout_read_fd);

        if (pipe(fds)) {
            int errsv = errno;
            throw_system_error(errsv, "pipe() for stderr");
        }
        this->_stderr_read_fd = fds[0];
        this->_stderr_write_fd = fds[1];
        mark_nonblock(this->_stderr_read_fd);

        this->_epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (this->_epoll_fd == -1) {
            int errsv = errno;
            throw_system_error(errsv, "epoll_create1()");
        }

        struct epoll_event event = {.events = EPOLLIN | EPOLLET};

        event.data.fd = this->_stdout_read_fd;
        if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stdout_read_fd, &event)) {
            int errsv = errno;
            throw_system_error(errsv, "epoll_ctl() for stdout");
        }

        event.data.fd = this->_stderr_read_fd;
        if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stderr_read_fd, &event)) {
            int errsv = errno;
            throw_system_error(errsv, "epoll_ctl() for stderr");
        }

        // Started last so the thread only ever sees fully initialised descriptors.
        this->_thread = std::thread(&LogcatThread::_run, this, this->_stop_source.get_token());
    } catch (...) {
        // The destructor must NOT be invoked by hand here: when a constructor
        // exits by exception the already-constructed members are destroyed
        // automatically, so an explicit destructor call would destroy them
        // twice. Only the raw descriptors need manual cleanup.
        close_if_open(this->_epoll_fd, "Failed to close epoll file descriptor: close(): ");
        close_if_open(this->_stdout_read_fd, "Failed to close stdout read pipe: close(): ");
        close_if_open(this->_stdout_write_fd, "Failed to close stdout write pipe: close(): ");
        close_if_open(this->_stderr_read_fd, "Failed to close stderr read pipe: close(): ");
        close_if_open(this->_stderr_write_fd, "Failed to close stderr write pipe: close(): ");
        throw;
    }
}
|
|
|
|
/**
 * Closes the epoll descriptor and all four pipe ends that are open
 * (a value of -1 marks a descriptor as not open). A failing close() is
 * logged and otherwise ignored.
 *
 * This body does not stop or join the worker thread.
 */
LogcatThread::~LogcatThread() {
    // Closes `fd` unless it is -1; logs `failure_prefix` + strerror on failure.
    const auto close_or_log = [](int fd, const char* failure_prefix) {
        if (fd != -1 && close(fd)) {
            log(std::string(failure_prefix) + strerror(errno));
        }
    };

    close_or_log(this->_epoll_fd, "Failed to close epoll file descriptor: close(): ");
    close_or_log(this->_stdout_read_fd, "Failed to close stdout read pipe: close(): ");
    close_or_log(this->_stdout_write_fd, "Failed to close stdout write pipe: close(): ");
    close_or_log(this->_stderr_read_fd, "Failed to close stderr read pipe: close(): ");
    close_or_log(this->_stderr_write_fd, "Failed to close stderr write pipe: close(): ");
}
|
|
|
|
void LogcatThread::request_stop() {
|
|
this->_stop_source.request_stop();
|
|
}
|
|
|
|
void LogcatThread::join() {
|
|
this->_thread.join();
|
|
}
|
|
|
|
void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
|
|
if (!is_stdout) {
|
|
std::string log_entry = format_log(std::string("Received from logcat stderr: ") + std::string(buf, length));
|
|
printf("%s\n", log_entry.c_str());
|
|
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
|
|
return;
|
|
}
|
|
|
|
std::optional<LogcatEntry> logcat_entry;
|
|
try {
|
|
logcat_entry = try_parse_logcat_entry(buf, length, this->_current_buffer);
|
|
} catch (const std::exception& e) {
|
|
std::string log_entry = format_log(std::string("Failed to parse logcat entry: ") + e.what());
|
|
printf("%s\n", log_entry.c_str());
|
|
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
|
|
}
|
|
if (logcat_entry) {
|
|
this->atomic_ring_buffer.put_and_increment_write(std::move(*logcat_entry));
|
|
return;
|
|
}
|
|
std::optional<Buffer> new_buffer;
|
|
try {
|
|
new_buffer = try_parse_buffer(buf, length);
|
|
} catch (const std::exception& e) {
|
|
std::string log_entry = format_log(std::string("Failed to parse buffer line: ") + e.what());
|
|
printf("%s\n", log_entry.c_str());
|
|
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
|
|
}
|
|
if (new_buffer) {
|
|
this->_current_buffer = *new_buffer;
|
|
return;
|
|
}
|
|
|
|
std::string log_entry = format_log(std::string("Cannot parse logcat stdout: ") + std::string(buf, length));
|
|
printf("%s\n", log_entry.c_str());
|
|
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
|
|
}
|
|
|
|
/**
 * Worker thread body: waits for data on the stdout/stderr pipes and feeds it
 * through handle_fd() until a stop is requested.
 *
 * epoll_wait() uses a 1000 ms timeout, so a stop request is noticed within
 * about a second even when no data arrives. An epoll_wait() call interrupted
 * by a signal (EINTR) is retried; any other epoll_wait() failure is logged
 * and ends the loop. Exceptions thrown while handling a descriptor are logged
 * and do not end the loop.
 *
 * @param stoken  Stop token polled once per loop iteration.
 */
void LogcatThread::_run(std::stop_token stoken) {
    struct epoll_event events[EPOLL_MAX_EVENTS];

    while (!stoken.stop_requested()) {
#ifndef NDEBUG
        if (this->debug_log_request.test()) {
            this->atomic_ring_buffer.put_and_increment_write(format_log("A log entry from the logcat thread :D"));
            this->debug_log_request.clear();
        }
#endif

        const int ready_fds = epoll_wait(this->_epoll_fd, events, EPOLL_MAX_EVENTS, 1000);
        if (ready_fds == -1) {
            const int errsv = errno;
            if (errsv == EINTR) {
                // A signal interrupted the wait; that is not a failure, so
                // re-check the stop token and wait again.
                continue;
            }
            std::string log_entry = format_log(std::string("epoll_wait(): ") + strerror(errsv));
            printf("%s\n", log_entry.c_str());
            this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
            break;
        }

        for (int i = 0; i < ready_fds; i++) {
            // Only two descriptors are registered, so anything that is not
            // the stdout read end is the stderr read end.
            const bool is_stdout = events[i].data.fd == this->_stdout_read_fd;
            char* buf = is_stdout ? this->_stdout_buf : this->_stderr_buf;
            size_t* used = is_stdout ? &this->_stdout_buf_used : &this->_stderr_buf_used;

            try {
                handle_fd(events[i].data.fd, buf, used, &LogcatThread::_handle_line, this, is_stdout);
            } catch (const std::exception& e) {
                std::string log_entry = "Failed to handle std";
                log_entry += is_stdout ? "out: " : "err: ";
                log_entry += e.what();
                log_entry = format_log(std::move(log_entry));
                printf("%s\n", log_entry.c_str());
                this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
            }
        }
    }
}
|