HikoGUI
A low latency retained GUI
Loading...
Searching...
No Matches
wfree_fifo.hpp
1
2
3#pragma once
4
5#include <concepts>
6#include <atomic>
7#include <memory>
8#include <array>
9
10namespace tt {
11
22template<typename T, size_t SlotSize>
24public:
// Compile-time guard: SlotSize must have exactly one bit set (a power of two).
25 static_assert(std::has_single_bit(SlotSize), "Only power-of-two number of messages size allowed.");
26
// Alias for the first template argument.
27 using value_type = T;
28
// Total size used for the index arithmetic; the comments in emplace() describe
// the index as a byte index into 64 kByte of memory.
29 static constexpr size_t fifo_size = 65536;
// Size of one slot, taken directly from the SlotSize template argument.
30 static constexpr size_t slot_size = SlotSize;
// Number of slots that fit in fifo_size at slot_size each.
31 static constexpr size_t num_slots = fifo_size / slot_size;
32
33 struct slot_type {
34 static constexpr size_t buffer_size = slot_size - sizeof(value_type *);
35
38 };
39
// Default construction only; the members carry their own default initializers.
40 constexpr wfree_fifo() noexcept = default;
// Copying and moving are both disabled.
41 wfree_fifo(wfree_fifo const &) = delete;
42 wfree_fifo(wfree_fifo &&) = delete;
43 wfree_fifo &operator=(wfree_fifo const &) = delete;
44 wfree_fifo &operator=(wfree_fifo &&) = delete;
45
53 template<typename Operation>
54 bool take_one(Operation &&operation) noexcept
55 {
56 auto index = _tail;
57
58 // The shift here should be eliminated by the equal shift inside the index operator.
59 auto &slot = _slots[index / slot_size];
60
61 // Check if the slot.pointer is not null, this is when the writer
62 // has finished writing the slot.
63 if (auto ptr = slot.pointer.load(std::memory_order::acquire)) {
64 std::forward<Operation>(operation)(*ptr);
65
66 // Destroy the object depending if it lives in the buffer or on the heap.
67 if (ptr == static_cast<void *>(slot.buffer.data())) {
68 std::destroy_at(ptr);
69 } else {
70 delete ptr;
71 }
72
73 // We are done with the slot.
74 slot.pointer.store(nullptr, std::memory_order::release);
75 _tail += slot_size;
76 return true;
77 } else {
78 return false;
79 }
80 }
81
// Called by both emplace() overloads on each iteration of their wait loop,
// i.e. while the slot they were assigned still holds a non-null pointer.
// Declared tt_no_inline -- presumably to keep this path out of the
// force-inlined emplace() bodies; confirm against the macro definition.
82 tt_no_inline void contended() noexcept
83 {
84 // If we get here, that would suck, but nothing to do about it.
// Records the event under the "wfree_fifo" counter tag.
85 [[unlikely]] increment_counter<"wfree_fifo">();
// NOTE(review): original source line 86 is missing from this listing, so this
// body may be incomplete as shown -- check the real header before relying on it.
87 }
88
93 template<typename Message, typename... Args>
94 tt_force_inline void emplace(Args &&...args) noexcept requires(sizeof(Message) <= slot_type::buffer_size)
95 {
96 // We need a new index.
97 // - The index is a byte index into 64kByte of memory.
98 // - Increment index by the slot_size and the _head will overflow naturally
99 // at the end of the fifo.
100 // - We don't care about memory ordering with other writer threads. as
101 // each slot has an atomic for handling read/writer contention.
102 // - We don't have to check full/empty, this is done on the slot itself.
103 ttlet index = _head.fetch_add(slot_size, std::memory_order::relaxed);
104 tt_axiom(index % slot_size == 0);
105
106 auto &slot = *std::launder(std::assume_aligned<slot_size>(reinterpret_cast<slot_type*>(reinterpret_cast<char*>(this) + index)));
107
108 // The division here should be eliminated by the equal multiplication inside the index operator.
109 // auto &slot = _slots[index / slot_size];
110
111 // Wait until the slot.pointer is a nullptr.
112 // And aquire the buffer to start overwriting it.
113 // There are no other threads that will make this non-null afterwards.
114 while (slot.pointer.load(std::memory_order_acquire)) {
115 // If we get here, that would suck, but nothing to do about it.
116 [[unlikely]] contended();
117 }
118
119 // Overwrite the buffer with the new slot.
120 auto new_ptr = new (slot.buffer.data()) Message(std::forward<Args>(args)...);
121
122 // Release the buffer for reading.
123 slot.pointer.store(new_ptr, std::memory_order::release);
124 }
125
130 template<typename Message, typename... Args>
131 tt_force_inline void emplace(Args &&...args) noexcept
132 {
133 // We need a new index.
134 // - The index is a byte index into 64kByte of memory.
135 // - Increment index by the slot_size and the _head will overflow naturally
136 // at the end of the fifo.
137 // - We don't care about memory ordering with other writer threads. as
138 // each slot has an atomic for handling read/writer contention.
139 // - We don't have to check full/empty, this is done on the slot itself.
140 ttlet index = _head.fetch_add(slot_size, std::memory_order::relaxed);
141 tt_axiom(index % slot_size == 0);
142
143 // The division here should be eliminated by the equal multiplication inside the index operator.
144 auto &slot = _slots[index / slot_size];
145
146 // We need a heap allocated pointer with a fully constructed object
147 // Lets do this ahead of time to let another thread have some time
148 // to release the ring-buffer-slot.
149 ttlet new_ptr = new Message(std::forward<Args>(args)...);
150
151 // Wait until the slot.pointer is a nullptr.
152 // We don't need to acquire since we wrote into a new heap location.
153 // There are no other threads that will make this non-null afterwards.
154 while (slot.pointer.load(std::memory_order::relaxed)) {
155 // If we get here, that would suck, but nothing to do about it.
156 [[unlikely]] contended();
157 }
158
159 // Release the heap for reading.
160 slot.pointer.store(new_ptr, std::memory_order::release);
161 }
162
163private:
164 std::array<slot_type, 256> _slots = {};
165 std::atomic<uint16_t> _head = 0;
166 uint16_t _tail = 0;
167};
168
169} // namespace tt
A wait-free multiple-producer/single-consumer fifo designed for absolute performance.
Definition wfree_fifo.hpp:23
bool take_one(Operation &&operation) noexcept
Take one message from the fifo slot.
Definition wfree_fifo.hpp:54
tt_force_inline void emplace(Args &&...args) noexcept
Create a message in-place on the fifo.
Definition wfree_fifo.hpp:94
tt_force_inline void emplace(Args &&...args) noexcept
Create a message in-place on the fifo.
Definition wfree_fifo.hpp:131
Definition wfree_fifo.hpp:33
Definition concepts.hpp:18
T fetch_add(T... args)
T sleep_for(T... args)