CX Framework
Cross-platform C utility framework
Loading...
Searching...
No Matches
prqueue.h
1#pragma once
2
3#include <cx/cx.h>
4#include <cx/thread/atomic.h>
5#include <cx/thread/aspin.h>
6#include <cx/thread/mutex.h>
7
8// Pointer-ring FIFO queue
9// Lock-free thread-safe expandable ringbuffer implementation.
10//
11// This is in thread/ rather than container/ because it is a low-level structure intended for use
12// to implement either containers or other modules, such as the MPMC work queue.
13//
14// In abstract, it is a truly lock-free* structure in general operation. It makes a few design
15// tradeoffs to achieve this and may not be as performant as a specialized implementation that
16// does not provide full lock-free guarantees, but aims to be solidly fast for most general use
17// cases.
18//
19// * with some small caveats which are documented below
20//
21// It is fully thread-safe and can be used with multiple threads both pushing and popping
22// simultaneously.
23//
// NULL pointers may NOT be stored in this queue -- it is considered an error to try to insert one.
25//
26// Lock-free guarantees
27//
28// The core algorithm is lock-free in the classical computer science sense. That is, if a thread
29// using it suspends or terminates unexpectedly while it is in the middle of an operation on the
// queue, it will not irreparably break the structure and block other threads. It does not guarantee
31// full performance if this happens, however.
32//
33// Additionally, the lock-free guarantee does not extend to the garbage collection operation. This
// operation does indeed use a lock that limits it to only executing on a single thread at once,
35// though it IS guaranteed that the collect operation will never block, but instead return immediately
36// if it cannot lock the structure, with the intent that it be run periodically so that it is
37// eventually able to succeed.
38//
39// Threads which stall or terminate in the middle of a push operation will likewise block the ability
40// of the garbage collector to prune the structure, causing a performance hit if the buffer has been
41// expanded multiple times.
42//
43// Despite all these warnings, garbage collection is not an essential function and the queue continues
44// to function even if GC is never able to run, albeit not optimally.
45//
46// Ordering guarantees
47//
48// Push operations within a single thread are guaranteed to be popped in order, provided a single
49// thread (not necessarily the same one) pops them sequentially.
50//
51// Push operations spread across multiple threads are ordered in a best effort fashion and will
52// generally be inserted into the queue in the order that the operations finish in real time. However
53// this is not a strong guarantee, and there may be minor reordering among entries that were pushed
54// at roughly the same time.
55//
// Similarly, simultaneous pop operations in multiple threads will return the items in the order
57// they were inserted, but variations in timing among threads may cause them to not be processed
58// in the exact order.
59
60#if DEBUG_LEVEL >= 2 && _64BIT
61#define PRQ_PERF_STATS
62#endif
63CX_C_BEGIN
64
// Growth/shrink step selector for dynamic queues (see PrQueue.growth and
// PrQueue.shrink). The numeric suffix appears to be the resize step as a
// percentage of the current size (e.g. PRQ_Grow_100 doubles the buffer) --
// inferred from the names; confirm against the implementation.
// Values start at 1 so that a zero-initialized field is distinguishable
// from a deliberate choice.
typedef enum PrqGrowthEnum
{
    PRQ_Grow_None = 1,      // do not resize in this direction
    PRQ_Grow_25,
    PRQ_Grow_50,
    PRQ_Grow_100,           // Default
    PRQ_Grow_150,
    PRQ_Grow_200,
} PrqGrowth;
74
75typedef struct PrqSegment PrqSegment;
76
#ifdef PRQ_PERF_STATS
// Debug-only event counters, compiled in only when PRQ_PERF_STATS is defined
// (DEBUG_LEVEL >= 2 on 64-bit targets; see the #if above). Each field counts
// occurrences of the event its name describes; the exact increment sites live
// in the implementation -- consult it before relying on precise semantics.
typedef struct PrqPerfStats {
    // Segment resize events and collisions between threads attempting them.
    atomic(uint64) grow;
    atomic(uint64) grow_collision;
    atomic(uint64) shrink;
    atomic(uint64) shrink_collision;
    // Contention on the cached head pointer and the write-reservation counter.
    atomic(uint64) head_contention;
    atomic(uint64) reserved_contention;
    // Push path: totals, which code path completed the push, and retries.
    atomic(uint64) push;
    atomic(uint64) push_optimal;
    atomic(uint64) push_fast;
    atomic(uint64) push_slow;
    atomic(uint64) push_appeared_full;
    atomic(uint64) push_actually_full;
    atomic(uint64) push_collision;
    atomic(uint64) push_retry;
    atomic(uint64) push_full_retry;
    atomic(uint64) push_noreserve_retiring;
    // Pop path: totals, which code path completed the pop, cross-thread
    // assists, and segment-chain traversals.
    atomic(uint64) pop;
    atomic(uint64) pop_optimal;
    atomic(uint64) pop_fast;
    atomic(uint64) pop_slow;
    atomic(uint64) pop_nonobvious_empty;
    atomic(uint64) pop_assist;
    atomic(uint64) pop_assist_fail;
    atomic(uint64) pop_segtraverse;
    atomic(uint64) pop_collision;
    // Garbage collection: runs, segments retired/freed, and deallocations
    // skipped because a segment was still in use.
    atomic(uint64) gc_run;
    atomic(uint64) seg_retired;
    atomic(uint64) seg_dealloc;
    atomic(uint64) seg_dealloc_failinuse;
} PrqPerfStats;
#endif
110
// Lock-free expandable pointer ring queue. Holds the sizing policy plus the
// head of the segment chain; see the algorithm notes at the top of this file
// for the lock-free and ordering guarantees. Initialize with prqInitFixed()
// or prqInitDynamic(); never use a zero-initialized PrQueue directly.
typedef struct PrQueue
{
    // Initial size of the queue as well as minimum size.
    uint32 minsz;

    // Ideal size of the queue.
    uint32 targetsz;

    // Maximum size of the queue.
    uint32 maxsz;

    // How much to grow the queue at a time.
    PrqGrowth growth;

    // How much to shrink the queue at a time.
    PrqGrowth shrink;

    // Concurrence factor. Defaults to the number of logical CPUs in the system. This value is
    // used to determine how many queue entries there must be before threads start assisting
    // each other to complete operations.
    uint32 concurrence;

    // Buffer segment that is currently the target for queue pushes and pops. The head of a linked
    // list of buffer segments when the queue is being grown.
    atomic(ptr) current;

    // Linked list of buffer segments that have been retired. These are segments that have been
    // fully emptied and no longer have any valid queue entries, nor can they have any entries
    // pushed into them, but are being held to deallocate later until it can be guaranteed that
    // no threads attempting a pop operation still have a pointer to the segment.
    PrqSegment *retired;

    // Access counter. This is used internally to close a small gap between when a thread retrieves
    // the current segment pointers and when it increments the use counter, since it cannot do that
    // atomically while another thread is retiring the segment.
    atomic(int32) access;

    // Lower 32 bits of timestamp of the last time a segment was added to grow or shrink the queue,
    // because this needs to be atomic and 64-bit atomics don't exist on all platforms.
    atomic(uint32) chgtime;

    // Minimum number of milliseconds the queue must wait to shrink after growing or shrinking (default 500ms).
    uint32 shrinkms;

    // Running average to track the total queue size across GC cycles for possible shrinking.
    uint32 avgcount;
    // NOTE(review): presumably the number of samples folded into avgcount -- confirm in prqCollect.
    uint32 avgcount_num;

    // Only 1 thread may run the garbage collection operation at a time. It's recommended to try to
    // run GC optimistically when a thread has nothing else to do. For example, a consumer thread
    // that is about to sleep.
    Mutex gcmtx;

#ifdef PRQ_PERF_STATS
    // Performance stats for debugging
    PrqPerfStats stats;
#endif
} PrQueue;
169
// One ringbuffer segment. A fixed queue has exactly one segment; a dynamic
// queue temporarily chains segments together (via nextseg) while growing, and
// parks emptied segments on the retired list until GC can safely free them.
// Allocated with the ring storage inline via the flexible array member.
typedef struct PrqSegment
{
    // Next segment in the chain. When the queue grows, it allocates a larger segment which is
    // temporarily chained to from the original segment in order to handle the transition while
    // many other threads may be still using the original.
    atomic(ptr) nextseg;

    // Next retired segment in retired chain.
    // NOTE: We cannot reuse nextseg for this. nextseg needs to continue to point to the actual
    // segment that replaced this one, so that any threads which grabbed a pointer to this segment
    // before it was retired can still follow it.
    PrqSegment *nextretired;

    // Atomic counter of how many threads are actively using this segment. This, along with the
    // access counter, act as a non-blocking optimistic lock similar to a reader-writer lock but
    // less intrusive. They block garbage collection from deallocating this segment if there is
    // a chance that a thread may still be reading from it (or about to read from it).
    atomic(int32) inuse;

    // Total number of queue slots in this buffer.
    uint32 size;

    // Number of queue slots in this buffer that are used. This number may be slightly higher
    // than the actual number of slots that has been written to. This is the authoritative
    // source for how much of the queue is used.
    atomic(uint32) count;

    // Head of the queue; points at the slot that is first in line to be read. This is cached
    // information for performance optimization only and is not authoritative.
    atomic(uint32) head;

    // Number of write reservations on this segment. Only used when the buffer is expandable, to
    // prevent GC from retiring the segment while there are pending write operations. The high
    // bit is used to signal that the segment is transitioning to the retired status and further
    // writes may not be started.
    atomic(uint32) reserved;

    // The actual ringbuffer (flexible array member; storage allocated inline
    // with the segment header).
    atomic(ptr) buffer[];
} PrqSegment;
210
// Initialize a fixed-sized ringbuffer. Guaranteed to succeed, or assert.
// prq: Uninitialized queue structure to set up
// sz:  Fixed capacity in slots
void prqInitFixed(_Out_ PrQueue *prq, uint32 sz);

