HikoGUI
A low latency retained GUI
Loading...
Searching...
No Matches
wfree_message_queue.hpp
1// Copyright Take Vos 2019-2020.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
4
5#pragma once
6
7#include "required.hpp"
8#include "atomic.hpp"
9#include "fixed_string.hpp"
10#include <array>
11#include <atomic>
12#include <memory>
13#include <thread>
14#include <optional>
15#include <type_traits>
16#include <functional>
17
18namespace tt {
19
20template<typename T, size_t Capacity>
21class wfree_message_queue;
22
23template<typename T, size_t Capacity, bool WriteOperation>
26 size_t index;
27
28public:
29 wfree_message_queue_operation() noexcept : parent(nullptr), index(0) {}
30 wfree_message_queue_operation(wfree_message_queue<T,Capacity> *parent, size_t index) noexcept : parent(parent), index(index) {}
31
33
35 parent(other.parent), index(other.index)
36 {
37 tt_axiom(this != &other);
38 other.parent = nullptr;
39 }
40
42 {
43 if (parent == nullptr) {
44 return;
45 }
46
47 if constexpr (WriteOperation) {
48 parent->write_finish(index);
49 } else {
50 parent->read_finish(index);
51 }
52 }
53
54 wfree_message_queue_operation& operator=(wfree_message_queue_operation const &other) = delete;
55
57 {
58 tt_axiom(this != &other);
59 std::swap(index, other.index);
60 std::swap(parent, other.parent);
61 }
62
63 T &operator*() noexcept
64 {
65 return (*parent)[index];
66 }
67
68 T *operator->() noexcept
69 {
70 return &(*parent)[index];
71 }
72};
73
74template<typename T, size_t Capacity>
76 using index_type = size_t;
77 using value_type = T;
80
81 struct message_type {
82 // The in_use atomic is first, to improve cache-line and prefetch.
83 // There should not be much false sharing since the thread that uses the message is
84 // also the one that updates the in_use atomic.
85 std::atomic<bool> in_use = false;
86 value_type value;
87 };
88
89 static constexpr index_type capacity = Capacity;
90
93 static constexpr index_type slack = 16;
94 static_assert(capacity > (slack * 2), "The capacity of the message queue should be much larger than its slack.");
95
97 alignas(hardware_destructive_interference_size) std::atomic<index_type> head = 0;
98 alignas(hardware_destructive_interference_size) std::atomic<index_type> tail = 0;
99
100public:
101 wfree_message_queue() = default;
104 wfree_message_queue &operator=(wfree_message_queue const &) = delete;
105 wfree_message_queue &operator=(wfree_message_queue &&) = delete;
106 ~wfree_message_queue() = default;
107
111 index_type size() const noexcept {
112 // head and tail are extremely large integers, they will never wrap around.
113 return head.load(std::memory_order::relaxed) - tail.load(std::memory_order::relaxed);
114 }
115
116 bool empty() const noexcept {
117 return head.load(std::memory_order::relaxed) <= tail.load(std::memory_order::relaxed);
118 }
119
120 bool full() const noexcept {
121 return head.load(std::memory_order::relaxed) >= (tail.load(std::memory_order::relaxed) + (capacity - slack));
122 }
123
129 template<basic_fixed_string BlockCounterTag = "">
131 return {this, write_start<BlockCounterTag>()};
132 }
133
140 return {this, read_start()};
141 }
142
143 value_type const &operator[](index_type index) const noexcept {
144 return messages[index % capacity].value;
145 }
146
147 value_type &operator[](index_type index) noexcept {
148 return messages[index % capacity].value;
149 }
150
158 template<basic_fixed_string CounterTag = "">
159 index_type write_start() noexcept {
160 ttlet index = head.fetch_add(1, std::memory_order::acquire);
161 auto &message = messages[index % capacity];
162
163 // We acquired the index before we knew if the queue was full.
164 // So we have to wait until the message is empty, however when it is empty we are
165 // the only one that holds the message, so we only need to mark it that we are done with
166 // writing the message.
167 wait_for_transition<CounterTag>(message.in_use, false, std::memory_order::acquire);
168 return index;
169 }
170
176 void write_finish(index_type index) noexcept {
177 auto &message = messages[index % capacity];
178
179 // Mark that the message is finished with writing.
180 message.in_use.store(true, std::memory_order::release);
181 }
182
189 template<basic_fixed_string CounterTag = "">
190 index_type read_start() noexcept {
191 ttlet index = tail.fetch_add(1, std::memory_order::acquire);
192 auto &message = messages[index % capacity];
193
194 // We acquired the index before we knew if the message was ready.
195 wait_for_transition<CounterTag>(message.in_use, true, std::memory_order::acquire);
196 return index;
197 }
198
204 void read_finish(index_type index) noexcept {
205 auto &message = messages[index % capacity];
206
207 // We acquired the index before we knew if the message was ready.
208 message.in_use.store(false, std::memory_order::release);
209
210 // The message itself does not need to be destructed.
211 // This will happen automatically when wrapping around the ring buffer overwrites the message.
212 }
213};
214
215}
example: ``` template<tt::basic_fixed_string Foo> class A { auto bar() { return std::string{Foo}; } }...
Definition fixed_string.hpp:30
Definition wfree_message_queue.hpp:75
index_type size() const noexcept
Definition wfree_message_queue.hpp:111
index_type write_start() noexcept
Start a write into the message queue.
Definition wfree_message_queue.hpp:159
scoped_write_operation write() noexcept
Definition wfree_message_queue.hpp:130
void read_finish(index_type index) noexcept
Definition wfree_message_queue.hpp:204
scoped_read_operation read() noexcept
Definition wfree_message_queue.hpp:139
void write_finish(index_type index) noexcept
Definition wfree_message_queue.hpp:176
index_type read_start() noexcept
Definition wfree_message_queue.hpp:190
Definition wfree_message_queue.hpp:24
T fetch_add(T... args)
T load(T... args)
T swap(T... args)