Is there any other reason for the counter?
If not, is this saved byte worth the cost?
First, the counter is 4 bytes on 32-bit platforms -- to save one byte in the buffer.
But, that's not the worst of it...
The "traditional" ring buffer lacking a counter has a delightful property:
There's no need to protect the bottom side from top-side accesses, because the top side updates only one pointer while the bottom side updates only the other. There are no races between readers and writers! (only *among* multiple readers or *among* multiple writers)
The typical I/O queue has only one reader and writer -- the ISR and the producer/consumer thread.
If there are multiple top-side threads, they will have to arbitrate access to the queue some other way.
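To make that property concrete, here is a minimal sketch of such a two-pointer ring (names are illustrative, not ChibiOS code): the producer stores only `wr`, the consumer stores only `rd`, and full/empty are derived by comparing the two, so a single reader and single writer never touch the same field. (Strictly speaking, portable multi-core code would need C11 atomics or barriers; the volatile-pointer form below is the traditional single-core/ISR idiom.)

```c
#include <stddef.h>
#include <assert.h>

#define QSIZE 8                     /* one slot is always left empty */

typedef struct {
    unsigned char buf[QSIZE];
    volatile size_t rd;             /* written only by the consumer */
    volatile size_t wr;             /* written only by the producer */
} SPSCQueue;

/* producer side: returns 0 if the queue is full */
static int spsc_put(SPSCQueue *q, unsigned char c)
{
    size_t next = (q->wr + 1) % QSIZE;
    if (next == q->rd)              /* full: would catch up to reader */
        return 0;
    q->buf[q->wr] = c;
    q->wr = next;                   /* this single store publishes the byte */
    return 1;
}

/* consumer side: returns 0 if the queue is empty */
static int spsc_get(SPSCQueue *q, unsigned char *c)
{
    if (q->rd == q->wr)             /* empty */
        return 0;
    *c = q->buf[q->rd];
    q->rd = (q->rd + 1) % QSIZE;    /* this single store frees the slot */
    return 1;
}
```

Note that no counter field exists at all: fullness is `(wr + 1) % QSIZE == rd`, emptiness is `rd == wr`, which is exactly why one slot must stay empty.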
But:
The shared counter field requires that both producer and consumer run in a "locked" state, because the counter is updated by both sides. chOQWriteTimeout(), for example:
Code: Select all
  chSysLock();
  while (TRUE) {
    while (chOQIsFullI(oqp)) {
      if (qwait((GenericQueue *)oqp, time) != Q_OK) {
        chSysUnlock();
        return w;
      }
    }
    oqp->q_counter--;
    *oqp->q_wrptr++ = *bp++;
    if (oqp->q_wrptr >= oqp->q_top)
      oqp->q_wrptr = oqp->q_buffer;
    if (nfy)
      nfy(oqp);
    chSysUnlock(); /* Gives a preemption chance in a controlled point.*/
    w++;
    if (--n == 0)
      return w;
    chSysLock();
  }
That is a lock, an unlock, a queue-state update, a queue-full check, and a notify-function call for *each* byte transferred!
If the counter were removed from the queue struct, one could simply memcpy() contiguous "chunks" into the queue with the system remaining in the unlocked state, updating the queue and calling the notifier only once per chunk. The code looks something like this:
Code: Select all
  while (cursor < end) {
    size_t chunkLen = end - cursor;
    if (n > chunkLen)
      n = chunkLen;
    while (!(chunkLen = FIFOpoke(&oqp->fifo))) {
      if (chWait(oqp, time) != Q_OK)
        return cursor - bp;
    }
    if (chunkLen > n)
      chunkLen = n;  // split write into more chunks to limit latency
    FIFOwriteChunk(&oqp->fifo, cursor, chunkLen);
    if (oqp->q_notify) {
      chSysLock();
      oqp->q_notify(oqp);
      chSysUnlock();
    }
    cursor += chunkLen;
  }
  return cursor - bp;
FIFOpoke() returns the maximum # of contiguous bytes that fit at the end of the queue.
FIFOwriteChunk() just does a memcpy().
The only time the system locks is to call the chWait and notify functions, before and after each chunk transferred.
(assuming the queue does not fill, at most two chunks will be transferred)
For completeness:
Code: Select all
// return the maximum # of contiguous bytes that may be added to the fifo
// returns zero if and only if the fifo is full
static inline size_t FIFOpoke(FIFO *r)
{
  byte *tail = FIFOtail(r);
  byte *head = FIFOhead(r);
  byte *bound;
  if (tail < head)
    bound = head - 1;
  else {  // must leave empty slot at fifo's end if head is at its base
    bound = FIFOend(r);
    if (head == FIFObase(r))
      --bound;
  }
  return bound - tail;
}
// copy len bytes of src data into fifo r at dst,
// without verifying that there is room. See FIFOpoke above.
// assumes len is > 0
// returns the advanced (and wrapped, if needed) write position
static inline byte *FIFOaddChunk(FIFO *r,
                                 const byte *src, byte *dst, size_t len)
{
  memcpy(dst, src, len);
  dst += len;
  if (dst >= FIFOend(r))
    dst = FIFObase(r);
  return dst;
}
// add len bytes of src data to the end of fifo r
// without verifying that there is room. See FIFOpoke above.
// assumes len is > 0
static inline void FIFOwriteChunk(FIFO *r, const byte *src, size_t len)
{
  FIFOtail(r) = FIFOaddChunk(r, src, FIFOtail(r), len);
}
The above is adapted from heavily used production code (last modified in 2003).
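For anyone who wants to compile the snippets: the post doesn't show the FIFO type or the FIFOhead/FIFOtail/FIFObase/FIFOend accessors, so below is a guessed reconstruction (the struct layout and macro definitions are my assumptions, not the original code), with the three functions repeated so it stands alone. Exercising it also shows the "at most two chunks" behavior: a write that reaches the buffer's end wraps tail back to base, and the remainder goes out as a second FIFOpoke()/FIFOwriteChunk() pass.

```c
#include <stddef.h>
#include <string.h>
#include <assert.h>

typedef unsigned char byte;

/* Assumed layout: [base, end) is the storage; head is owned by the
   consumer, tail by the producer (classic one-empty-slot ring). */
typedef struct {
  byte *base, *end;
  byte *head;   /* read pointer  -- consumer side */
  byte *tail;   /* write pointer -- producer side */
} FIFO;

/* lvalue accessors, so "FIFOtail(r) = ..." works as in FIFOwriteChunk() */
#define FIFObase(r) ((r)->base)
#define FIFOend(r)  ((r)->end)
#define FIFOhead(r) ((r)->head)
#define FIFOtail(r) ((r)->tail)

/* the three functions from the post, unchanged */
static inline size_t FIFOpoke(FIFO *r)
{
  byte *tail = FIFOtail(r);
  byte *head = FIFOhead(r);
  byte *bound;
  if (tail < head)
    bound = head - 1;
  else {  // must leave empty slot at fifo's end if head is at its base
    bound = FIFOend(r);
    if (head == FIFObase(r))
      --bound;
  }
  return bound - tail;
}

static inline byte *FIFOaddChunk(FIFO *r,
                                 const byte *src, byte *dst, size_t len)
{
  memcpy(dst, src, len);
  dst += len;
  if (dst >= FIFOend(r))
    dst = FIFObase(r);
  return dst;
}

static inline void FIFOwriteChunk(FIFO *r, const byte *src, size_t len)
{
  FIFOtail(r) = FIFOaddChunk(r, src, FIFOtail(r), len);
}
```

With an 8-byte buffer (capacity 7), filling 3 bytes, draining them, then writing 5 more makes the write hit the buffer's end and wrap tail back to base, after which FIFOpoke() reports the remaining contiguous room in front of head.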