Spinlock only if not requested to stop

This commit is contained in:
blankie 2023-01-21 14:51:23 +07:00
parent ae0e97a27f
commit 6cca794fee
Signed by: blankie
GPG Key ID: CC15FC822C7F61F5
3 changed files with 28 additions and 12 deletions

9
arb.h
View File

@ -10,7 +10,7 @@ class AtomicRingBuffer {
public: public:
T* get(); T* get();
void increment_read(); void increment_read();
void put_and_increment_write(T item); bool try_put_and_increment_write(T item);
private: private:
size_t _read = 0; size_t _read = 0;
@ -39,11 +39,12 @@ void AtomicRingBuffer<T>::increment_read() {
} }
template<typename T> template<typename T>
void AtomicRingBuffer<T>::put_and_increment_write(T item) { bool AtomicRingBuffer<T>::try_put_and_increment_write(T item) {
while (this->_used.load() == ARB_MAX_SIZE) { if (this->_used.load() == ARB_MAX_SIZE) {
printf("spinlocking!!!\n"); return false;
} }
this->_items[this->_write] = std::move(item); this->_items[this->_write] = std::move(item);
++this->_used; ++this->_used;
this->_write = (this->_write + 1) % ARB_MAX_SIZE; this->_write = (this->_write + 1) % ARB_MAX_SIZE;
return true;
} }

View File

@ -152,11 +152,25 @@ void LogcatThread::join() {
this->_thread.join(); this->_thread.join();
} }
void LogcatThread::_put_if_not_stopped(LogcatThreadItem item) {
while (true) {
if (this->atomic_ring_buffer.try_put_and_increment_write(item)) {
break;
}
#ifndef NDEBUG
printf("spinlocking!!!\n");
#endif
if (this->_stop_source.stop_requested()) {
break;
}
}
}
void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) { void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
if (!is_stdout) { if (!is_stdout) {
std::string log_entry = format_log(std::string("Received from logcat stderr: ") + std::string(buf, length)); std::string log_entry = format_log(std::string("Received from logcat stderr: ") + std::string(buf, length));
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
return; return;
} }
@ -166,10 +180,10 @@ void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::string log_entry = format_log(std::string("Failed to parse logcat entry: ") + e.what()); std::string log_entry = format_log(std::string("Failed to parse logcat entry: ") + e.what());
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
} }
if (logcat_entry) { if (logcat_entry) {
this->atomic_ring_buffer.put_and_increment_write(std::move(*logcat_entry)); this->_put_if_not_stopped(std::move(*logcat_entry));
return; return;
} }
std::optional<Buffer> new_buffer; std::optional<Buffer> new_buffer;
@ -178,7 +192,7 @@ void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::string log_entry = format_log(std::string("Failed to parse buffer line: ") + e.what()); std::string log_entry = format_log(std::string("Failed to parse buffer line: ") + e.what());
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
} }
if (new_buffer) { if (new_buffer) {
this->_current_buffer = *new_buffer; this->_current_buffer = *new_buffer;
@ -187,7 +201,7 @@ void LogcatThread::_handle_line(char* buf, size_t length, bool is_stdout) {
std::string log_entry = format_log(std::string("Cannot parse logcat stdout: ") + std::string(buf, length)); std::string log_entry = format_log(std::string("Cannot parse logcat stdout: ") + std::string(buf, length));
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
} }
void LogcatThread::_run(std::stop_token stoken) { void LogcatThread::_run(std::stop_token stoken) {
@ -196,7 +210,7 @@ void LogcatThread::_run(std::stop_token stoken) {
while (!stoken.stop_requested()) { while (!stoken.stop_requested()) {
#ifndef NDEBUG #ifndef NDEBUG
if (this->debug_log_request.test()) { if (this->debug_log_request.test()) {
this->atomic_ring_buffer.put_and_increment_write(format_log("A log entry from the logcat thread :D")); this->_put_if_not_stopped(format_log("A log entry from the logcat thread :D"));
this->debug_log_request.clear(); this->debug_log_request.clear();
} }
#endif #endif
@ -205,7 +219,7 @@ void LogcatThread::_run(std::stop_token stoken) {
if (ready_fds == -1) { if (ready_fds == -1) {
std::string log_entry = format_log(std::string("epoll_wait(): ") + strerror(errno)); std::string log_entry = format_log(std::string("epoll_wait(): ") + strerror(errno));
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
break; break;
} }
@ -222,7 +236,7 @@ void LogcatThread::_run(std::stop_token stoken) {
log_entry += e.what(); log_entry += e.what();
log_entry = format_log(std::move(log_entry)); log_entry = format_log(std::move(log_entry));
printf("%s\n", log_entry.c_str()); printf("%s\n", log_entry.c_str());
this->atomic_ring_buffer.put_and_increment_write(std::move(log_entry)); this->_put_if_not_stopped(std::move(log_entry));
} }
} }
} }

View File

@ -26,6 +26,7 @@ public:
#endif #endif
private: private:
void _put_if_not_stopped(LogcatThreadItem item);
void _handle_line(char* buf, size_t length, bool is_stdout); void _handle_line(char* buf, size_t length, bool is_stdout);
void _run(std::stop_token stoken); void _run(std::stop_token stoken);