Tasqueue – A simple, customisable distributed job/worker in Go


Tasqueue is a simple, lightweight distributed job/worker implementation in Go


go get -u


  • tasqueue.Broker is a generic interface to enqueue and consume messages from a single queue. Currently supported brokers are redis and nats-jetstream.
  • tasqueue.Results is a generic interface to store the status and results of jobs. Currently supported result stores are redis and nats-jetstream.
  • tasqueue.Task is a pre-registered job handler. It stores a handler functions which is called to process a job. It also stores callbacks (if set through options), executed during different states of a job.
  • tasqueue.Job represents a unit of work pushed to a queue for consumption. It holds:
    • []byte payload (encoded in any manner, if required)
    • task name used to identify the pre-registed task which will processes the job.


A tasqueue server is the main store that holds the broker and the results interfaces. It also acts as a hub to register tasks.

Server Options

// ServerOpts are curated options to configure a server.
type ServerOpts struct {
        Concurrency uint32 // default: `tasqueue:tasks`
        Queue       string // default : `1`


package main

import (

        redis_broker ""
        redis_results ""

func main() {
        broker := redis_broker.New(redis_broker.Options{
                Addrs:    []string{""},
                Password: "",
                DB:       0,
        results := redis_results.New(redis_results.Options{
                Addrs:    []string{""},
                Password: "",
                DB:       0,

        srv, err := tasqueue.NewServer(broker, results,tasqueue.ServerOpts{Concurrency: 5})
        if err != nil {

Task Options

Task options are callbacks that are executed one a state change.

type TaskOpts struct {
        SuccessCB    func(JobCtx)
        ProcessingCB func(JobCtx)
        RetryingCB   func(JobCtx)
        FailedCB     func(JobCtx)

Registering tasks

A task can be registered by supplying a name, handler and options. Jobs can be processed using a task registered by a particular name. A handler is a function with the signature func([]byte, JobCtx) error. It is the responsibility of the handler to deal with the []byte payload in whatever manner (decode, if required).

package tasks

import (


type SumPayload struct {
        Arg1 int `json:"arg1"`
        Arg2 int `json:"arg2"`

type SumResult struct {
        Result int `json:"result"`

// SumProcessor prints the sum of two integer arguements.
func SumProcessor(b []byte, m tasqueue.JobCtx) error {
        var pl SumPayload
        if err := json.Unmarshal(b, &pl); err != nil {
                return err

        rs, err := json.Marshal(SumResult{Result: pl.Arg1 + pl.Arg2})
        if err != nil {
                return err


        return nil
srv.RegisterTask("add", tasks.SumProcessor, TaskOpts{})

Start server

Start() starts the job consumer and processor. It is a blocking function. It listens for jobs on the queue and spawns processor go routines.


A tasqueue job represents a unit of work pushed onto the queue, that requires processing using a registered Task. It holds a []byte payload, a task name (which will process the payload) and various options.

Job Options

// JobOpts holds the various options available to configure a job.
type JobOpts struct {
        Queue      string // default: `tasqueue:tasks`
        MaxRetries uint32 // default: `1`
        Schedule   string // cron schedule for the job

Creating a job

NewJob returns a job with the supplied payload. It accepts the name of the task, the payload and a list of options.

b, _ := json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4})
job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {

Enqueuing a job

Once a job is created, it can be enqueued via the server for processing. Calling srv.Enqueue returns a job uuid which can be used to query the status of the job.

uuid, err := srv.Enqueue(ctx, job)
if err != nil {

Getting a job message

To query the details of a job that was enqueued, we can use srv.GetJob. It returns a JobMessage which contains details related to a job.

jobMsg, err := srv.GetJob(ctx, uuid)
if err != nil {

Fields available in a JobMessage (embeds Meta):

// Meta contains fields related to a job. These are updated when a task is consumed.
type Meta struct {
        UUID        string
        Status      string
        Queue       string
        Schedule    string
        MaxRetry    uint32
        Retried     uint32
        PrevErr     string
        ProcessedAt time.Time


JobCtx is passed to handler functions and callbacks. It can be used to view the job's meta information (JobCtx embeds Meta) and also to save arbitrary results for a job using func (c *JobCtx) Save(b []byte) error


A tasqueue group holds multiple jobs and pushes them all simultaneously onto the queue, the Group is considered successful only if all the jobs finish successfully.

Creating a group

NewGroup returns a Group holding the jobs passed.

var group []tasqueue.Job

for i := 0; i < 3; i++ {
        b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
        job, err := tasqueue.NewJob("add", b)
        if err != nil {
        group = append(group, job)

grp, err := tasqueue.NewGroup(group...)
if err != nil {

Enqueuing a group

Once a group is created, it can be enqueued via the server for processing. Calling srv.EnqueueGroup returns a group uuid which can be used to query the status of the group.

groupUUID, err := srv.EnqueueGroup(ctx, grp)
if err != nil {

Getting a group message

To query the details of a group that was enqueued, we can use srv.GetGroup. It returns a GroupMessage which contains details related to a group.

groupMsg, err := srv.GetGroup(ctx, groupUUID)
if err != nil {

Fields available in a GroupMessage (embeds GroupMeta):

// GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type GroupMeta struct {
        UUID   string
        Status string
        // JobStatus is a map of individual job uuid -> status
        JobStatus map[string]string


A tasqueue chain holds multiple jobs and pushes them one after the other (after a job succeeds), the Chain is considered successful only if the final job completes successfuly.

Creating a chain

NewChain returns a chain holding the jobs passed in the order.

var chain []tasqueue.Job

for i := 0; i < 3; i++ {
        b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
        task, err := tasqueue.NewJob("add", b)
        if err != nil {
        chain = append(chain, task)

chn, err := tasqueue.NewChain(chain...)
if err != nil {

Enqueuing a chain

Once a chain is created, it can be enqueued via the server for processing. Calling srv.EnqueueChain returns a chain uuid which can be used to query the status of the chain.

chainUUID, err := srv.EnqueueChain(ctx, chn)
if err != nil {

Getting a chain message

To query the details of a chain that was enqueued, we can use srv.GetChain. It returns a ChainMessage which contains details related to a chian.

chainMsg, err := srv.GetChain(ctx, chainUUID)
if err != nil {

Fields available in a ChainMessage (embeds ChainMeta):

// ChainMeta contains fields related to a chain job.
type ChainMeta struct {
        UUID string
        // Status of the overall chain
        Status string
        // UUID of the current job part of chain
        JobUUID string
        // List of UUIDs of completed jobs
        PrevJobs []string


A result is arbitrary []byte data saved by a handler or callback via JobCtx.Save().

Get Result

b, err := srv.GetResult(ctx, jobUUID)
if err != nil {


  • @knadh for the logo & feature suggestions



