HikoGUI
A low latency retained GUI
Loading...
Searching...
No Matches
wfree_message_queue.hpp
1// Copyright 2019 Pokitec
2// All rights reserved.
3
4#pragma once
5
6#include "TTauri/Foundation/required.hpp"
7#include "TTauri/Foundation/atomic.hpp"
8#include <array>
9#include <atomic>
10#include <memory>
11#include <thread>
12#include <optional>
13#include <type_traits>
14#include <functional>
15
16namespace tt {
17
18template<typename T, size_t Capacity>
19class wfree_message_queue;
20
21template<typename T, size_t Capacity, bool WriteOperation>
24 size_t index;
25
26public:
27 wfree_message_queue_operation() noexcept : parent(nullptr), index(0) {}
28 wfree_message_queue_operation(wfree_message_queue<T,Capacity> *parent, size_t index) noexcept : parent(parent), index(index) {}
29
31
33 parent(other.parent), index(other.index)
34 {
35 tt_assume(this != &other);
36 other.parent = nullptr;
37 }
38
40 {
41 if (parent == nullptr) {
42 return;
43 }
44
45 if constexpr (WriteOperation) {
46 parent->write_finish(index);
47 } else {
48 parent->read_finish(index);
49 }
50 }
51
52 wfree_message_queue_operation& operator=(wfree_message_queue_operation const &other) = delete;
53
55 {
56 tt_assume(this != &other);
57 std::swap(index, other.index);
58 std::swap(parent, other.parent);
59 }
60
61 T &operator*() noexcept
62 {
63 return (*parent)[index];
64 }
65
66 T *operator->() noexcept
67 {
68 return &(*parent)[index];
69 }
70};
71
72template<typename T, size_t Capacity>
74 using index_type = size_t;
75 using value_type = T;
78
79 struct message_type {
80 // The in_use atomic is first, to improve cache-line and prefetch.
81 // There should not be much false sharing since the thread that uses the message is
82 // also the one that updates the in_use atomic.
83 std::atomic<bool> in_use = false;
84 value_type value;
85 };
86
87 static constexpr index_type capacity = Capacity;
88
91 static constexpr index_type slack = 16;
92 static_assert(capacity > (slack * 2), "The capacity of the message queue should be much larger than its slack.");
93
95 alignas(cache_line_size) std::atomic<index_type> head = 0;
96 alignas(cache_line_size) std::atomic<index_type> tail = 0;
97
98public:
99 wfree_message_queue() = default;
102 wfree_message_queue &operator=(wfree_message_queue const &) = delete;
103 wfree_message_queue &operator=(wfree_message_queue &&) = delete;
104 ~wfree_message_queue() = default;
105
109 index_type size() const noexcept {
110 // head and tail are extremelly large integers, they will never wrap arround.
111 return head.load(std::memory_order_relaxed) - tail.load(std::memory_order_relaxed);
112 }
113
114 bool empty() const noexcept {
115 return head.load(std::memory_order_relaxed) <= tail.load(std::memory_order_relaxed);
116 }
117
118 bool full() const noexcept {
119 return head.load(std::memory_order_relaxed) >= (tail.load(std::memory_order_relaxed) + (capacity - slack));
120 }
121
127 template<typename BlockCounterTag=void>
129 return {this, write_start<BlockCounterTag>()};
130 }
131
138 return {this, read_start()};
139 }
140
141 value_type const &operator[](index_type index) const noexcept {
142 return messages[index % capacity].value;
143 }
144
145 value_type &operator[](index_type index) noexcept {
146 return messages[index % capacity].value;
147 }
148
156 template<typename CounterTag=void>
157 index_type write_start() noexcept {
158 ttlet index = head.fetch_add(1, std::memory_order_acquire);
159 auto &message = messages[index % capacity];
160
161 // We acquired the index before we knew if the queue was full.
162 // So we have to wait until the message is empty, however when it is empty we are
163 // the only one that holds the message, so we only need to mark it that we are done with
164 // writing the message.
165 wait_for_transition<CounterTag>(message.in_use, false, std::memory_order_acquire);
166 return index;
167 }
168
174 void write_finish(index_type index) noexcept {
175 auto &message = messages[index % capacity];
176
177 // Mark that the message is finished with writing.
178 message.in_use.store(true, std::memory_order_release);
179 }
180
187 template<typename CounterTag=void>
188 index_type read_start() noexcept {
189 ttlet index = tail.fetch_add(1, std::memory_order_acquire);
190 auto &message = messages[index % capacity];
191
192 // We acquired the index before we knew if the message was ready.
193 wait_for_transition<CounterTag>(message.in_use, true, std::memory_order_acquire);
194 return index;
195 }
196
202 void read_finish(index_type index) noexcept {
203 auto &message = messages[index % capacity];
204
205 // We acquired the index before we knew if the message was ready.
206 message.in_use.store(false, std::memory_order_release);
207
208 // The message itself does not need to be destructed.
209 // This will happen automatically when wrapping around the ring buffer overwrites the message.
210 }
211};
212
213}
Definition wfree_message_queue.hpp:73
index_type size() const noexcept
Definition wfree_message_queue.hpp:109
index_type write_start() noexcept
Start a write into the message queue.
Definition wfree_message_queue.hpp:157
scoped_write_operation write() noexcept
Definition wfree_message_queue.hpp:128
void read_finish(index_type index) noexcept
Definition wfree_message_queue.hpp:202
scoped_read_operation read() noexcept
Definition wfree_message_queue.hpp:137
void write_finish(index_type index) noexcept
Definition wfree_message_queue.hpp:174
index_type read_start() noexcept
Definition wfree_message_queue.hpp:188
Definition wfree_message_queue.hpp:22
T fetch_add(T... args)
T load(T... args)
T swap(T... args)