HikoGUI
A low latency retained GUI
Loading...
Searching...
No Matches
wfree_fifo.hpp
1
2
3#pragma once
4
5#include "counters.hpp"
6#include <concepts>
7#include <atomic>
8#include <memory>
9#include <array>
10
11namespace tt {
12
23template<typename T, size_t SlotSize>
25public:
// The slot size must be a power of two; std::has_single_bit() is true only
// when exactly one bit of SlotSize is set.
26 static_assert(std::has_single_bit(SlotSize), "Only power-of-two number of messages size allowed.");
27
// Element type of the fifo; slot_type reserves room for one pointer to it.
28 using value_type = T;
29
// Total size of the fifo in bytes. 65536 is exactly the range of the
// uint16_t byte indices used for _head and _tail, so they wrap at the end.
30 static constexpr size_t fifo_size = 65536;
// Size of a single slot in bytes (the SlotSize template argument).
31 static constexpr size_t slot_size = SlotSize;
// Number of slots that fit in the fifo.
32 static constexpr size_t num_slots = fifo_size / slot_size;
33
// One slot of the fifo.
// NOTE(review): the data members of this struct are not visible in this
// listing; take_one() and emplace() access `pointer` and `buffer` on a slot,
// so confirm their declarations against the real header.
34 struct slot_type {
// Bytes left for in-place message storage after room for one
// value_type pointer has been subtracted from the slot size.
35 static constexpr size_t buffer_size = slot_size - sizeof(value_type *);
36
39 };
40
// A fifo can only be default-constructed. Copy and move construction and
// assignment are all deleted, so an instance stays at the address where it
// was created.
41 constexpr wfree_fifo() noexcept = default;
42 wfree_fifo(wfree_fifo const &) = delete;
43 wfree_fifo(wfree_fifo &&) = delete;
44 wfree_fifo &operator=(wfree_fifo const &) = delete;
45 wfree_fifo &operator=(wfree_fifo &&) = delete;
46
54 template<typename Operation>
55 bool take_one(Operation &&operation) noexcept
56 {
57 auto index = _tail;
58
59 // The shift here should be eliminated by the equal shift inside the index operator.
60 auto &slot = _slots[index / slot_size];
61
62 // Check if the slot.pointer is not null, this is when the writer
63 // has finished writing the slot.
64 if (auto ptr = slot.pointer.load(std::memory_order::acquire)) {
65 std::forward<Operation>(operation)(*ptr);
66
67 // Destroy the object depending if it lives in the buffer or on the heap.
68 if (ptr == static_cast<void *>(slot.buffer.data())) {
69 std::destroy_at(ptr);
70 } else {
71 delete ptr;
72 }
73
74 // We are done with the slot.
75 slot.pointer.store(nullptr, std::memory_order::release);
76 _tail += slot_size;
77 return true;
78 } else {
79 return false;
80 }
81 }
82
/** Take all messages from the queue.
 *
 * Keeps calling take_one() until it reports that the tail slot is empty.
 *
 * @param operation Callable invoked once for every message that is taken.
 */
template<typename Operation>
void take_all(Operation const &operation) noexcept
{
    for (;;) {
        if (not take_one(operation)) {
            return;
        }
    }
}
94
// Slow path called by the emplace() overloads on every iteration of their
// wait loop, i.e. while the slot they claimed still holds an unread message.
95 tt_no_inline void contended() noexcept
96 {
// NOTE(review): no chrono literal is used in the lines visible here, and this
// listing skips a line after the counter increment; confirm against the real
// header before treating this using-directive as unused.
97 using namespace std::literals::chrono_literals;
98
99 // If we get here, that would suck, but nothing to do about it.
// Record the contention in the global_counter tagged "wfree_fifo".
100 ++global_counter<"wfree_fifo">;
102 }
103
108 template<typename Message, typename... Args>
109 tt_force_inline void emplace(Args &&...args) noexcept requires(sizeof(Message) <= slot_type::buffer_size)
110 {
111 // We need a new index.
112 // - The index is a byte index into 64kByte of memory.
113 // - Increment index by the slot_size and the _head will overflow naturally
114 // at the end of the fifo.
115 // - We don't care about memory ordering with other writer threads. as
116 // each slot has an atomic for handling read/writer contention.
117 // - We don't have to check full/empty, this is done on the slot itself.
118 ttlet index = _head.fetch_add(slot_size, std::memory_order::relaxed);
119 tt_axiom(index % slot_size == 0);
120
121 auto &slot = *std::launder(std::assume_aligned<slot_size>(reinterpret_cast<slot_type*>(reinterpret_cast<char*>(this) + index)));
122
123 // The division here should be eliminated by the equal multiplication inside the index operator.
124 // auto &slot = _slots[index / slot_size];
125
126 // Wait until the slot.pointer is a nullptr.
127 // And acquire the buffer to start overwriting it.
128 // There are no other threads that will make this non-null afterwards.
129 while (slot.pointer.load(std::memory_order_acquire)) {
130 // If we get here, that would suck, but nothing to do about it.
131 [[unlikely]] contended();
132 }
133
134 // Overwrite the buffer with the new slot.
135 auto new_ptr = new (slot.buffer.data()) Message(std::forward<Args>(args)...);
136
137 // Release the buffer for reading.
138 slot.pointer.store(new_ptr, std::memory_order::release);
139 }
140
145 template<typename Message, typename... Args>
146 tt_force_inline void emplace(Args &&...args) noexcept
147 {
148 // We need a new index.
149 // - The index is a byte index into 64kByte of memory.
150 // - Increment index by the slot_size and the _head will overflow naturally
151 // at the end of the fifo.
152 // - We don't care about memory ordering with other writer threads. as
153 // each slot has an atomic for handling read/writer contention.
154 // - We don't have to check full/empty, this is done on the slot itself.
155 ttlet index = _head.fetch_add(slot_size, std::memory_order::relaxed);
156 tt_axiom(index % slot_size == 0);
157
158 // The division here should be eliminated by the equal multiplication inside the index operator.
159 auto &slot = _slots[index / slot_size];
160
161 // We need a heap allocated pointer with a fully constructed object
162 // Lets do this ahead of time to let another thread have some time
163 // to release the ring-buffer-slot.
164 ttlet new_ptr = new Message(std::forward<Args>(args)...);
165
166 // Wait until the slot.pointer is a nullptr.
167 // We don't need to acquire since we wrote into a new heap location.
168 // There are no other threads that will make this non-null afterwards.
169 while (slot.pointer.load(std::memory_order::relaxed)) {
170 // If we get here, that would suck, but nothing to do about it.
171 [[unlikely]] contended();
172 }
173
174 // Release the heap for reading.
175 slot.pointer.store(new_ptr, std::memory_order::release);
176 }
177
178private:
// Byte index of the next slot a writer will claim; emplace() advances it
// atomically with fetch_add(slot_size) and lets it wrap as a uint16_t.
180 std::atomic<uint16_t> _head = 0;
// Byte index of the slot take_one() inspects next. It is a plain integer and,
// in the code visible here, only take_one() modifies it.
181 uint16_t _tail = 0;
182};
183
184} // namespace tt
A wait-free multiple-producer/single-consumer fifo designed for absolute performance.
Definition wfree_fifo.hpp:24
bool take_one(Operation &&operation) noexcept
Take one message from the fifo slot.
Definition wfree_fifo.hpp:55
tt_force_inline void emplace(Args &&...args) noexcept
Create a message in-place on the fifo.
Definition wfree_fifo.hpp:109
void take_all(Operation const &operation) noexcept
Take all messages from the queue.
Definition wfree_fifo.hpp:90
tt_force_inline void emplace(Args &&...args) noexcept
Create a message in-place on the fifo.
Definition wfree_fifo.hpp:146
Definition wfree_fifo.hpp:34
Definition concepts.hpp:34
T fetch_add(T... args)
T sleep_for(T... args)