#include #include #include #include #include #include #include #include "log.h" #include "misc.h" #include "config.h" #include "logcat_thread.h" #include "logcat_entry.h" #define EPOLL_MAX_EVENTS 10 static void mark_nonblock(int fd) { int flags = fcntl(fd, F_GETFL); if (flags < 0) { throw_system_error("fcntl(fd, F_GETFL)"); } if (!(flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { throw_system_error("fcntl(fd, F_SETFL)"); } } static inline void look_for_newlines(char buf[NEWLINE_BUF_SIZE], size_t* used, size_t previous_used, void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) { size_t search_offset = previous_used; size_t real_offset = 0; while (*used > search_offset) { char* newline_ptr = static_cast(memchr(&buf[search_offset], '\n', (*used - search_offset) * sizeof(char))); if (!newline_ptr) { break; } size_t newline_length = (reinterpret_cast(newline_ptr) - reinterpret_cast(&buf[real_offset])) / sizeof(char); (logcat_thread->*handle_line)(&buf[real_offset], newline_length, is_stdout); real_offset += newline_length + 1; search_offset = real_offset; } if (real_offset) { if (*used > search_offset) { memcpy(buf, &buf[real_offset], (*used - real_offset) * sizeof(char)); *used -= real_offset; } else { *used = 0; } } } static inline void handle_fd(int fd, char* buf, size_t* used, void(LogcatThread::*handle_line)(char*, size_t, bool), LogcatThread* logcat_thread, bool is_stdout) { while (true) { ssize_t read_size_u = read(fd, &buf[*used], (NEWLINE_BUF_SIZE - *used) * sizeof(char)); if (read_size_u == 0) { return; } else if (read_size_u < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { return; } else if (read_size_u < 0) { throw_system_error("read()"); } size_t read_size = static_cast(read_size_u) / sizeof(char); if (*used + read_size > NEWLINE_BUF_SIZE) { throw std::runtime_error("Received line longer than 512k"); } size_t previous_used = *used; *used += read_size; look_for_newlines(buf, used, previous_used, handle_line, logcat_thread, is_stdout); } } LogcatThread::LogcatThread(const std::string* logcat_command) : _logcat_command(logcat_command) { int fds[2]; struct epoll_event event = {.events = EPOLLIN | EPOLLET}; if (pipe(fds)) { int errsv = errno; this->~LogcatThread(); throw_system_error(errsv, "pipe() for stdout"); } this->_stdout_read_fd = fds[0]; this->_stdout_write_fd = fds[1]; try { mark_nonblock(this->_stdout_read_fd); } catch (const std::exception& e) { this->~LogcatThread(); throw; } if (pipe(fds)) { int errsv = errno; this->~LogcatThread(); throw_system_error(errsv, "pipe() for stderr"); } this->_stderr_read_fd = fds[0]; this->_stderr_write_fd = fds[1]; try { mark_nonblock(this->_stderr_read_fd); } catch (const std::exception& e) { this->~LogcatThread(); throw; } this->_epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (this->_epoll_fd == -1) { int errsv = errno; this->~LogcatThread(); throw_system_error(errsv, "epoll_create1()"); } event.data.fd = this->_stdout_read_fd; if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stdout_read_fd, &event)) { int errsv = errno; this->~LogcatThread(); throw_system_error(errsv, "epoll_ctl() for stdout"); } event.data.fd = this->_stderr_read_fd; if (epoll_ctl(this->_epoll_fd, EPOLL_CTL_ADD, this->_stderr_read_fd, &event)) { int errsv = errno; this->~LogcatThread(); throw_system_error(errsv, "epoll_ctl() for stderr"); } this->_thread = std::thread(&LogcatThread::_run, this, this->_stop_source.get_token()); } LogcatThread::~LogcatThread() { if (this->_epoll_fd != -1 && close(this->_epoll_fd)) { log(std::string("Failed to close epoll file descriptor: close(): ") + strerror(errno)); } if (this->_stdout_read_fd != -1 && close(this->_stdout_read_fd)) { log(std::string("Failed to close stdout read pipe: close(): ") + strerror(errno)); } if (this->_stdout_write_fd != -1 && close(this->_stdout_write_fd)) { log(std::string("Failed to close stdout write pipe: close(): ") + strerror(errno)); } if (this->_stderr_read_fd != -1 && close(this->_stderr_read_fd)) { log(std::string("Failed to close stderr read pipe: close(): ") + strerror(errno)); } if (this->_stderr_write_fd != -1 && close(this->_stderr_write_fd)) { log(std::string("Failed to close stderr write pipe: close(): ") + strerror(errno)); } } void LogcatThread::request_stop() { this->_stop_source.request_stop(); } void LogcatThread::join() { this->_thread.join(); } void LogcatThread::_put_if_not_stopped(LogcatThreadItem item) { while (true) { if (this->atomic_ring_buffer.try_put_and_increment_write(item)) { break; } if (this->_stop_source.stop_requested()) { break; } } } void LogcatThread::_try_log(std::string message) { LogEntry log_entry = {time(nullptr), std::move(message)}; print_log(log_entry); this->_put_if_not_stopped(std::move(log_entry)); } void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) { if (!is_stdout) { this->_try_log(std::string("Received from logcat stderr: ") + std::string(buf, length)); return; } try { std::optional logcat_entry = try_parse_logcat_entry(buf, length, this->_current_buffer); if (logcat_entry) { this->_put_if_not_stopped(std::move(*logcat_entry)); return; } } catch (const std::exception& e) { this->_try_log(std::string("Failed to parse logcat entry: ") + e.what()); } try { std::optional new_buffer = try_parse_buffer(buf, length); if (new_buffer) { this->_current_buffer = *new_buffer; return; } } catch (const std::exception& e) { this->_try_log(std::string("Failed to parse buffer line: ") + e.what()); } this->_try_log(std::string("Cannot parse logcat stdout: ") + std::string(buf, length)); } void LogcatThread::_run_epoll_round() { struct epoll_event events[EPOLL_MAX_EVENTS]; int ready_fds = epoll_wait(this->_epoll_fd, events, EPOLL_MAX_EVENTS, 1000); if (ready_fds == -1) { int errsv = errno; // just in case if std::string overrides it this->_try_log(std::string("epoll_wait(): ") + strerror(errsv)); return; } for (int i=0; i < ready_fds; i++) { const bool is_stdout = events[i].data.fd == this->_stdout_read_fd; char* buf = is_stdout ? this->_stdout_buf : this->_stderr_buf; size_t* used = is_stdout ? &this->_stdout_buf_used : &this->_stderr_buf_used; try { handle_fd(events[i].data.fd, buf, used, &LogcatThread::_handle_line, this, is_stdout); } catch (const std::exception& e) { this->_try_log(std::string("Failed to handle std") + (is_stdout ? "out: " : "err: ") + e.what()); } } } void LogcatThread::_try_reap(bool has_request) { int wstatus; int res = waitpid(this->_logcat_pid, &wstatus, WNOHANG); if (res == -1) { try { throw_system_error("waitpid()"); } catch (const std::exception& e) { this->_try_log(e.what()); } return; } else if (res != this->_logcat_pid) { return; } this->_logcat_pid = -1; this->_logcat_process_kill_attempts = 0; // just in case if the process was terminated mid-write this->_stdout_buf_used = 0; this->_stderr_buf_used = 0; this->logcat_process_running.clear(); if (WIFEXITED(wstatus)) { if (WEXITSTATUS(wstatus) && !has_request) { this->_try_log(std::string("Logcat exited with ") + std::to_string(WEXITSTATUS(wstatus))); } } else if (WIFSIGNALED(wstatus)) { if (!has_request) { this->_try_log(std::string("Logcat exited with -") + std::to_string(WTERMSIG(wstatus))); } } else { this->_try_log(std::string("Logcat disappeared (wstatus=") + std::to_string(wstatus) + ')'); } } bool LogcatThread::_handle_stop_request() { if (this->_logcat_pid == -1) { return true; } int signal = this->_logcat_process_kill_attempts++ < 3 ? SIGTERM : SIGKILL; if (!kill(this->_logcat_pid, signal)) { return false; } try { throw_system_error("kill()"); } catch (const std::exception& e) { this->_try_log(e.what()); } return true; } bool LogcatThread::_handle_start_request() { if (this->_logcat_pid != -1) { this->_handle_stop_request(); return false; } auto dup2_or_die = [](int oldfd, int newfd) { if (dup2(oldfd, newfd) == newfd) { return; } try { throw_system_error(std::string("dup2(") + std::to_string(oldfd) + ", " + std::to_string(newfd) + ')'); } catch (const std::exception& e) { print_log({time(nullptr), e.what()}); } exit(1); }; auto close_or_warn = [](int fd) { if (!close(fd)) { return; } try { throw_system_error(std::string("close(") + std::to_string(fd) + ')'); } catch (const std::exception& e) { print_log({time(nullptr), e.what()}); } }; std::string logcat_command = *this->_logcat_command; if (logcat_command.empty()) { logcat_command = default_logcat_command; } this->_logcat_pid = fork(); if (this->_logcat_pid == -1) { try { throw_system_error("fork()"); } catch (const std::exception& e) { this->_try_log(e.what()); } return true; } else if (this->_logcat_pid == 0) { dup2_or_die(this->_stderr_write_fd, 2); dup2_or_die(this->_stdout_write_fd, 1); close_or_warn(this->_stdout_write_fd); close_or_warn(this->_stderr_write_fd); close_or_warn(this->_stdout_read_fd); close_or_warn(this->_stderr_read_fd); execlp("sh", "sh", "-c", logcat_command.c_str(), nullptr); try { throw_system_error("execlp()"); } catch (const std::exception& e) { print_log({time(nullptr), e.what()}); } exit(1); } else { this->logcat_process_running.test_and_set(); return true; } } bool LogcatThread::_run_process_round(LogcatProcessRequest request) { bool task_done; switch (request) { case LogcatProcessRequest::None: task_done = false; break; case LogcatProcessRequest::Start: task_done = this->_handle_start_request(); break; case LogcatProcessRequest::Stop: task_done = this->_handle_stop_request(); break; }; if (this->_logcat_pid != -1) { this->_try_reap(request != LogcatProcessRequest::None); } return task_done; } void LogcatThread::_run(std::stop_token stoken) { while (!stoken.stop_requested() || this->_logcat_pid != -1) { #ifndef NDEBUG if (this->debug_log_request.test()) { this->_try_log("A log entry from the logcat thread :D"); this->debug_log_request.clear(); } #endif this->_run_epoll_round(); if (stoken.stop_requested() && this->_logcat_pid != -1) { this->_run_process_round(LogcatProcessRequest::Stop); } else if (this->_run_process_round(this->logcat_process_request.load())) { this->logcat_process_request.store(LogcatProcessRequest::None); } } }