Job Queue Patterns in Kubernetes

- Hariharan

In one of my recent projects, I had to run long-running jobs in response to user requests. So I started reading about job queues and how they are implemented. Here I have collected some of my findings and experiments.

Project Setup

I built a simple worker node in Go that reads jobs from a queue in Redis.

if worker {
    // The hostname identifies which worker processed a job.
    host, _ := os.Hostname()
    log.Println("Starting worker", host)
    jobsChan := queue.Subscribe(context.Background(), exitIfEmpty)
    // The loop ends once Subscribe closes the channel.
    for job := range jobsChan {
        time.Sleep(5 * time.Second) // simulate a long-running job
        log.Printf("Processed job %s by worker %s", job, host)
    }
    return
}

Setting exitIfEmpty makes the worker exit once the job queue is empty. This becomes useful in the second approach.
There is also an API endpoint that pushes jobs into the queue.

pubJobqueue := make(chan string)
go queue.Publish(context.Background(), pubJobqueue)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    pubJobqueue <- uuid.New().String()
    w.WriteHeader(http.StatusAccepted)
})

Development Setup

I used kind to create a local development cluster.

go install sigs.k8s.io/[email protected] && kind create cluster

Approach #1

The first approach was quite simple: a deployment of the worker node with a few replicas. Since the worker nodes are completely stateless, this can be scaled easily.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker-node
spec:
  selector:
    matchLabels:
      app: worker-node
  template:
    metadata:
      labels:
        app: worker-node
    spec:
      containers:
      - name: worker-node
        image: worker:1
        env:
        - name: WORKER
          value: "1"
        - name: REDIS_CONNECTION_URL
          value: "redis://redis-svc:6379/0"
        resources:
          limits:
            memory: "128Mi"
            cpu: "500m"

I deployed an instance of Redis and the API server as well. Here, you get a pool of worker nodes that are always up and checking the queue for new jobs. This works well, especially if you get a constant stream of new jobs. But I wanted something that could scale up or down based on demand.

Approach #2

I decided to use a CronJob to create worker nodes. The workers have additional logic to exit if the queue is empty.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: worker-job
spec:
  concurrencyPolicy: "Allow"
  schedule: "* * * * *"
  jobTemplate:
    spec:
      parallelism: 3
      template:
        metadata:
          labels:
            app: worker-node
        spec:
          containers:
          - name: worker-node
            image: worker:1
            env:
            - name: WORKER
              value: "1"
            - name: EXIT
              value: "1"
            - name: REDIS_CONNECTION_URL
              value: "redis://redis-svc:6379/0"
            resources:
              limits:
                memory: "128Mi"
                cpu: "500m"
          restartPolicy: "OnFailure"

The CronJob creates a Job every minute that runs 3 worker pods in parallel, and they exit quickly if there is no work. concurrencyPolicy is set to “Allow” so that a new Job can start on the next trigger even if the previous one is still running because the job queue is not yet empty. This helps when there is a sudden spike.

This approach also has some disadvantages. First, there is always an overhead associated with constantly creating new pods. Also, Kubernetes CronJob timing is not precise: there can be a delay between the scheduled time and the moment the pods are actually running. But if you need to run batch jobs, such as creating a data archive on user request or running bash scripts or playbooks on demand, this could work well. This approach is better suited to cases where requests are sparse.

Potential Approach #3

Another approach would be to use the Horizontal Pod Autoscaler (HPA), which can scale on custom metrics. You can collect the metrics with your own solution, such as Prometheus, and Kubernetes queries them through a metrics adapter API. You can also build your own adapter in Go.
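To get a feel for how queue length would translate into replicas, here is a small sketch of the scaling arithmetic. The Kubernetes documentation gives the HPA rule as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue); when the metric is the queue length averaged per pod, that reduces to ceil(queueLength / targetPerWorker). The function name, the target of 5 jobs per worker and the replica bounds below are assumptions for illustration, and the real controller also applies a tolerance and stabilization behaviour that this ignores.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas computes ceil(queueLength / targetPerWorker) and clamps the
// result to [minReplicas, maxReplicas]. This is a simplified model of the HPA
// calculation, not the controller's actual code.
func desiredReplicas(queueLength, targetPerWorker, minReplicas, maxReplicas int) int {
	if targetPerWorker <= 0 {
		return minReplicas // avoid dividing by zero on a bad target
	}
	n := int(math.Ceil(float64(queueLength) / float64(targetPerWorker)))
	if n < minReplicas {
		n = minReplicas
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	return n
}

func main() {
	// Hypothetical settings: 5 queued jobs per worker, between 1 and 10 workers.
	for _, queued := range []int{0, 4, 12, 500} {
		fmt.Printf("queue=%d -> replicas=%d\n", queued, desiredReplicas(queued, 5, 1, 10))
	}
}
```

With these numbers, an empty queue stays at the minimum of 1 worker, 12 queued jobs ask for 3 workers, and a burst of 500 jobs is capped at 10.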

Another option is KEDA (Kubernetes Event-driven Autoscaling), which is built on top of the custom-metrics-apiserver project and is designed specifically to scale containers based on events. KEDA already ships many prebuilt scalers, for example Redis Cluster Lists, which scales automatically based on the length of a list.

All the code from this post can be found here.