Handle lines from stdout and stderr

This commit is contained in:
blankie 2023-01-19 12:54:20 +07:00
parent d9c7184b0e
commit 6c339fa799
Signed by: blankie
GPG Key ID: CC15FC822C7F61F5
3 changed files with 85 additions and 3 deletions

2
arb.h
View File

@ -15,7 +15,7 @@ public:
private:
size_t _read = 0;
size_t _write = 0;
std::atomic<size_t> _used = 0;
std::atomic<char> _used = 0;
T _items[ARB_MAX_SIZE];
};

View File

@ -20,6 +20,56 @@ static void mark_nonblock(int fd) {
}
}
static inline void look_for_newlines(char buf[NEWLINE_BUF_SIZE], size_t* used, size_t previous_used,
void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
size_t search_offset = previous_used;
size_t real_offset = 0;
while (*used > search_offset) {
char* newline_ptr = static_cast<char*>(memchr(&buf[search_offset], '\n', (*used - search_offset) * sizeof(char)));
if (!newline_ptr) {
break;
}
size_t newline_length = (reinterpret_cast<size_t>(newline_ptr) - reinterpret_cast<size_t>(&buf[real_offset])) / sizeof(char);
(logcat_thread->*handle_line)(&buf[real_offset], newline_length, is_stdout);
search_offset += newline_length + 1;
real_offset = search_offset;
}
if (real_offset) {
if (*used > search_offset) {
memcpy(buf, &buf[real_offset], (*used - real_offset) * sizeof(char));
*used -= real_offset;
} else {
*used = 0;
}
}
}
static inline void handle_fd(int fd, char* buf, size_t* used,
void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) {
while (true) {
ssize_t read_size_u = read(fd, &buf[*used], (NEWLINE_BUF_SIZE - *used) * sizeof(char));
if (read_size_u == 0) {
return;
} else if (read_size_u < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
return;
} else if (read_size_u < 0) {
throw_system_error("read()");
}
size_t read_size = static_cast<size_t>(read_size_u) / sizeof(char);
if (*used + read_size > NEWLINE_BUF_SIZE) {
throw std::runtime_error("Received line longer than 512k");
}
size_t previous_used = *used;
*used += read_size;
look_for_newlines(buf, used, previous_used, handle_line, logcat_thread, is_stdout);
}
}
LogcatThread::LogcatThread() {
int fds[2];
@ -101,11 +151,24 @@ void LogcatThread::join() {
this->_thread.join();
}
void LogcatThread::_handle_line(char* buf, size_t used, bool is_stdout) {
if (!is_stdout) {
std::string log_entry = format_log(std::string("logcat stderr: ") + std::string(buf, used));
printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
return;
}
// TODO actually parse
std::string log_entry = format_log(std::string("logcat stdout: ") + std::string(buf, used));
printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
}
void LogcatThread::_run(std::stop_token stoken) {
struct epoll_event events[EPOLL_MAX_EVENTS];
while (!stoken.stop_requested()) {
printf("(boop)\n");
#ifndef NDEBUG
if (this->debug_log_request.test()) {
this->atomic_ring_buffer.put_and_increment_write(format_log("A log entry from the logcat thread :D"));
@ -122,7 +185,20 @@ void LogcatThread::_run(std::stop_token stoken) {
}
for (int i=0; i < ready_fds; i++) {
printf("ain't no way in hell do we get a ready fd (#%d) when we haven't set up anything\n", i);
const bool is_stdout = events[i].data.fd == this->_stdout_read_fd;
char* buf = is_stdout ? this->_stdout_buf : this->_stderr_buf;
size_t* used = is_stdout ? &this->_stdout_buf_used : &this->_stderr_buf_used;
try {
handle_fd(events[i].data.fd, buf, used, &LogcatThread::_handle_line, this, is_stdout);
} catch (const std::exception& e) {
std::string log_entry = "Failed to handle std";
log_entry += is_stdout ? "out: " : "err: ";
log_entry += e.what();
log_entry = format_log(std::move(log_entry));
printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry));
}
}
}
}

View File

@ -5,6 +5,7 @@
#include "arb.h"
typedef std::variant<std::string> LogcatThreadItem;
#define NEWLINE_BUF_SIZE 512 * 1024
class LogcatThread {
public:
@ -24,6 +25,7 @@ public:
#endif
private:
void _handle_line(char* buf, size_t used, bool is_stdout);
void _run(std::stop_token stoken);
int _epoll_fd = -1;
@ -31,6 +33,10 @@ private:
int _stdout_write_fd = -1;
int _stderr_read_fd = -1;
int _stderr_write_fd = -1;
char _stdout_buf[NEWLINE_BUF_SIZE];
size_t _stdout_buf_used = 0;
char _stderr_buf[NEWLINE_BUF_SIZE];
size_t _stderr_buf_used = 0;
std::stop_source _stop_source;
std::thread _thread;
};