diff --git a/arb.h b/arb.h index df26ae0..8b564a0 100644 --- a/arb.h +++ b/arb.h @@ -15,7 +15,7 @@ public: private: size_t _read = 0; size_t _write = 0; - std::atomic _used = 0; + std::atomic _used = 0; T _items[ARB_MAX_SIZE]; }; diff --git a/logcat_thread.cpp b/logcat_thread.cpp index 2b1736c..9af038d 100644 --- a/logcat_thread.cpp +++ b/logcat_thread.cpp @@ -20,6 +20,56 @@ static void mark_nonblock(int fd) { } } +static inline void look_for_newlines(char buf[NEWLINE_BUF_SIZE], size_t* used, size_t previous_used, + void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) { + size_t search_offset = previous_used; + size_t real_offset = 0; + + while (*used > search_offset) { + char* newline_ptr = static_cast(memchr(&buf[search_offset], '\n', (*used - search_offset) * sizeof(char))); + if (!newline_ptr) { + break; + } + + size_t newline_length = (reinterpret_cast(newline_ptr) - reinterpret_cast(&buf[real_offset])) / sizeof(char); + (logcat_thread->*handle_line)(&buf[real_offset], newline_length, is_stdout); + + search_offset += newline_length + 1; + real_offset = search_offset; + } + + if (real_offset) { + if (*used > search_offset) { + memcpy(buf, &buf[real_offset], (*used - real_offset) * sizeof(char)); + *used -= real_offset; + } else { + *used = 0; + } + } +} + +static inline void handle_fd(int fd, char* buf, size_t* used, + void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) { + while (true) { + ssize_t read_size_u = read(fd, &buf[*used], (NEWLINE_BUF_SIZE - *used) * sizeof(char)); + if (read_size_u == 0) { + return; + } else if (read_size_u < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + return; + } else if (read_size_u < 0) { + throw_system_error("read()"); + } + size_t read_size = static_cast(read_size_u) / sizeof(char); + if (*used + read_size > NEWLINE_BUF_SIZE) { + throw std::runtime_error("Received line longer than 512k"); + } + size_t previous_used = *used; + *used += read_size; + + look_for_newlines(buf, used, previous_used, handle_line, logcat_thread, is_stdout); + } +} + LogcatThread::LogcatThread() { int fds[2]; @@ -101,11 +151,24 @@ void LogcatThread::join() { this->_thread.join(); } +void LogcatThread::_handle_line(char* buf, size_t used, bool is_stdout) { + if (!is_stdout) { + std::string log_entry = format_log(std::string("logcat stderr: ") + std::string(buf, used)); + printf("%s\n", log_entry.c_str()); + this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); + return; + } + + // TODO actually parse + std::string log_entry = format_log(std::string("logcat stdout: ") + std::string(buf, used)); + printf("%s\n", log_entry.c_str()); + this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); +} + void LogcatThread::_run(std::stop_token stoken) { struct epoll_event events[EPOLL_MAX_EVENTS]; while (!stoken.stop_requested()) { - printf("(boop)\n"); #ifndef NDEBUG if (this->debug_log_request.test()) { this->atomic_ring_buffer.put_and_increment_write(format_log("A log entry from the logcat thread :D")); @@ -122,7 +185,20 @@ void LogcatThread::_run(std::stop_token stoken) { } for (int i=0; i < ready_fds; i++) { - printf("ain't no way in hell do we get a ready fd (#%d) when we haven't set up anything\n", i); + const bool is_stdout = events[i].data.fd == this->_stdout_read_fd; + char* buf = is_stdout ? this->_stdout_buf : this->_stderr_buf; + size_t* used = is_stdout ? &this->_stdout_buf_used : &this->_stderr_buf_used; + + try { + handle_fd(events[i].data.fd, buf, used, &LogcatThread::_handle_line, this, is_stdout); + } catch (const std::exception& e) { + std::string log_entry = "Failed to handle std"; + log_entry += is_stdout ? "out: " : "err: "; + log_entry += e.what(); + log_entry = format_log(std::move(log_entry)); + printf("%s\n", log_entry.c_str()); + this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); + } } } } diff --git a/logcat_thread.h b/logcat_thread.h index e0e8ca4..4854177 100644 --- a/logcat_thread.h +++ b/logcat_thread.h @@ -5,6 +5,7 @@ #include "arb.h" typedef std::variant LogcatThreadItem; +#define NEWLINE_BUF_SIZE 512 * 1024 class LogcatThread { public: @@ -24,6 +25,7 @@ public: #endif private: + void _handle_line(char* buf, size_t used, bool is_stdout); void _run(std::stop_token stoken); int _epoll_fd = -1; @@ -31,6 +33,10 @@ private: int _stdout_write_fd = -1; int _stderr_read_fd = -1; int _stderr_write_fd = -1; + char _stdout_buf[NEWLINE_BUF_SIZE]; + size_t _stdout_buf_used = 0; + char _stderr_buf[NEWLINE_BUF_SIZE]; + size_t _stderr_buf_used = 0; std::stop_source _stop_source; std::thread _thread; };