[PROPOSAL] Job Queues


Postby meatball » Wed Nov 27, 2019 5:34 pm

TLDR:
Job Queues would be a great addition to ChibiOS, allowing threads to "delegate" scheduled jobs to other threads in a buffered way. This would reduce the need for some types of overly-simple dedicated threads, and has potential to become a common ChibiOS "Design Pattern".

Hi All,

I've noticed a number of new features being added to ChibiOS. The Delegate Threads in particular seem to simplify a small family of issues that can make concurrency difficult. I would like to describe, propose and discuss a similar feature that would greatly simplify a similar, though broader, set of problems that Delegate Threads seem to get "close" to solving.

Job Queues (or jobqs for short) are found in a number of other RTOSs, and even in general-purpose OSs. They typically allow one thread to delegate function calls to a pool of worker threads via a queue. This is particularly helpful when handling a large number of asynchronous, low-priority events, possibly including periodic work.

This approach is perhaps unnecessary in many ChibiOS applications, where it is likely more desirable to minimize the number of threads in the application.

The method I would love to see added to ChibiOS is an alternative form of function call delegation that I've seen done elsewhere.

In this method, a thread can be allocated an associated job queue, to which any thread, including the associated thread itself, can add jobs.

The thread simply runs a job from the jobq if the dedicated jobq event bit is raised. This allows the thread to handle other events as well, unrelated to whatever gets placed on its job queue.

Where this becomes more interesting, and very useful, is in the calling thread's API, which is able to add jobs to a job queue at a specified target time, so you can set up one-shot, and periodic jobs for another thread to handle. The API could look something like this:

Code:

chJobq_t  my_jobq;
chJobfn_t my_job_fn;
uint32_t  param     = 3;
void *    param_p   = (void*)&param;

// Use PERIOD_MS = 0 to add to job queue once
chJob_t my_job = { my_job_fn, PERIOD_MS };

chJobq_init( &my_jobq );

// This is a non-blocking alternative to Delegate Threads
// The job is added to the target thread's job queue now,
// and the job raises the thread's job queue event bit when the target time arrives
chJobq_addJob( &my_jobq, DELAY_MS, &my_job, param_p );

// Removes the job from the job queue if it is present
// This can cancel a previously scheduled job, or stop a periodic job
chJobq_removeJob( &my_jobq, &my_job );

// Called from the target thread's event handler
// It's probably best if the thread only runs 1 job from the job queue per invocation
// After running the job, this function can check for more jobs in the queue
// If there are more ready to run, this function can send the job queue event flag to this thread again
if( events & JOBQ_EVENT_BIT )
    chJobq_runJob( &my_jobq );
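
As a rough illustration of the "run one job per invocation, then re-raise the event bit if more are pending" behaviour described in the comments above, here is a host-side model in plain C. It is only a sketch: `jobq_t`, `jobq_add`, `jobq_run_one`, `JOBQ_CAPACITY` and the `events` mask are hypothetical names standing in for the proposed API and for a thread's event flags, and nothing here is existing ChibiOS code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define JOBQ_EVENT_BIT  (1u << 0)   /* hypothetical event flag for the queue */
#define JOBQ_CAPACITY   4u          /* hypothetical fixed queue depth */

typedef void (*job_fn_t)(void *arg);

typedef struct {
  job_fn_t fn;
  void    *arg;
} job_t;

typedef struct {
  job_t     slots[JOBQ_CAPACITY];  /* ring buffer of pending jobs */
  size_t    head, count;
  uint32_t *events;                /* event mask of the owning thread */
} jobq_t;

static void jobq_init(jobq_t *q, uint32_t *events) {
  assert(q != NULL && events != NULL);
  q->head = 0;
  q->count = 0;
  q->events = events;
}

/* Queue a job and raise the owner's event bit.
   Returns 0 on success, -1 if the queue is full. */
static int jobq_add(jobq_t *q, job_fn_t fn, void *arg) {
  assert(q != NULL && fn != NULL);
  if (q->count == JOBQ_CAPACITY) {
    return -1;
  }
  size_t tail = (q->head + q->count) % JOBQ_CAPACITY;
  q->slots[tail].fn = fn;
  q->slots[tail].arg = arg;
  q->count++;
  *q->events |= JOBQ_EVENT_BIT;
  return 0;
}

/* Run exactly one job, then re-raise the event bit if more are pending. */
static void jobq_run_one(jobq_t *q) {
  assert(q != NULL);
  *q->events &= ~JOBQ_EVENT_BIT;
  if (q->count == 0) {
    return;
  }
  job_t job = q->slots[q->head];
  q->head = (q->head + 1) % JOBQ_CAPACITY;
  q->count--;
  job.fn(job.arg);
  if (q->count > 0) {
    *q->events |= JOBQ_EVENT_BIT;
  }
}

/* Example job: adds the pointed-to value to a running total. */
static int total;
static void add_job(void *arg) { total += *(int *)arg; }
```

In a real thread the loop would wait on its event mask, handle unrelated bits as usual, and call the equivalent of `jobq_run_one` only when the job queue bit is set, so a backlog of jobs cannot starve the other event sources.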


Further thought about this feature reveals that it is a non-blocking superset of the Delegate thread feature (which I believe blocks the calling thread until execution is completed).

This feature would be very powerful to have: a single thread could not only delegate jobs to other threads but also specify when they should run, and do so in a queued manner, preventing lost jobs.

Imagine starting a simple LED blinking feature with this method. We may see a reduced occurrence of "blinker threads", each dedicated to an overly simple task that doesn't typically have hard real-time deadlines to meet. The blink could instead be added to a job queue, simplifying the API and saving the memory used by yet another thread.

For another example, you could make simple, high-priority input/output threads that handle jobs sent by ISRs, similar to the DSP-BIOS concept of "Software ISRs".

For example, making a basic button debouncer with this method could be as simple as adding delayed one-shot jobs to job queues from inside the EXTI ISR, allowing threads to wake up later to check on the bounce state of the GPIO. Since these jobs are all queued, one thread could handle many EXTI interrupts/bounces on different EXTI lines this way. This simplifies your application business logic by allowing the "software interrupt" to send debounced button events to the interested threads in the application.
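
The debouncer idea can be modelled on a host PC as a delayed one-shot check that each edge (re)schedules. This is only one possible variant, sketched in plain C under assumptions: `debounce_t`, `debounce_on_edge`, `debounce_poll` and the 20 ms `DEBOUNCE_MS` settle time are hypothetical, time is passed in explicitly instead of coming from a kernel, and the pin level is supplied by the caller rather than read from a GPIO.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEBOUNCE_MS 20u   /* assumed settle time, not taken from the post */

typedef struct {
  bool     pending;       /* a delayed check is scheduled */
  uint32_t deadline_ms;   /* when the check should run */
  bool     stable_level;  /* last debounced level */
  unsigned events_sent;   /* number of debounced changes reported */
} debounce_t;

/* Called from the (simulated) EXTI ISR on every edge:
   schedule, or push back, the one-shot check. */
static void debounce_on_edge(debounce_t *d, uint32_t now_ms) {
  assert(d != NULL);
  d->pending = true;
  d->deadline_ms = now_ms + DEBOUNCE_MS;
}

/* Called by the worker as time advances: run the delayed check if it is due.
   Returns true if a debounced level change was reported. */
static bool debounce_poll(debounce_t *d, uint32_t now_ms, bool pin_level) {
  assert(d != NULL);
  /* Signed difference keeps the comparison valid across tick wrap-around. */
  if (!d->pending || (int32_t)(now_ms - d->deadline_ms) < 0) {
    return false;
  }
  d->pending = false;
  if (pin_level == d->stable_level) {
    return false;           /* bounce ended where it started */
  }
  d->stable_level = pin_level;
  d->events_sent++;
  return true;
}
```

With a job queue, `debounce_on_edge` would correspond to adding a delayed one-shot job from the ISR, and `debounce_poll` to the job body that the owning thread runs when the delay expires.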

I believe this could be done in ChibiOS, since a number of the needed mechanisms already exist.

It might be a combination of the timed events and mailboxes. When I tried to do this with the VTs as they are written, the VT list had to buffer the future events for ALL jobs on ALL job queues across the application, which is not very efficient for the kernel. Additionally, I have to define an interrupt-context function that wakes up the target thread for every job I define. It would be nice to have a kernel-level mechanism that allows a thread to track its own scheduled jobs once they have been added to its job queue.
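
One conceivable way to keep per-job timing out of the kernel timer list is for each queue to hold its own deadline-sorted list of jobs, so that only the earliest deadline needs a single timer. The following host-side model in plain C sketches that bookkeeping. All names (`timed_job_t`, `timed_jobq_add`, `timed_jobq_remove`, `timed_jobq_run_due`) are hypothetical, time is passed in explicitly, and this is not how ChibiOS virtual timers are implemented.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef void (*job_fn_t)(void *arg);

typedef struct timed_job {
  struct timed_job *next;
  job_fn_t  fn;
  void     *arg;
  uint32_t  deadline_ms;  /* absolute time of the next run */
  uint32_t  period_ms;    /* 0 means one-shot */
} timed_job_t;

typedef struct {
  timed_job_t *head;      /* sorted list, earliest deadline first */
} timed_jobq_t;

/* Insert while keeping the list sorted by deadline. */
static void timed_jobq_insert(timed_jobq_t *q, timed_job_t *job) {
  timed_job_t **pp = &q->head;
  while (*pp != NULL &&
         (int32_t)((*pp)->deadline_ms - job->deadline_ms) <= 0) {
    pp = &(*pp)->next;
  }
  job->next = *pp;
  *pp = job;
}

/* Schedule a job to run delay_ms after now_ms. */
static void timed_jobq_add(timed_jobq_t *q, timed_job_t *job,
                           uint32_t now_ms, uint32_t delay_ms) {
  assert(q != NULL && job != NULL && job->fn != NULL);
  job->deadline_ms = now_ms + delay_ms;
  timed_jobq_insert(q, job);
}

/* Remove a job if it is queued; returns 1 if it was found. */
static int timed_jobq_remove(timed_jobq_t *q, timed_job_t *job) {
  for (timed_job_t **pp = &q->head; *pp != NULL; pp = &(*pp)->next) {
    if (*pp == job) {
      *pp = job->next;
      job->next = NULL;
      return 1;
    }
  }
  return 0;
}

/* Run every job due at now_ms; periodic jobs are re-queued.
   Returns the number of jobs run. */
static unsigned timed_jobq_run_due(timed_jobq_t *q, uint32_t now_ms) {
  unsigned ran = 0;
  while (q->head != NULL &&
         (int32_t)(now_ms - q->head->deadline_ms) >= 0) {
    timed_job_t *job = q->head;
    q->head = job->next;
    job->next = NULL;
    job->fn(job->arg);
    ran++;
    if (job->period_ms != 0) {
      job->deadline_ms += job->period_ms;
      timed_jobq_insert(q, job);
    }
  }
  return ran;
}

/* Example job: increments the counter it is given. */
static void count_job(void *arg) { (*(unsigned *)arg)++; }
```

In this model the queue owner only ever needs to know `head->deadline_ms`, so a single timer armed for that value would be enough to raise the job queue event bit at the right moment.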

Please share feedback and any problems you notice. I think this is a cool feature, but I think it needs to be implemented carefully.

Thanks!


Postby Giovanni » Wed Nov 27, 2019 6:43 pm

Hi,

Wouldn't a guarded pool of threads be very close to this? Starting a thread from a pool is not very different from waking up a thread.

I will give it a try because I am interested in extending OSLIB with this kind of thing.

Giovanni


Postby Giovanni » Thu Nov 28, 2019 1:16 pm

Hi,

This should be it, built on top of pools and mailboxes:

Code:

/*
    ChibiOS - Copyright (C) 2006..2018 Giovanni Di Sirio.

    This file is part of ChibiOS.

    ChibiOS is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    ChibiOS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/**
 * @file    chjobs.h
 * @brief   Jobs Queues structures and macros.
 * @details This module implements queues of generic jobs to be delegated
 *          asynchronously to a pool of dedicated threads.
 *          Operations defined for Jobs Queues
 *          - <b>Get</b>: A job object is taken from the pool of the
 *            available jobs.
 *          - <b>Post</b>: A job is posted to the queue, it will be
 *            returned to the pool after execution.
 *          .
 *
 * @addtogroup oslib_jobs_queues
 * @{
 */

#ifndef CHJOBS_H
#define CHJOBS_H

#if (CH_CFG_USE_JOBS == TRUE) || defined(__DOXYGEN__)

/*===========================================================================*/
/* Module constants.                                                         */
/*===========================================================================*/

/*===========================================================================*/
/* Module pre-compile time settings.                                         */
/*===========================================================================*/

/*===========================================================================*/
/* Derived constants and error checks.                                       */
/*===========================================================================*/

#if CH_CFG_USE_MEMPOOLS == FALSE
#error "CH_CFG_USE_JOBS requires CH_CFG_USE_MEMPOOLS"
#endif

#if CH_CFG_USE_SEMAPHORES == FALSE
#error "CH_CFG_USE_JOBS requires CH_CFG_USE_SEMAPHORES"
#endif

#if CH_CFG_USE_MAILBOXES == FALSE
#error "CH_CFG_USE_JOBS requires CH_CFG_USE_MAILBOXES"
#endif

/*===========================================================================*/
/* Module data structures and types.                                         */
/*===========================================================================*/

/**
 * @brief   Type of a jobs queue.
 */
typedef struct ch_jobs_queue {
  /**
   * @brief   Pool of the free jobs.
   */
  guarded_memory_pool_t     free;
  /**
   * @brief   Mailbox of the sent jobs.
   */
  mailbox_t                 mbx;
} jobs_queue_t;

/**
 * @brief   Type of a job function.
 */
typedef void (*job_function_t)(void *arg);

/**
 * @brief   Type of a job descriptor.
 */
typedef struct ch_job_descriptor {
  /**
   * @brief   Job function.
   */
  job_function_t            jobfunc;
  /**
   * @brief   Argument to be passed to the job function.
   */
  void                      *jobarg;
} job_descriptor_t;

/*===========================================================================*/
/* Module macros.                                                            */
/*===========================================================================*/

/*===========================================================================*/
/* External declarations.                                                    */
/*===========================================================================*/

#ifdef __cplusplus
extern "C" {
#endif

#ifdef __cplusplus
}
#endif

/*===========================================================================*/
/* Module inline functions.                                                  */
/*===========================================================================*/

/**
 * @brief   Initializes a jobs queue object.
 *
 * @param[out] jqp      pointer to a @p jobs_queue_t structure
 * @param[in] jobsn     number of jobs available
 * @param[in] jobsbuf   pointer to the buffer of jobs, it must be able
 *                      to hold @p jobsn @p job_descriptor_t structures
 * @param[in] msgbuf    pointer to the buffer of messages, it must be able
 *                      to hold @p jobsn @p msg_t messages
 *
 * @init
 */
static inline void chJobsObjectInit(jobs_queue_t *jqp,
                                    size_t jobsn,
                                    job_descriptor_t *jobsbuf,
                                    msg_t *msgbuf) {

  chDbgCheck((jobsn > 0U) && (jobsbuf != NULL) && (msgbuf != NULL));

  chGuardedPoolObjectInit(&jqp->free, sizeof (job_descriptor_t));
  chGuardedPoolLoadArray(&jqp->free, (void *)jobsbuf, jobsn);
  chMBObjectInit(&jqp->mbx, msgbuf, jobsn);
}

/**
 * @brief   Allocates a free job object.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @return              The pointer to the allocated job object.
 * @retval NULL         if a job object is not immediately available.
 *
 * @iclass
 */
static inline job_descriptor_t *chJobsGetI(jobs_queue_t *jqp) {

  return chGuardedPoolAllocI(&jqp->free);
}

/**
 * @brief   Allocates a free job object.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] timeout   the number of ticks before the operation timeouts,
 *                      the following special values are allowed:
 *                      - @a TIME_IMMEDIATE immediate timeout.
 *                      - @a TIME_INFINITE no timeout.
 *                      .
 * @return              The pointer to the allocated job object.
 * @retval NULL         if a job object is not available within the specified
 *                      timeout.
 *
 * @sclass
 */
static inline job_descriptor_t *chJobsGetTimeoutS(jobs_queue_t *jqp,
                                                  sysinterval_t timeout) {

  return chGuardedPoolAllocTimeoutS(&jqp->free, timeout);
}

/**
 * @brief   Allocates a free job object.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] timeout   the number of ticks before the operation timeouts,
 *                      the following special values are allowed:
 *                      - @a TIME_IMMEDIATE immediate timeout.
 *                      - @a TIME_INFINITE no timeout.
 *                      .
 * @return              The pointer to the allocated job object.
 * @retval NULL         if a job object is not available within the specified
 *                      timeout.
 *
 * @api
 */
static inline job_descriptor_t *chJobsGetTimeout(jobs_queue_t *jqp,
                                                 sysinterval_t timeout) {

  return chGuardedPoolAllocTimeout(&jqp->free, timeout);
}

/**
 * @brief   Posts a job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @iclass
 */
static inline void chJobsPostI(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostI(&jqp->mbx, (msg_t)jp);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Posts a job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @sclass
 */
static inline void chJobsPostS(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostTimeoutS(&jqp->mbx, (msg_t)jp, TIME_IMMEDIATE);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Posts a job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @api
 */
static inline void chJobsPost(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostTimeout(&jqp->mbx, (msg_t)jp, TIME_IMMEDIATE);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Posts a high priority job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @iclass
 */
static inline void chJobsPostAheadI(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostAheadI(&jqp->mbx, (msg_t)jp);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Posts a high priority job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @sclass
 */
static inline void chJobsPostAheadS(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostAheadTimeoutS(&jqp->mbx, (msg_t)jp, TIME_IMMEDIATE);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Posts a high priority job object.
 * @note    By design the object can be always immediately posted.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] jp        pointer to the job object to be posted
 *
 * @api
 */
static inline void chJobsPostAhead(jobs_queue_t *jqp, job_descriptor_t *jp) {
  msg_t msg;

  msg = chMBPostAheadTimeout(&jqp->mbx, (msg_t)jp, TIME_IMMEDIATE);
  chDbgAssert(msg == MSG_OK, "post failed");
}

/**
 * @brief   Waits for a job then executes it.
 *
 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @return              The function outcome.
 * @retval MSG_OK       if a job has been executed.
 * @retval MSG_RESET    if the internal mailbox has been reset.
 */
static inline msg_t chJobsDispatch(jobs_queue_t *jqp) {
  msg_t msg, jmsg;

  msg = chMBFetchTimeout(&jqp->mbx, &jmsg, TIME_INFINITE);
  if (msg == MSG_OK) {
    job_descriptor_t *jp = (job_descriptor_t *)jmsg;

    /* Invoking the job function.*/
    jp->jobfunc(jp->jobarg);

    /* Returning the job descriptor object.*/
    chGuardedPoolFree(&jqp->free, (void *)jp);
  }

  return msg;
}

/**
 * @brief   Waits for a job then executes it.
 *
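 * @param[in] jqp       pointer to a @p jobs_queue_t structure
 * @param[in] timeout   the number of ticks before the operation timeouts,
 *                      the following special values are allowed:
 *                      - @a TIME_IMMEDIATE immediate timeout.
 *                      - @a TIME_INFINITE no timeout.
 *                      .
 * @return              The function outcome.
 * @retval MSG_OK       if a job has been executed.
 * @retval MSG_TIMEOUT  if a timeout occurred.
 * @retval MSG_RESET    if the internal mailbox has been reset.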
 */
static inline msg_t chJobsDispatchTimeout(jobs_queue_t *jqp,
                                          sysinterval_t timeout) {
  msg_t msg, jmsg;

  msg = chMBFetchTimeout(&jqp->mbx, &jmsg, timeout);
  if (msg == MSG_OK) {
    job_descriptor_t *jp = (job_descriptor_t *)jmsg;

    /* Invoking the job function.*/
    jp->jobfunc(jp->jobarg);

    /* Returning the job descriptor object.*/
    chGuardedPoolFree(&jqp->free, (void *)jp);
  }

  return msg;
}

#endif /* CH_CFG_USE_JOBS == TRUE */

#endif /* CHJOBS_H */

/** @} */
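
The "by design the object can be always immediately posted" notes follow from sizing: the pool holds `jobsn` descriptors and the mailbox has `jobsn` slots, so there can never be more descriptors in flight than mailbox slots. A small host-side model in plain C can make that invariant visible. It is a sketch only: `model_queue_t`, `model_get`, `model_post`, `model_dispatch` and `JOBS_N` are hypothetical stand-ins, arrays replace the guarded pool and the mailbox, and nothing blocks as the real calls can.

```c
#include <assert.h>
#include <stddef.h>

#define JOBS_N 2u   /* hypothetical number of job descriptors */

typedef void (*job_function_t)(void *arg);

typedef struct {
  job_function_t jobfunc;
  void          *jobarg;
} job_descriptor_t;

typedef struct {
  job_descriptor_t *free_list[JOBS_N];  /* stand-in for the guarded pool */
  size_t            free_count;
  job_descriptor_t *mbx[JOBS_N];        /* stand-in for the mailbox */
  size_t            mbx_head, mbx_count;
} model_queue_t;

/* Load all descriptors into the free list; the mailbox starts empty. */
static void model_init(model_queue_t *q, job_descriptor_t *jobsbuf) {
  assert(q != NULL && jobsbuf != NULL);
  q->free_count = 0;
  q->mbx_head = 0;
  q->mbx_count = 0;
  for (size_t i = 0; i < JOBS_N; i++) {
    q->free_list[q->free_count++] = &jobsbuf[i];
  }
}

/* Non-blocking "get": NULL when every descriptor is already in flight. */
static job_descriptor_t *model_get(model_queue_t *q) {
  return (q->free_count > 0) ? q->free_list[--q->free_count] : NULL;
}

/* Post cannot overflow: only JOBS_N descriptors exist and the
   mailbox has JOBS_N slots, so the assertion documents the invariant. */
static void model_post(model_queue_t *q, job_descriptor_t *jp) {
  assert(q->mbx_count < JOBS_N);
  q->mbx[(q->mbx_head + q->mbx_count) % JOBS_N] = jp;
  q->mbx_count++;
}

/* Run one posted job and return its descriptor to the pool.
   Returns 1 if a job ran, 0 if nothing was pending. */
static int model_dispatch(model_queue_t *q) {
  if (q->mbx_count == 0) {
    return 0;
  }
  job_descriptor_t *jp = q->mbx[q->mbx_head];
  q->mbx_head = (q->mbx_head + 1) % JOBS_N;
  q->mbx_count--;
  jp->jobfunc(jp->jobarg);
  q->free_list[q->free_count++] = jp;
  return 1;
}

/* Example job: sets the flag it is given. */
static void set_flag(void *arg) { *(int *)arg = 1; }
```

The back-pressure therefore sits on the "get" side: a sender that cannot obtain a descriptor waits (or gets NULL), while a sender that holds one can always post it.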


Giovanni


Postby meatball » Fri Nov 29, 2019 2:32 am

Hi Giovanni,

This implementation looks interesting. Thank you for working with this idea - I'm trying to understand this implementation well so I can write some tests.

A couple questions:

1) Is there a reason you would prefer to use a pool of threads, instead of sending to a specified queue/owning-thread that blocks on event bits?

I'm interested in reducing the number of threads / memory needed by an application to dispatch these sorts of tasks.

For instance, it would be good to send to an already-existing thread because of its priority or because of the variables available in its scope. It might be good to allow dispatching some tasks in ways that increase the concurrency in an application.

2) Is there a way you would propose to use this implementation for periodic, delayed, or one-shot dispatches? Would you suggest VTs in this case? Is there a way to allow the kernel to wake the thread that owns the queue? Blocking on the queue itself, instead of on an event, complicates the dispatch for a thread that blocks on standard event flags.

What do you think?

Thanks Giovanni

-meatball


Postby Giovanni » Fri Nov 29, 2019 8:25 am

Hi,

In the current implementation there is a pool of competing threads; as soon as a job becomes available, the first available thread will grab it. Note that all threads in the pool are "already existing": you create them at the beginning and they then stay in their "dispatch" loop, similarly to how it is done in delegates.

You can also create multiple queues, each one with its own pool of threads, if you need groups with different properties (priority, stack size, other).

In RT7 threads are assigned to physical cores. You can have multiple cores participating in the same pool, which means that jobs will be executed on the first available core.

Giovanni

