GenericQueue with counter?

genosensor
Posts: 65
Joined: Thu Oct 03, 2013 1:06 am
Location: Santa Cruz, California
Has thanked: 1 time
Been thanked: 1 time

Postby genosensor » Wed Mar 01, 2017 9:25 am

chqueues implements Generic, Input and Output Queues that are unusual in that they have a counter field to disambiguate their empty and full states. This makes it possible to use every byte of the buffer.
Is there any other reason for the counter?
If not, is this saved byte worth the cost?

First, the counter is 4 bytes on 32-bit platforms -- to save one byte in the buffer.
But, that's not the worst of it...

The "traditional" ring buffer lacking a counter has a delightful property:

There's no need to protect the bottom side from top side accesses because the top side updates one pointer while the bottom side updates only the other. There are no races between readers and writers! (only *among* multiple readers and writers)
The typical I/O queue has only one reader and one writer -- the ISR and the producer/consumer thread.
If there are multiple top-side threads, they will have to arbitrate access to the queue some other way.
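A minimal sketch of the counterless ring described above, assuming one producer and one consumer on a single core where an aligned `size_t` store is atomic (a multi-core target would need real atomics or barriers instead of `volatile`). The names `ring_t`, `ring_put`, `ring_get` and `RING_SIZE` are hypothetical and are not part of chqueues:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define RING_SIZE 8u              /* hypothetical size; holds RING_SIZE - 1 bytes */

typedef struct {
  uint8_t buf[RING_SIZE];
  volatile size_t head;           /* read index: written only by the consumer */
  volatile size_t tail;           /* write index: written only by the producer */
} ring_t;

static inline size_t ring_next(size_t i) {
  return (i + 1u) % RING_SIZE;
}

/* Producer side: writes only r->tail. Returns false when the ring is full. */
static inline bool ring_put(ring_t *r, uint8_t b) {
  size_t next = ring_next(r->tail);
  if (next == r->head)            /* one more byte would make it look empty */
    return false;
  r->buf[r->tail] = b;            /* store the data first ... */
  r->tail = next;                 /* ... then publish it */
  return true;
}

/* Consumer side: writes only r->head. Returns false when the ring is empty. */
static inline bool ring_get(ring_t *r, uint8_t *out) {
  if (r->head == r->tail)
    return false;
  *out = r->buf[r->head];
  r->head = ring_next(r->head);
  return true;
}
```

Each side reads both indexes but writes only its own, which is why no lock is needed between the two sides. The price is the one slot that always stays empty.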

But:
The shared counter field requires both producer and consumer be in a "locked" state because the counter is updated by both. chOQWriteTimeout(), for example:

Code:

  chSysLock();
  while (TRUE) {
    while (chOQIsFullI(oqp)) {
      if (qwait((GenericQueue *)oqp, time) != Q_OK) {
        chSysUnlock();
        return w;
      }
    }
    oqp->q_counter--;
    *oqp->q_wrptr++ = *bp++;
    if (oqp->q_wrptr >= oqp->q_top)
      oqp->q_wrptr = oqp->q_buffer;

    if (nfy)
      nfy(oqp);

    chSysUnlock(); /* Gives a preemption chance in a controlled point.*/
    w++;
    if (--n == 0)
      return w;
    chSysLock();
  }

This is locking and unlocking, updating queue state, checking queue full and calling the notify function for *each* byte transferred!

If the counter were removed from the queue struct, one could simply memcpy() contiguous "chunks" into the queue with the system remaining in the unlocked state, updating the queue and calling the notifier only once per chunk. The code looks something like this:

Code:

  while (cursor < end) {
    size_t chunkLen = end - cursor;
    if (n > chunkLen)
      n = chunkLen;
    while (!(chunkLen = FIFOpoke(&oqp->fifo)))
      if (chWait(oqp, time) != Q_OK)
        return cursor - bp;
    if (chunkLen > n)
      chunkLen = n;   //split write into more chunks to limit latency
    FIFOwriteChunk(&oqp->fifo, cursor, chunkLen);
    if (oqp->q_notify) {
      chSysLock();
      oqp->q_notify(oqp);
      chSysUnlock();
    }
    cursor += chunkLen;
  }
  return cursor - bp;


FIFOpoke() returns the maximum # of contiguous bytes that fit at the end of the queue.
FIFOwriteChunk() just does a memcpy().
The only time the system locks is to call the chWait and notify functions, before and after each chunk transferred.
(assuming the queue does not fill, at most two chunks will be transferred)

For completeness:

Code:

// return the maximum # of contiguous bytes that may be added to the fifo
// returns zero if and only if the fifo is full
static inline size_t FIFOpoke(FIFO *r)
{
  byte *tail = FIFOtail(r);
  byte *head = FIFOhead(r);
  byte *bound;
  if (tail < head)
    bound = head-1;
  else {  //must leave empty slot at fifo's end if head is at its base
    bound = FIFOend(r);
    if (head == FIFObase(r)) --bound;
  }
  return bound - tail;
}

// add len bytes of src data to specified end of fifo r
// without verifying that there is room.  See FIFOpoke above.
// assumes len is > 0
// returns new tail
static inline byte *FIFOaddChunk(FIFO *r,
                                  const byte *src, byte *dst, size_t len)
{
  memcpy(dst, src, len); dst+=len;
  if (dst >= FIFOend(r)) dst=FIFObase(r);
  return dst;
}


// add len bytes of src data to the end of fifo r
// without verifying that there is room.  See FIFOpoke above.
// assumes len is > 0
static inline void FIFOwriteChunk(FIFO *r, const byte *src, size_t len)
{
  FIFOtail(r) = FIFOaddChunk(r, src, FIFOtail(r), len);
}


The above is adapted from heavily used production code (last modified in 2003)
- brent

Giovanni
Site Admin
Posts: 14444
Joined: Wed May 27, 2009 8:48 am
Location: Salerno, Italy
Has thanked: 1074 times
Been thanked: 921 times

Re: GenericQueue with counter?

Postby Giovanni » Wed Mar 01, 2017 9:43 am

Hi,

Without a counter, what are the conditions for queue full and queue empty? Both conditions would have the pointers at the same address.

Giovanni


Re: GenericQueue with counter?

Postby genosensor » Wed Mar 01, 2017 10:20 am

The queue is empty when its read (head) and write (tail) pointers are equal.
The queue is full when adding a byte to it would result in its becoming empty.

This is a painfully thorough article describing lockless ring buffers:
Lock-Free Single-Producer - Single Consumer Circular Queue

Honestly, I'd never seen a counter associated with the ring before.
But, now that I look for it, I do see a couple of references to the technique.

- brent


Code:

// return TRUE iff there are no bytes available beyond a given tail ptr
static inline bool FIFOfullFrom(FIFO *r, byte *tail) {
  byte *nextTail = tail+1;
  byte *head = FIFOhead(r);
  return head != FIFObase(r) ? nextTail == head : nextTail == FIFOend(r);
}

// return TRUE iff FIFOfree(r) == 0
#define FIFOfull(r)  (FIFOfullFrom(r, FIFOtail(r)))
- brent


Re: GenericQueue with counter?

Postby Giovanni » Wed Mar 01, 2017 10:51 am

Hi,

The counter can be configured to be a single byte too. With that method you waste a byte in the queue, so wasted space is not much of an issue. The counter does not prevent implementing block transfers either; something I planned for the next HAL is queuing by lines (delimiters) or blocks.

The choice should be made considering the complexity of the various expressions for isEmpty(), isFull(), etc.

Giovanni


Re: GenericQueue with counter?

Postby genosensor » Wed Mar 01, 2017 11:34 am

How can the existing counter be made a single byte if the queue size > 255 bytes?
I suppose the "counter" could be reduced to a simple flag that gets set when the fifo becomes full and cleared when it becomes empty.
Is this what you have in mind?
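A rough sketch of what such a flag-based queue might look like, under the assumption that both sides run with the system locked. The names `flagq_t`, `flagq_put`, `flagq_get` and `FQ_SIZE` are hypothetical; this is not the chqueues implementation:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define FQ_SIZE 4u                /* hypothetical size; all FQ_SIZE bytes usable */

typedef struct {
  uint8_t buf[FQ_SIZE];
  size_t  rd, wr;                 /* read and write indexes */
  bool    full;                   /* disambiguates rd == wr; written by BOTH sides */
} flagq_t;

static inline bool flagq_empty(const flagq_t *q) {
  return q->rd == q->wr && !q->full;
}

/* Assumes the caller holds the system lock. */
static inline bool flagq_put(flagq_t *q, uint8_t b) {
  if (q->full)
    return false;
  q->buf[q->wr] = b;
  q->wr = (q->wr + 1u) % FQ_SIZE;
  q->full = (q->wr == q->rd);     /* producer may set the flag */
  return true;
}

/* Assumes the caller holds the system lock. */
static inline bool flagq_get(flagq_t *q, uint8_t *out) {
  if (flagq_empty(q))
    return false;
  *out = q->buf[q->rd];
  q->rd = (q->rd + 1u) % FQ_SIZE;
  q->full = false;                /* consumer clears the flag */
  return true;
}
```

The flag recovers the last byte of the buffer, but like the counter it is written by both producer and consumer, so the two sides still have to be serialized.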

As far as FIFO full/empty tests, I don't see much difference.
The test for FIFO empty without the counter is simpler -- just tail == head
The test for full is a little slower.
Here's an easier to read version of FIFOfull with all macros expanded:

Code:

typedef struct FIFO {
  byte *base, *end;   //points to base and past end of ring buffer
  byte *head, *tail;  //head and tail in buffer
} FIFO;

static inline bool FIFOfull(FIFO *r) {
  byte *nextTail = r->tail+1;
  if (nextTail >= r->end)
    nextTail = r->base;
  return nextTail == r->head;
}

When updating the queue, there's no counter to update, so there is less state to maintain.

One could optimize the counter-containing FIFO better, and this would be worthwhile,
but the counter-containing version could never be made lockless.

I have a lockless FIFO version of chqueues and serial that (intends to) preserve API compatibility.
I'll post it on github in the coming week or so.

If nothing else, the code might give you some ideas if/when you decide to revisit chqueues.
- brent


Re: GenericQueue with counter?

Postby Giovanni » Wed Mar 01, 2017 12:12 pm

Interesting, I will give it a try but cannot promise it will go in next release.

What would the equivalent of qSpaceI(qp) be? This one looks more complicated.

Giovanni
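One possible answer, sketched against the `FIFO` struct posted earlier in the thread. It assumes `byte` is an 8-bit type and that one slot is always left empty. `FIFOused` is a hypothetical helper, and this `FIFOfree` is only a guess at the function whose name appears in an earlier comment:

```c
#include <stddef.h>
#include <stdint.h>

typedef uint8_t byte;             /* assumption: `byte` is an 8-bit type */

typedef struct FIFO {
  byte *base, *end;               /* base and one past the end of the buffer */
  byte *head, *tail;              /* read and write positions */
} FIFO;

/* Number of bytes currently stored in the fifo. */
static inline size_t FIFOused(const FIFO *r) {
  const byte *head = r->head;     /* read each pointer only once */
  const byte *tail = r->tail;
  return tail >= head
       ? (size_t)(tail - head)
       : (size_t)(r->end - head) + (size_t)(tail - r->base);
}

/* Number of bytes that can still be added; one slot always stays empty. */
static inline size_t FIFOfree(const FIFO *r) {
  return (size_t)(r->end - r->base) - 1u - FIFOused(r);
}
```

This costs one compare and a couple of subtractions, compared with a single field read when a counter is kept.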

faisal
Posts: 374
Joined: Wed Jul 19, 2017 12:44 am
Has thanked: 44 times
Been thanked: 60 times

Re: GenericQueue with counter?

Postby faisal » Mon Sep 25, 2017 4:08 am

Just want to chime in with another vote for lock-less single-producer, single-consumer queues. I was working on a high-performance DSP application (on a relatively slow processor) a while back, and lock-less queues saved the day, given the assumption of a single producer and a single consumer, along with contiguous block writes and reads to and from the queue (with the exception of having to split a transfer into two copy operations when it wrapped around the circular buffer). Just the elimination of the branch instructions was quite significant in itself ...
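The block write with a wrap-around split can be sketched as follows. This is an illustration under the same single-producer assumption, using index arithmetic rather than the pointer macros shown earlier in the thread; `blockq_t` and `blockq_write` are hypothetical names:

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct {
  uint8_t *buf;
  size_t   size;                  /* buffer size; capacity is size - 1 */
  size_t   head, tail;            /* read and write indexes */
} blockq_t;

/* Copies up to n bytes into the ring and returns the number copied. */
static size_t blockq_write(blockq_t *q, const uint8_t *src, size_t n) {
  size_t head = q->head;          /* snapshot of the consumer's index */
  size_t used = (q->tail + q->size - head) % q->size;
  size_t room = q->size - 1u - used;
  if (n > room)
    n = room;

  size_t first = q->size - q->tail;        /* contiguous space up to the end */
  if (first > n)
    first = n;
  memcpy(q->buf + q->tail, src, first);
  memcpy(q->buf, src + first, n - first);  /* wrapped remainder, may be zero */

  q->tail = (q->tail + n) % q->size;       /* publish once per block */
  return n;
}
```

The write index is updated once per block rather than once per byte, and at most two `memcpy()` calls are needed however large the block is.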


Re: GenericQueue with counter?

Postby genosensor » Wed Nov 29, 2017 9:20 am

I've been running ChibiOS 2.6.10 with lockless queues for some months now.
There was a race in the examples I posted earlier.
Here's the current working code for fast queue reading:

Code:

/**
 * @brief   Input queue read with timeout when no other reader thread exists
 * @details The function reads data from an input queue into a buffer. The
 *          operation completes when the specified amount of data has been
 *          transferred or after the specified timeout or if the queue has
 *          been reset.
 * @note    The function is not atomic, if you need atomicity it is suggested
 *          to use a semaphore or a mutex for mutual exclusion.
 *
 * @param[in] iqp       pointer to an @p InputQueue structure
 * @param[out] bp       pointer to the data buffer
 * @param[in] n         the maximum amount of data to be transferred
 * @param[in] time      the number of ticks before the operation timeouts,
 *                      the following special values are allowed:
 *                      - @a TIME_IMMEDIATE immediate timeout.
 *                      - @a TIME_INFINITE no timeout.
 *                      .
 * @return              The number of bytes effectively transferred.
 *
 * @api
 */
size_t chIQReadFast(InputQueue *iqp, uint8_t *bp,
                    size_t n, systime_t time) {
  qnotify_t nfy = iqp->q_notify;
  uint8_t *cursor = bp;
  while (n) {
    size_t chunkLen;
    chSysLock();
      while (!(chunkLen = FIFOpeek(&iqp->fifo)))
        if (chWait(iqp, time) != Q_OK) {
          chSysUnlock();
          return cursor - bp;
        }
    chSysUnlock();
    if (chunkLen > n)
      chunkLen = n;
    FIFOreadChunk(&iqp->fifo, cursor, chunkLen);
    if (nfy) {
      chSysLock();
      nfy(iqp);
      chSysUnlock();
    }
    cursor += chunkLen;
    n -= chunkLen;
  }
  return cursor - bp;
}


And the corresponding function for writing:

Code:

/**
 * @brief   Output queue write with timeout when no other writer thread exists
 * @details The function writes data from a buffer to an output queue. The
 *          operation completes when the specified amount of data has been
 *          transferred or after the specified timeout or if the queue has
 *          been reset.
 * @note    The function is not atomic, if you need atomicity it is suggested
 *          to use a semaphore or a mutex for mutual exclusion.
 *
 * @param[in] oqp       pointer to an @p OutputQueue structure
 * @param[out] bp       pointer to the data buffer
 * @param[in] n         the maximum amount of data to be transferred, the
 *                      value 0 is reserved
 * @param[in] time      the number of ticks before the operation timeouts,
 *                      the following special values are allowed:
 *                      - @a TIME_IMMEDIATE immediate timeout.
 *                      - @a TIME_INFINITE no timeout.
 *                      .
 * @return              The number of bytes effectively transferred.
 *
 * @api
 */
size_t chOQWriteFast(OutputQueue *oqp, const uint8_t *bp,
                     size_t n, systime_t time) {
  qnotify_t nfy = oqp->q_notify;
  const uint8_t *cursor = bp;
  while (n) {
    size_t chunkLen;
    chSysLock();
      while (!(chunkLen = FIFOpoke(&oqp->fifo)))
        if (chWait(oqp, time) != Q_OK) {
          chSysUnlock();
          return cursor - bp;
        }
    chSysUnlock();
    if (chunkLen > n)
      chunkLen = n;
    FIFOwriteChunk(&oqp->fifo, cursor, chunkLen);
    if (nfy) {
      chSysLock();
      nfy(oqp);
      chSysUnlock();
    }
    cursor += chunkLen;
    n -= chunkLen;
  }
  return cursor - bp;
}
#endif  /* CH_USE_QUEUES */
- brent


Re: GenericQueue with counter?

Postby Giovanni » Wed Nov 29, 2017 9:34 am

Hi,

Block reads and writes have already been introduced in the trunk code. Are they different from what you proposed?

Giovanni