// Initialize a dynamic buffer chain. Guaranteed to succeed, or assert.
// minsz: Minimum & initial size
// targetsz: Ideal size the buffer should try to reach
// maxsz: Maximum size
// growth: How much to grow at a time
// shrink: How much to shrink at a time
void prqInitDynamic(_Out_ PrQueue *prq, uint32 minsz, uint32 targetsz, uint32 maxsz,
                    PrqGrowth growth, PrqGrowth shrink);

// Attempts to destroy the queue. Will fail if there are still entries in the queue, because
// this is a low-level API that has no idea what the pointers stored in it point to or what
// kind of cleanup needs to be done on them. It is the caller's responsibility to ensure that
// all entries have been popped and no threads are still trying to push new ones!
_Success_(return)
bool prqDestroy(_Pre_valid_ _Post_invalid_ PrQueue *prq);

// Attempt to push a pointer into the queue. Returns true on success, false if the queue
// is full and cannot be grown. Upon success, the pointer should be considered to be
// owned by the queue and not touched again. ptr must not be NULL (see the note at the
// top of this file).
_Success_(return)
bool prqPush(_Inout_ PrQueue *prq, _Pre_notnull_ _Post_invalid_ void *ptr);

// Attempt to pop a pointer from the queue. If one is available, it is returned and
// ownership transfers back to the caller. Otherwise, a NULL return indicates that the
// queue is empty.
_Must_inspect_result_ _Ret_maybenull_
void *prqPop(_Inout_ PrQueue *prq);

// Attempt to run a garbage collection cycle on the queue. Returns true if the cycle runs,
// whether or not anything was collected. Never blocks: if another thread holds the GC
// lock this returns immediately (see the lock-free guarantees above), so it is safe to
// call opportunistically, e.g. from a consumer thread that is about to sleep.
bool prqCollect(_Inout_ PrQueue *prq);

// Retrieves an estimated count of the number of valid items in the queue. Accuracy
// varies depending on how busy the queue is.
uint32 prqCount(_In_ PrQueue *prq);

// Attempt to fetch a copy of the nth pointer from the queue.
// EXERCISE EXTREME CAUTION!
// This is very dangerous and tricky to use safely! It is likely the pointer returned
// by this function is being actively processed by another thread and may have already
// been removed from the queue by the time you examine its contents. It is almost
// certain to cause a crash unless you take precautions to prevent the pointed-to
// data from being destroyed after being processed, and your underlying data must be
// thread-safe.
// This function is intended for use only in controlled situations where guarantees can
// be made about which threads pop items from the queue and what they do with those items.
void *prqPeek(_In_ PrQueue *prq, uint32 n);
260
261CX_C_END
Mutex synchronization primitive.
Definition mutex.h:60