Bernard Pietraga

Scaling Deployments With Go Kubernetes Client

Golang Kubernetes API - Gopher

So you want to auto scale your deployments with Golang?

I’ve written this post because I haven’t found any comprehensive guide on doing simple scaling from within a Golang app. It uses Kubernetes client-go and Kubernetes apimachinery. Note that the official documentation is great and provides enough information for everyone to get up to speed easily.
Image notes: The Gopher mascot used in the post image was created by Renee French and is shared under the Creative Commons Attribution 3.0 License. I’ve combined it with a burger from Katerina Limpitsouni’s undraw.co.


It is time to start. You have an application running in lovely K8s that performs some tasks, and you want to autoscale it based on a really custom internal metric. It is not CPU, requests, or anything else that can be handled with a custom metric or a similar approach. This article describes a weird solution to a rare problem, and it is done using Golang. Most of the time, deployments and other bits of Kubernetes infrastructure can be handled using out-of-the-box components. For deployment pods that is the Horizontal Pod Autoscaler, which, with the help of a vast range of metrics, can achieve the scaling result we need for our system. So think twice before using the approach described here.

This solution might also be helpful when scaling is tied to a selected time window, for example during Super Bowl adverts, when you know you will get a lot of traffic.

So let’s assume we have a great startup idea: an application which manages bread grill toasters used in restaurant chains all over the world. There is a service which holds state about the number of orders waiting to be grilled and communicates with the shiny IoT toasters about heating metrics. There is one pod per toaster. Every day, when the restaurant opens, only 3 toasters are running, which means only 3 pods are defined in the deployment. When the orders ramp up, the restaurant manager calls you and asks if there is a possibility of heating up the other cold toasters lying in the room.

This example is insecure: it uses plain HTTP requests. Consider secure communication before using it.

Here we go with our sandwich journey

So let’s assume that the service responsible for managing toasters, for the sake of convenience, returns a JSON response with just one field, "orders", which holds the current number of orders. It will be the basis for our scale-up and scale-down logic.

We will create logic so that if the number of orders to be fulfilled per toaster (pod) is 5 or more, the pods are scaled up. If it is lower than 5, we scale down, but never below 3 pods. Everything should happen in 10-second intervals.
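The rule above can be sketched as a small pure function, which keeps the decision easy to test separately from any Kubernetes calls. This is only a sketch under assumptions: the names desiredReplicas, minReplicas and scaleThreshold are made up for illustration, exactly 5 orders per pod is treated as "scale up" (matching the >= comparison in the main.go listing later in this post), and there is no upper limit on pods, which you would likely want in practice.

```go
package main

import "fmt"

const (
	minReplicas    int32 = 3 // never go below 3 pods (from the text)
	scaleThreshold int32 = 5 // orders per pod that triggers a scale up (from the text)
)

// desiredReplicas returns the replica count to ask for, given the number of
// orders observed in the last interval and the current number of pods.
// It moves by at most one pod per call.
func desiredReplicas(orders, current int32) int32 {
	if current < minReplicas {
		return minReplicas
	}
	perPod := orders / current // integer division truncates: 14/3 == 4
	switch {
	case perPod >= scaleThreshold:
		return current + 1
	case current > minReplicas:
		return current - 1
	default:
		return current
	}
}

func main() {
	fmt.Println(desiredReplicas(15, 3)) // 4: 5 orders per pod, scale up
	fmt.Println(desiredReplicas(14, 3)) // 3: 4 orders per pod, already at minimum
	fmt.Println(desiredReplicas(8, 4))  // 3: 2 orders per pod, scale down
}
```

Note how integer division matters here: 14 orders on 3 pods counts as 4 per pod, so it does not trigger a scale up.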

RBAC rules needed to do the job

Skip this section if you are only interested in the Golang code. RBAC rules are necessary to perform changes on our deployment. The example below is permissive, so adjust it accordingly for security. The goal is to enable our simple toaster app to read data about deployments, specifically replicas, which represent the number of pods, and to change that number on the fly.

Let’s create the rbac.yaml file.

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: simple-deployments-manager
rules:
- apiGroups: ["extensions", "apps"]
  resources: ["deployments", "pods"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
# This role binding grants the "simple-deployments-manager" Role to the default
# service account and to the "manager" group in the "default" namespace.
kind: RoleBinding
metadata:
  name: simple-deployments-manager
  # The namespace of the RoleBinding determines where the permissions are granted.
  # This only grants permissions within the "default" namespace.
  namespace: default
subjects:
- kind: User
  name: system:serviceaccount:default:default # Name is case sensitive
  apiGroup: rbac.authorization.k8s.io
- kind: Group
  name: manager # Name is case sensitive
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: simple-deployments-manager
  apiGroup: rbac.authorization.k8s.io

Deployment – we also need to put our app somewhere

Also skip this section if you are only interested in the Golang code. We need to place our app somewhere, and here is an example of the file content to do that. An important note: we pass an environment variable which holds the address of the service providing us with order data.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-scaler
  labels:
    app: simple-scaler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-scaler
  template:
    metadata:
      labels:
        app: simple-scaler
    spec:
      containers:
      - name: simple-scaler
        image: simple-scaler
        imagePullPolicy: Never
        env:
        - name: MANAGED_SERVICE_HOST
          value: "managed-service:8080"
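On the Go side, that variable has to be read and validated before it is used to build a URL. A minimal sketch, assuming a hypothetical helper named managedHost that takes the lookup function as a parameter so it can be exercised without touching the real environment:

```go
package main

import (
	"fmt"
	"os"
)

// envManagedHost is the variable name set in the Deployment above.
const envManagedHost = "MANAGED_SERVICE_HOST"

// managedHost returns the host:port of the managed service, or an error when
// the variable is missing or empty. lookup is normally os.LookupEnv.
func managedHost(lookup func(string) (string, bool)) (string, error) {
	host, ok := lookup(envManagedHost)
	if !ok || host == "" {
		return "", fmt.Errorf("%s is not set", envManagedHost)
	}
	return host, nil
}

func main() {
	host, err := managedHost(os.LookupEnv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("orders endpoint: http://" + host + "/orders")
}
```

Failing early on a missing variable is a design choice of this sketch; it avoids requests to a malformed URL such as http:///orders.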

Docker image for our main.go

To successfully deploy the app to the cluster we need to package it as a Docker image. Here is a simple Dockerfile which runs the app as non-root.

# Build the simplescaler binary
FROM golang:1.16 as builder

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

# Copy the go source
COPY main.go main.go

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o simplescaler main.go

# Use distroless as minimal base image to package the simplescaler binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/simplescaler .
USER 65532:65532

ENTRYPOINT ["/simplescaler"]

Logic – the brain of restaurant scaling – Golang code

Finally, the Golang code. So let’s look at the functions we need to get stuff done.

Scroll down if you are only interested in the complete file!

First of all, let’s create the file main.go, which we will later run in a separate pod in our greatest cluster.

Now let’s start with the components we need to map for our app to work. Let’s start with the struct which represents the response from the service providing us with the number of orders.

// OrdersResponse is the struct representing the JSON response from our managed service
// http://managed-service:8080/orders
type OrdersResponse struct {
    // The tag name matches the "orders" field returned by the service.
    Orders int32 "json:\"orders,omitempty\"" // Backticks are an issue for the markdown parser
}

Now let’s jump further and map the other parts needed. On the Kubernetes side we will need the types and interfaces which represent Kubernetes objects.

// ManagedDeployment struct represents the state used to perform replica set
// scaling.
type ManagedDeployment struct {
    // k8s client for Kubernetes deployments
    Client appsv1.DeploymentInterface

    // Number of replicas (pods) in the last fetched deployment state
    ReplicaCount int32

    // The whole fetched deployment state
    State *v1.Deployment
}

// ManagedDeploymentInterface is the interface exposing the methods implemented
// by ManagedDeployment
type ManagedDeploymentInterface interface {
    Get() error
    UpdateReplicas(count int32) error
}

A small factory for the ManagedDeployment will be nice:

// NewManagedDeploymentManager is factory used to create ManagedDeployment
func NewManagedDeploymentManager(deploymentsClient appsv1.DeploymentInterface) *ManagedDeployment {
    return &ManagedDeployment{Client: deploymentsClient}
}
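One reason to put Get and UpdateReplicas behind an interface is that the scaling logic can then be tried against an in-memory fake, with no cluster involved. The following is a rough sketch of that idea, not part of the article's main.go: the names scaler, fakeDeployment and scaleUpFrom are hypothetical, and UpdateReplicas is assumed to take the new count as an argument, as the method implemented later in this post does.

```go
package main

import (
	"errors"
	"fmt"
)

// scaler is a stand-in with the same two operations as the interface above.
type scaler interface {
	Get() error
	UpdateReplicas(count int32) error
}

// fakeDeployment keeps the replica count in memory instead of in a cluster.
type fakeDeployment struct {
	ReplicaCount int32
}

// Get would refresh state from the API server; the fake has nothing to fetch.
func (f *fakeDeployment) Get() error { return nil }

// UpdateReplicas records the new count, rejecting negative values.
func (f *fakeDeployment) UpdateReplicas(count int32) error {
	if count < 0 {
		return errors.New("replica count must not be negative")
	}
	f.ReplicaCount = count
	return nil
}

// Compile-time check that the fake satisfies the interface.
var _ scaler = (*fakeDeployment)(nil)

// scaleUpFrom refreshes the state and then asks for one more replica.
func scaleUpFrom(s scaler, current int32) error {
	if err := s.Get(); err != nil {
		return err
	}
	return s.UpdateReplicas(current + 1)
}

func main() {
	f := &fakeDeployment{ReplicaCount: 3}
	if err := scaleUpFrom(f, f.ReplicaCount); err != nil {
		fmt.Println("scale up failed:", err)
		return
	}
	fmt.Println(f.ReplicaCount) // 4
}
```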

Now the interesting bit. For this we will use some Golang standard libraries and the Kubernetes client-go and Kubernetes apimachinery.

We will need some imports.

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "strings"
    "time"

    v1 "k8s.io/api/apps/v1"
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/retry"
)

Let’s implement the 2 methods from our ManagedDeploymentInterface:

  • Get()
  • UpdateReplicas()

First we will create a simple pointer helper, as the client library requires pointers to 32-bit integers.

// int32Ptr is used to create the pointer necessary for the replica count
func int32Ptr(i int32) *int32 { return &i }
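The helper exists because Go does not allow taking the address of a constant or a conversion result directly, so an expression like &int32(3) does not compile. The sketch below shows what the helper gives you: every call returns a pointer to its own copy of the value. The generic ptr function is an illustrative extra that assumes Go 1.18 or newer (the Dockerfile above pins an older toolchain); as far as I know, the k8s.io/utils module ships similar helpers, so check there before writing your own.

```go
package main

import "fmt"

// int32Ptr returns a pointer to a copy of i.
func int32Ptr(i int32) *int32 { return &i }

// ptr is a generic variant of the same idea (requires Go 1.18+).
func ptr[T any](v T) *T { return &v }

func main() {
	a := int32Ptr(3)
	b := int32Ptr(3)

	*a = 5 // changing one copy does not affect the other

	fmt.Println(*a, *b, a != b) // 5 3 true
	fmt.Println(*ptr("toaster")) // toaster
}
```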

Get() updates the attributes of our ManagedDeployment with fresh data from the Deployment, including what is important for us: State and ReplicaCount, the number of pods.

// Get is used to fetch new state and update the ManagedDeployment.
func (c *ManagedDeployment) Get() error {
    result, getErr := c.Client.Get(context.TODO(),
        "managed-deployment",
        metav1.GetOptions{})
    if getErr != nil {
        return getErr
    }
    replicaCount := *result.Spec.Replicas
    c.State = result
    c.ReplicaCount = replicaCount
    return nil
}

UpdateReplicas(count int32) updates the Deployment so that it has count replicas (pods). A 32-bit integer is used because the official client requires it.

// UpdateReplicas is used to change the number of deployment replicas based on
// the provided count
func (c *ManagedDeployment) UpdateReplicas(count int32) error {
    c.State.Spec.Replicas = int32Ptr(count)
    _, updateErr := c.Client.Update(context.TODO(),
        c.State,
        metav1.UpdateOptions{})
    if updateErr != nil {
        return updateErr
    }
    c.ReplicaCount = count
    return nil
}

Now it is time to wrap it up with a function which queries the managed host and returns the current orders count.

// GetManagedOrdersCount asks the managed-service API for the current orders
// count. It decodes the JSON response body and returns the number of orders.
func GetManagedOrdersCount() (int32, error) {
    url := "http://" + ManagedHost + "/orders"
    r := strings.NewReader("")
    req, clientErr := http.NewRequest("GET", url, r)
    if clientErr != nil {
        return int32(0),
            fmt.Errorf("Prepare request : %v", clientErr)
    }
    req.Header.Set("Content-Type", "application/json")

    // The timeout value is arbitrary, it only keeps a hung request from
    // blocking the scaling loop forever
    client := &http.Client{Timeout: 5 * time.Second}

    resp, reqErr := client.Do(req)
    if reqErr != nil {
        return int32(0),
            fmt.Errorf("Failed to query the orders count : %v", reqErr)
    }
    defer resp.Body.Close()

    // Anything other than 200 OK is not a valid orders response
    if resp.StatusCode != http.StatusOK {
        return int32(0),
            fmt.Errorf("Unexpected status from managed-service : %s", resp.Status)
    }

    // Decode the data
    var cResp OrdersResponse
    if jsonErr := json.NewDecoder(resp.Body).Decode(&cResp); jsonErr != nil {
        return int32(0), fmt.Errorf("Error parsing orders count json : %v", jsonErr)
    }
    return cResp.Orders, nil
}
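To try this kind of function without a running managed-service, the standard library's net/http/httptest package can stand in for it. The sketch below is an assumption-laden illustration rather than the article's code: fetchOrders, ordersResponse and demo are hypothetical names, the URL is passed in as a parameter instead of being built from a global, and the payload value 12 is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// ordersResponse mirrors the single "orders" field described in the text.
type ordersResponse struct {
	Orders int32 `json:"orders"`
}

// fetchOrders performs a GET on url and decodes the orders count.
func fetchOrders(client *http.Client, url string) (int32, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, fmt.Errorf("query orders: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("query orders: unexpected status %s", resp.Status)
	}
	var body ordersResponse
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return 0, fmt.Errorf("decode orders: %w", err)
	}
	return body.Orders, nil
}

// demo starts a throwaway in-process server that plays the role of
// managed-service and answers every request with the given status and body.
func demo(status int, payload string) (int32, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(status)
		fmt.Fprint(w, payload)
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 2 * time.Second}
	return fetchOrders(client, srv.URL+"/orders")
}

func main() {
	n, err := demo(http.StatusOK, `{"orders": 12}`)
	fmt.Println(n, err)

	_, err = demo(http.StatusInternalServerError, "boom")
	fmt.Println(err)
}
```

Passing the client and URL as arguments is what makes the function testable here; the global-based version in the article would need the global to be pointed at the test server instead.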

To connect to the cluster, let’s set up 2 client helpers, which cover running locally (outside Docker) and running in the cluster. The runningInDocker() check they rely on is included in the complete file at the end.


// clientSetup returns the appropriate clientset with config either from
// ~/.kube/config or from the in-cluster API configuration
func clientSetup() (*kubernetes.Clientset, error) {
    if runningInDocker() {
        return setupDockerClusterClient()
    }
    return setupLocalClient()
}

// setupLocalClient builds the client from the local (for example minikube)
// ~/.kube/config file
func setupLocalClient() (*kubernetes.Clientset, error) {
    var ns string
    flag.StringVar(&ns, "namespace", "", "namespace")

    // Bootstrap k8s configuration from the local Kubernetes config file
    kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
    log.Println("Using kubeconfig file: ", kubeconfig)
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return nil, err
    }

    // Create a REST client not targeting a specific API version
    clientset, err := kubernetes.NewForConfig(config)
    return clientset, err
}

// setupDockerClusterClient from rest cluster api configuration.
func setupDockerClusterClient() (*kubernetes.Clientset, error) {
    log.Println("Running inside container")
    // creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    return clientset, err
}
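The choice between the two helpers depends on detecting where the process runs. The article's check looks for the /.dockerenv file, which is created by Docker and may be missing under other container runtimes. An alternative, shown here only as a sketch, is to look at the KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT environment variables, which Kubernetes sets in pods and which rest.InClusterConfig itself reads. The function name inCluster and the injected getenv parameter are assumptions made for the example.

```go
package main

import (
	"fmt"
	"os"
)

// inCluster reports whether the process appears to run inside a Kubernetes
// pod, based on the service environment variables. getenv is normally
// os.Getenv; it is a parameter so the check can be tested in isolation.
func inCluster(getenv func(string) string) bool {
	return getenv("KUBERNETES_SERVICE_HOST") != "" &&
		getenv("KUBERNETES_SERVICE_PORT") != ""
}

func main() {
	if inCluster(os.Getenv) {
		fmt.Println("using in-cluster configuration")
		return
	}
	fmt.Println("using local kubeconfig")
}
```

Another option is to simply call rest.InClusterConfig() first and fall back to the kubeconfig file when it returns an error.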

Now the logic part: the main function, which runs the scaling check in a loop. The important part is the use of a ticker, which, unlike sleep, keeps real 10-second intervals.

// ManagedHost is the host:port of the managed-service running on the cluster.
// It is declared at package level so that GetManagedOrdersCount can use it.
var ManagedHost string = os.Getenv("MANAGED_SERVICE_HOST")

func main() {
    // To prevent tick shifting I've decided to go with time.Tick rather than
    // time.Sleep. Of course this also has some downsides: the ticker cannot
    // be stopped, and ticks are dropped when an iteration takes longer than
    // the interval.
    tick := time.Tick(10 * time.Second)
    tickCount := 0

    clientset, err := clientSetup()
    if err != nil {
        log.Fatal(err)
    }

    deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
    ManagedDeployment := NewManagedDeploymentManager(deploymentsClient)

    // Retrieve the initial version of Deployment
    getErr := ManagedDeployment.Get()
    if getErr != nil {
        log.Fatal(fmt.Errorf("Failed to get latest version of \"managed-deployment\" Deployment: %v", getErr))
    }

    if ManagedDeployment.ReplicaCount <= 0 {
        log.Fatal(fmt.Errorf("Replica count is equal %d, this means the service is still deploying or there is an issue", ManagedDeployment.ReplicaCount))
    }

    // Get the initial orders count from managed-service
    oldOrdersCount, err := GetManagedOrdersCount()
    if err != nil {
        log.Fatal(err)
    }

    // In 10 second intervals update managed-deployment replicas if needed.
    for range tick {
        // Update replica count
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            tickCount++ // just for logging purposes

            // Retrieve the latest version of Deployment before attempting to
            // run update. RetryOnConflict uses exponential backoff to avoid
            // exhausting the apiserver
            getErr := ManagedDeployment.Get()
            if getErr != nil {
                return fmt.Errorf("Failed to get latest version of \"managed-deployment\" Deployment: %v", getErr)
            }

            // Guard the division below: skip this tick if the deployment
            // currently has no replicas
            if ManagedDeployment.ReplicaCount <= 0 {
                return nil
            }

            // Get current orders count from managed-service
            currentOrdersCount, err := GetManagedOrdersCount()
            if err != nil {
                return err
            }

            // Find how many orders were created in the last 10 seconds
            ordersIn10Seconds := currentOrdersCount - oldOrdersCount

            // Orders per instance (note that integer division truncates)
            ordersPerInstance := ordersIn10Seconds / ManagedDeployment.ReplicaCount

            // Overwrite the old orders count for comparison in the next tick
            oldOrdersCount = currentOrdersCount

            if ordersPerInstance >= 5 && ManagedDeployment.ReplicaCount <= 3 {
                // Scale up by one. ++ is not used so that ReplicaCount is only
                // changed by UpdateReplicas once the update succeeds. Because
                // of the ReplicaCount <= 3 check this example never scales up
                // beyond 4 pods.
                oneMore := ManagedDeployment.ReplicaCount + 1
                updateErr := ManagedDeployment.UpdateReplicas(oneMore)
                if updateErr == nil {
                    fmt.Println("   -> Replica count changed to",
                        oneMore, "on tick", tickCount)
                }
                return updateErr
            } else if ordersPerInstance < 5 && ManagedDeployment.ReplicaCount > 3 {
                // Scale down by one, never going below 3 pods
                oneLess := ManagedDeployment.ReplicaCount - 1
                updateErr := ManagedDeployment.UpdateReplicas(oneLess)
                if updateErr == nil {
                    fmt.Println("   -> Replica count changed to",
                        oneLess, "on tick", tickCount)
                }
                return updateErr
            }

            return nil
        })

        if retryErr != nil {
            panic(fmt.Errorf("Update failed: %v", retryErr))
        }
    }
    // end of loop
}

That’s our core logic, responsible for handling deployment scaling back and forth.
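The loop above uses time.Tick, which gives no way to stop the ticker. If you want a loop that can be shut down cleanly (for example on pod termination), one possible variation is time.NewTicker combined with a context. This is a sketch of that alternative, not what the article's main.go does; runEvery and demoTicks are hypothetical names, and the 1 millisecond interval is only there to keep the demo fast.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn once per interval until ctx is cancelled. Like time.Tick,
// the ticker keeps a steady cadence instead of drifting the way a sleep at
// the end of each iteration would.
func runEvery(ctx context.Context, interval time.Duration, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop ends

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// A tick and a cancellation can be ready at the same time;
			// check again so fn never runs after cancellation.
			if ctx.Err() != nil {
				return
			}
			fn()
		}
	}
}

// demoTicks runs the loop with a short interval, cancels it after n calls,
// and returns how many times fn was invoked.
func demoTicks(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	count := 0
	runEvery(ctx, time.Millisecond, func() {
		count++
		if count == n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println(demoTicks(3)) // 3
}
```

In a real scaler the context would more likely be cancelled from a signal handler, and the interval would be the 10 seconds used in this post.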

Final main.go file

Putting it all together, here is the complete main.go file.

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "strings"
    "time"

    v1 "k8s.io/api/apps/v1"
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/retry"
)

// OrdersResponse is the struct representing the JSON response from:
// http://managed-service:8080/orders
type OrdersResponse struct {
    // The tag name matches the "orders" field returned by the service.
    Orders int32 "json:\"orders,omitempty\"" // Backticks are an issue for the markdown parser
}

// ManagedDeployment struct represents the state used to perform replica set
// scaling.
type ManagedDeployment struct {
    // k8s client for Kubernetes deployments
    Client appsv1.DeploymentInterface

    // Number of replicas (pods) in the last fetched deployment state
    ReplicaCount int32

    // The whole fetched deployment state
    State *v1.Deployment
}

// ManagedDeploymentInterface is the interface exposing the methods implemented
// by ManagedDeployment
type ManagedDeploymentInterface interface {
    Get() error
    UpdateReplicas(count int32) error
}

// NewManagedDeploymentManager is factory used to create ManagedDeployment
func NewManagedDeploymentManager(deploymentsClient appsv1.DeploymentInterface) *ManagedDeployment {
    return &ManagedDeployment{Client: deploymentsClient}
}

// ManagedHost is the host:port of the managed-service running on the cluster.
// It is declared at package level so that GetManagedOrdersCount can use it.
var ManagedHost string = os.Getenv("MANAGED_SERVICE_HOST")

func main() {
    // To prevent tick shifting I've decided to go with time.Tick rather than
    // time.Sleep. Of course this also has some downsides: the ticker cannot
    // be stopped, and ticks are dropped when an iteration takes longer than
    // the interval.
    tick := time.Tick(10 * time.Second)
    tickCount := 0

    // Setup client, might be better to use interfaces here.
    clientset, err := clientSetup()
    if err != nil {
        log.Fatal(err)
    }

    deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
    ManagedDeployment := NewManagedDeploymentManager(deploymentsClient)

    // Retrieve the latest version of Deployment
    getErr := ManagedDeployment.Get()
    if getErr != nil {
        log.Fatal(fmt.Errorf("Failed to get latest version of \"managed-deployment\" Deployment: %v", getErr))
    }

    if ManagedDeployment.ReplicaCount <= 0 {
        log.Fatal(fmt.Errorf("Replica count is equal %d, this means the service is still deploying or there is an issue", ManagedDeployment.ReplicaCount))
    }

    // Get the initial orders count from managed-service
    oldOrdersCount, err := GetManagedOrdersCount()
    if err != nil {
        log.Fatal(err)
    }

    // In 10 second intervals update managed-deployment replicas if needed.
    for range tick {
        // Update replica count
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            tickCount++ // just for logging purposes

            // Retrieve the latest version of Deployment before attempting to
            // run update. RetryOnConflict uses exponential backoff to avoid
            // exhausting the apiserver
            getErr := ManagedDeployment.Get()
            if getErr != nil {
                return fmt.Errorf("Failed to get latest version of \"managed-deployment\" Deployment: %v", getErr)
            }

            // Guard the division below: skip this tick if the deployment
            // currently has no replicas
            if ManagedDeployment.ReplicaCount <= 0 {
                return nil
            }

            // Get current orders count from managed-service
            currentOrdersCount, err := GetManagedOrdersCount()
            if err != nil {
                return err
            }

            // Find how many orders were created in the last 10 seconds
            ordersIn10Seconds := currentOrdersCount - oldOrdersCount

            // Orders per instance (note that integer division truncates)
            ordersPerInstance := ordersIn10Seconds / ManagedDeployment.ReplicaCount

            // Overwrite the old orders count for comparison in the next tick
            oldOrdersCount = currentOrdersCount

            if ordersPerInstance >= 5 && ManagedDeployment.ReplicaCount <= 3 {
                // Scale up by one. ++ is not used so that ReplicaCount is only
                // changed by UpdateReplicas once the update succeeds. Because
                // of the ReplicaCount <= 3 check this example never scales up
                // beyond 4 pods.
                oneMore := ManagedDeployment.ReplicaCount + 1
                updateErr := ManagedDeployment.UpdateReplicas(oneMore)
                if updateErr == nil {
                    fmt.Println("   -> Replica count changed to",
                        oneMore, "on tick", tickCount)
                }
                return updateErr
            } else if ordersPerInstance < 5 && ManagedDeployment.ReplicaCount > 3 {
                // Scale down by one, never going below 3 pods
                oneLess := ManagedDeployment.ReplicaCount - 1
                updateErr := ManagedDeployment.UpdateReplicas(oneLess)
                if updateErr == nil {
                    fmt.Println("   -> Replica count changed to",
                        oneLess, "on tick", tickCount)
                }
                return updateErr
            }

            return nil
        })

        if retryErr != nil {
            panic(fmt.Errorf("Update failed: %v", retryErr))
        }
    }
    // end of loop
}

// Get is used to fetch new state and update the ManagedDeployment.
func (c *ManagedDeployment) Get() error {
    result, getErr := c.Client.Get(context.TODO(),
        "managed-deployment",
        metav1.GetOptions{})
    if getErr != nil {
        return getErr
    }
    replicaCount := *result.Spec.Replicas
    c.State = result
    c.ReplicaCount = replicaCount
    return nil
}

// UpdateReplicas is used to change the number of deployment replicas based on
// the provided count
func (c *ManagedDeployment) UpdateReplicas(count int32) error {
    c.State.Spec.Replicas = int32Ptr(count)
    _, updateErr := c.Client.Update(context.TODO(),
        c.State,
        metav1.UpdateOptions{})
    if updateErr != nil {
        return updateErr
    }
    c.ReplicaCount = count
    fmt.Println("REPLICA COUNT UPDATE", c.ReplicaCount)
    return nil
}

// GetManagedOrdersCount asks the managed-service API for the current orders
// count. It decodes the JSON response body and returns the number of orders.
func GetManagedOrdersCount() (int32, error) {
    url := "http://" + ManagedHost + "/orders"
    r := strings.NewReader("")
    req, clientErr := http.NewRequest("GET", url, r)
    if clientErr != nil {
        return int32(0),
            fmt.Errorf("Prepare request : %v", clientErr)
    }
    req.Header.Set("Content-Type", "application/json")

    // The timeout value is arbitrary, it only keeps a hung request from
    // blocking the scaling loop forever
    client := &http.Client{Timeout: 5 * time.Second}

    resp, reqErr := client.Do(req)
    if reqErr != nil {
        return int32(0),
            fmt.Errorf("Failed to query the orders count : %v", reqErr)
    }
    defer resp.Body.Close()

    // Anything other than 200 OK is not a valid orders response
    if resp.StatusCode != http.StatusOK {
        return int32(0),
            fmt.Errorf("Unexpected status from managed-service : %s", resp.Status)
    }

    // Decode the data
    var cResp OrdersResponse
    if jsonErr := json.NewDecoder(resp.Body).Decode(&cResp); jsonErr != nil {
        return int32(0), fmt.Errorf("Error parsing orders count json : %v", jsonErr)
    }
    return cResp.Orders, nil
}

// int32Ptr is used to create the pointer necessary for the replica count
func int32Ptr(i int32) *int32 { return &i }

// runningInDocker checks if /.dockerenv is present, meaning the process is
// running in a container. I'm aware that this is not a deterministic check for
// all setups, but it works fine for this example minikube app.
func runningInDocker() bool {
    if _, err := os.Stat("/.dockerenv"); err == nil {
        return true
    }
    // Either the file does not exist or (Schrodinger's cat) we cannot tell;
    // in both cases assume we are not in a container :)
    return false
}

// clientSetup returns the appropriate clientset with config either from
// ~/.kube/config or from the in-cluster API configuration
func clientSetup() (*kubernetes.Clientset, error) {
    if runningInDocker() {
        return setupDockerClusterClient()
    }
    return setupLocalClient()
}

// setupLocalClient builds the client from the local (for example minikube)
// ~/.kube/config file
func setupLocalClient() (*kubernetes.Clientset, error) {
    var ns string
    flag.StringVar(&ns, "namespace", "", "namespace")

    // Bootstrap k8s configuration from the local Kubernetes config file
    kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
    log.Println("Using kubeconfig file: ", kubeconfig)
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return nil, err
    }

    // Create a REST client not targeting a specific API version
    clientset, err := kubernetes.NewForConfig(config)
    return clientset, err
}

// setupDockerClusterClient from rest cluster api configuration.
func setupDockerClusterClient() (*kubernetes.Clientset, error) {
    log.Println("Running inside container")
    // creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    return clientset, err
}