Scaling Deployments With Go Kubernetes Client
So you want to autoscale your deployments with Golang?
I’ve written this post because I haven’t found any comprehensive guide on doing simple scaling from within a Golang app. It uses the Kubernetes client-go and Kubernetes apimachinery libraries. Note that the official documentation is great and provides enough information to get up to speed easily.
Image notes: The Gopher mascot used in the post image was created by Renee French and is shared under the Creative Commons Attribution 3.0 License. I’ve combined it with a burger from Katerina Limpitsouni’s undraw.co
It is time to start. You have an application running in lovely K8s which performs some tasks, and you want to autoscale it based on some really custom internal metric. It is not CPU, requests, or anything else that can be handled with a custom metric or another standard approach. This article describes a weird solution to a rare problem, done in Golang. Most of the time deployments and other bits of Kubernetes infrastructure can be handled using out-of-the-box components. For deployment pods that is the Horizontal Pod Autoscaler, which, with the help of its vast set of metrics, can usually achieve the scaling result we need. So think twice before rolling your own.
This solution might also be helpful when you need conditional scaling during a selected time window, for example during Super Bowl adverts, when you know you will get a lot of traffic.
So let’s assume we have a great startup idea: an application which handles bread grill toasters, to be used in restaurant chains all over the world. There is a service which holds state about the number of orders waiting to be grilled and communicates with the shiny IoT toasters about heating metrics. There is one pod per toaster. Every day when the restaurant opens there are only 3 toasters running, which means only 3 pods defined in the deployment. When the orders ramp up, the restaurant manager calls you and asks if there is a possibility of heating up the other cold toasters lying in the room.
This is an example and it is insecure, using plain requests via HTTP; consider secure communication before using it.
Here we go with our sandwich journey
So let’s assume that, for the sake of convenience, the service responsible for managing the toasters returns a JSON response with just one field, "orders", representing the current number of orders. It will be the basis for our scale-up and scale-down logic.
We will create logic so that if the number of orders to be fulfilled per toaster (pod) reaches 5 or more, the pods are scaled up. If it is lower than 5, we scale down, to a minimum of 3 pods. Everything should happen in 10-second intervals.
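To make that concrete, a response from the (hypothetical) managed service might look like this:
{ "orders": 18 }
With 3 running toasters and 18 orders waiting, that is 6 orders per toaster, which is above the threshold of 5, so an extra pod should be started.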
RBAC rules needed to do the job
Skip this section if you are only interested in the Golang code. RBAC rules are necessary to perform changes on our deployment. The example below is permissive, so adjust it accordingly for security. The goal is to let our simple toaster app read deployment data – the replica count, which represents the number of pods – and change that count on the fly.
Let’s create an rbac.yaml file.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: simple-deployments-manager
rules:
- apiGroups: ["extensions", "apps"]
  resources: ["deployments", "pods"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
# This role binding grants the Role above to the default service account
# in the "default" namespace, which is what our scaler pod runs as.
kind: RoleBinding
metadata:
  name: simple-deployments-manager
  # The namespace of the RoleBinding determines where the permissions are
  # granted. This only grants permissions within the "default" namespace.
  namespace: default
subjects:
- kind: User
  name: system:serviceaccount:default:default # Name is case sensitive
  apiGroup: rbac.authorization.k8s.io
- kind: Group
  name: manager # Name is case sensitive
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: simple-deployments-manager
  apiGroup: rbac.authorization.k8s.io
Deployment – we also need to put our app somewhere
Also skip this section if you are only interested in the Golang code. We need to place our app somewhere, and here is an example manifest to do it. The important bit is that we pass an environment variable with the address of the service that provides us with the order data.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-scaler
  labels:
    app: simple-scaler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-scaler
  template:
    metadata:
      labels:
        app: simple-scaler
    spec:
      containers:
      - name: simple-scaler
        image: simple-scaler
        imagePullPolicy: Never
        env:
        - name: MANAGED_SERVICE_HOST
          value: "managed-service:8080"
Docker image for our main.go
To be able to deploy the app to the cluster we need to package it into a Docker image. Here is a simple Dockerfile which runs the app as non-root.
# Build the simplescaler binary
FROM golang:1.16 as builder
WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download
# Copy the go source
COPY main.go main.go
# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o simplescaler main.go
# Use distroless as minimal base image to package the simplescaler binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/simplescaler .
USER 65532:65532
ENTRYPOINT ["/simplescaler"]
Logic – the brain of restaurant scaling – Golang code
Finally, the Golang code. Let’s look at the functions we need to get the job done.
Scroll down if you are only interested in the complete file!
First of all, let’s create a main.go file, which we will later run in a separate pod in our cluster.
Now let’s map the components we need for our app to work, starting with the struct representing the response from the service that provides the number of orders.
// OrdersResponse is a struct representing the JSON response from our managed
// service: http://managed-service:8080/orders
type OrdersResponse struct {
	Orders int32 `json:"orders,omitempty"`
}
Now let’s jump further and map the other parts we need. On the Kubernetes side we need the types and interfaces that represent Kubernetes objects.
// ManagedDeployment represents the state used to perform replica scaling
// on the managed deployment.
type ManagedDeployment struct {
	// Client is the typed k8s client for the Kubernetes deployment
	Client appsv1.DeploymentInterface
	// ReplicaCount is the last fetched number of replicas (pods)
	ReplicaCount int32
	// State is the whole fetched deployment object
	State *v1.Deployment
}

// ManagedDeploymentInterface is the interface exposing the methods we need
type ManagedDeploymentInterface interface {
	Get() error
	UpdateReplicas(count int32) error
}
A small factory for the ManagedDeployment will be nice:
// NewManagedDeploymentManager is factory used to create ManagedDeployment
func NewManagedDeploymentManager(deploymentsClient appsv1.DeploymentInterface) *ManagedDeployment {
return &ManagedDeployment{Client: deploymentsClient}
}
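Because the factory (and the whole ManagedDeployment) only depends on appsv1.DeploymentInterface, it is easy to exercise without a real cluster. As a sketch that is not part of the original code, a main_test.go could use the fake clientset shipped with client-go:

package main

import (
	"testing"

	"k8s.io/client-go/kubernetes/fake"
)

func TestNewManagedDeploymentManager(t *testing.T) {
	// The fake clientset implements the same typed interfaces as the real one,
	// so the factory can be exercised without talking to an API server.
	deploymentsClient := fake.NewSimpleClientset().AppsV1().Deployments("default")
	md := NewManagedDeploymentManager(deploymentsClient)
	if md.Client == nil {
		t.Fatal("expected the deployments client to be set on ManagedDeployment")
	}
}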
Now the interesting bit. For this we will use some Golang standard libraries and the Kubernetes client-go and Kubernetes apimachinery.
We will need some imports.
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
v1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
Let’s implement the 2 functions from our ManagedDeploymentInterface:
- Get()
- UpdateReplicas(count int32)
First we will create a simple pointer helper, as the client library expects a pointer to a 32-bit integer for the replica count.
// int32Ptr is used to create the pointer necessary for the replica count
func int32Ptr(i int32) *int32 { return &i }
Get() will update the attributes of our ManagedDeployment with fresh data from the Deployment – including what is important for us: State and ReplicaCount, the number of pods.
// Get is used to fetch the new state and update the ManagedDeployment.
func (c *ManagedDeployment) Get() error {
result, getErr := c.Client.Get(context.TODO(),
"managed-deployment",
metav1.GetOptions{})
if getErr != nil {
return getErr
}
replicaCount := *result.Spec.Replicas
c.State = result
c.ReplicaCount = replicaCount
return nil
}
UpdateReplicas(count int32) will update the ManagedDeployment with count, the desired number of pods. A 32-bit integer is used because that is what the official client requires.
// UpdateReplicas is used to change the number of deployment replicas to the
// provided count
func (c *ManagedDeployment) UpdateReplicas(count int32) error {
c.State.Spec.Replicas = int32Ptr(count)
_, updateErr := c.Client.Update(context.TODO(),
c.State,
metav1.UpdateOptions{})
if updateErr != nil {
return updateErr
}
c.ReplicaCount = count
return nil
}
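As a side note, not used in this post: client-go also exposes the Deployment scale subresource, which lets you change only the replica count without sending the whole Deployment object back. A rough sketch, reusing the imports above (the helper name is mine):

// scaleViaSubresource is a hypothetical helper showing the scale subresource:
// only the replica count is read and written, not the whole Deployment.
func scaleViaSubresource(client appsv1.DeploymentInterface, name string, replicas int32) error {
	scale, err := client.GetScale(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	scale.Spec.Replicas = replicas
	_, err = client.UpdateScale(context.TODO(), name, scale, metav1.UpdateOptions{})
	return err
}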
Now it is time to add a function which queries the managed host and returns the current orders count. The host comes from the MANAGED_SERVICE_HOST environment variable we set in the deployment manifest, so let’s read it into a package-level variable next to the function.
// ManagedHost is the host:port of the managed-service, taken from the
// MANAGED_SERVICE_HOST environment variable set in the deployment manifest.
var ManagedHost = os.Getenv("MANAGED_SERVICE_HOST")

// GetManagedOrdersCount asks the managed-service API for the current orders
// count. It decodes the JSON response body and returns the number.
func GetManagedOrdersCount() (int32, error) {
url := "http://" + ManagedHost + "/orders"
r := strings.NewReader("")
req, clientErr := http.NewRequest("GET", url, r)
if clientErr != nil {
return int32(0),
fmt.Errorf("Prepare request : %v", clientErr)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, reqErr := client.Do(req)
if reqErr != nil {
return int32(0),
fmt.Errorf("Failed to query the process count : %v", reqErr)
}
defer resp.Body.Close()
// Decode the data
var cResp OrdersResponse
if jsonErr := json.NewDecoder(resp.Body).Decode(&cResp); jsonErr != nil {
return int32(0), fmt.Errorf("Error parsing process count json : %v", jsonErr)
}
return cResp.Orders, nil
}
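One small thing worth considering before real use, and not in the original code: the default http.Client has no timeout, so a hanging managed-service would block a scaling tick. Bounding the request time is a one-line change, for example:

// Hedged improvement: give the HTTP client a timeout so a slow
// managed-service cannot stall the scaling loop indefinitely.
client := &http.Client{Timeout: 5 * time.Second}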
To connect to the cluster, let’s set up 2 client helpers, which cover running locally (outside Docker) and running inside the cluster.
// clientSetup returns the appropriate clientset, with config taken either
// from ~/.kube/config or from the in-cluster API configuration
func clientSetup() (*kubernetes.Clientset, error) {
if runningInDocker() {
return setupDockerClusterClient()
} else {
return setupLocalClient()
}
}
// setupLocalClient builds the clientset from the local minikube ~/.kube/config
func setupLocalClient() (*kubernetes.Clientset, error) {
var ns string
flag.StringVar(&ns, "namespace", "", "namespace")
// Bootstrap k8s configuration from local Kubernetes config file
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
log.Println("Using kubeconfig file: ", kubeconfig)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return &kubernetes.Clientset{}, err
}
// Create a clientset not targeting a specific API version
clientset, err := kubernetes.NewForConfig(config)
return clientset, err
}
// setupDockerClusterClient builds the clientset from the in-cluster REST API configuration.
func setupDockerClusterClient() (*kubernetes.Clientset, error) {
log.Println("Running inside container")
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
return &kubernetes.Clientset{}, err
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
return clientset, err
}
Now the logic part: the main function, which runs the scaling loop. The important piece is the use of a ticker which, unlike a plain sleep, keeps real 10-second intervals.
func main() {
// To prevent tick shifting, rather than using time.Sleep I've decided to go
// with time.Tick. This has its own downsides: the underlying ticker can never
// be stopped, so it would leak if main returned early.
tick := time.Tick(10 * time.Second)
tickCount := 0
clientset, err := clientSetup()
if err != nil {
log.Fatal(err)
}
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
ManagedDeployment := NewManagedDeploymentManager(deploymentsClient)
// Retrieve the initial version of Deployment
getErr := ManagedDeployment.Get()
if getErr != nil {
log.Fatal(fmt.Errorf("Failed to get latest version of \"orders-deployment\" Deployment: %v", getErr))
}
if ManagedDeployment.ReplicaCount <= 0 {
log.Fatal(fmt.Errorf("Replica count is equal %d, this means the service is still deploying or there is an issue", ManagedDeployment.ReplicaCount))
}
// Get process count from managed-service
oldOrdersCount, err := GetManagedOrdersCount()
if err != nil {
log.Fatal(err)
}
// In 10-second intervals, update the managed-deployment replicas if needed.
for range tick {
// Update replica count
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
tickCount++ // just for logging purposes
// Retrieve the latest version of Deployment before attempting to
// run update. RetryOnConflict uses exponential backoff to avoid
// exhausting the apiserver
getErr := ManagedDeployment.Get()
if getErr != nil {
return fmt.Errorf("Failed to get latest version of \"orders-deployment\" Deployment: %v", getErr)
}
// Get current process count from managed-service
currentOrdersCount, err := GetManagedOrdersCount()
if err != nil {
return err
}
// Find how many new orders were created in the last 10 seconds
ordersIn10Seconds := currentOrdersCount - oldOrdersCount
// Orders per instance (note that this is integer division)
ordersPerInstance := ordersIn10Seconds / ManagedDeployment.ReplicaCount
// Remember the current count for the comparison in the next tick
oldOrdersCount = currentOrdersCount
if ordersPerInstance >= 5 && ManagedDeployment.ReplicaCount <= 3 {
// Compute the new replica count; we don't mutate ReplicaCount directly
// (e.g. with ++) so the field only changes once the update succeeds.
oneMore := ManagedDeployment.ReplicaCount + 1
updateErr := ManagedDeployment.UpdateReplicas(oneMore)
if updateErr == nil {
fmt.Println(" -> Replica count changed to",
oneMore)
}
return updateErr
} else if ordersPerInstance < 5 && ManagedDeployment.ReplicaCount > 3 {
oneLess := ManagedDeployment.ReplicaCount - 1
updateErr := ManagedDeployment.UpdateReplicas(oneLess)
if updateErr == nil {
fmt.Println(" -> Replica count changed to",
oneLess)
}
return updateErr
}
return nil
})
if retryErr != nil {
panic(fmt.Errorf("Update failed: %v", retryErr))
}
}
// end of loop
}
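As the comment at the top of main hints, time.Tick never stops its underlying ticker. A slightly safer variant, sketched here and not part of the original code, uses time.NewTicker and stops it when main returns:

// Sketch: the same loop using time.NewTicker instead of time.Tick.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop() // releases the ticker's resources when main returns

for range ticker.C {
	// ... same scaling body as above ...
}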
That’s our core logic, responsible for scaling the deployment up and down.
Final main.go file
Putting it all together, here is the complete main.go file.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
v1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)
// OrdersResponse is a struct representing the JSON response from:
// http://managed-service:8080/orders
type OrdersResponse struct {
	Orders int32 `json:"orders,omitempty"`
}
// ManagedDeployment represents the state used to perform replica scaling
// on the managed deployment.
type ManagedDeployment struct {
	// Client is the typed k8s client for the Kubernetes deployment
	Client appsv1.DeploymentInterface
	// ReplicaCount is the last fetched number of replicas (pods)
	ReplicaCount int32
	// State is the whole fetched deployment object
	State *v1.Deployment
}

// ManagedDeploymentInterface is the interface exposing the methods we need
type ManagedDeploymentInterface interface {
	Get() error
	UpdateReplicas(count int32) error
}
// NewManagedDeploymentManager is factory used to create ManagedDeployment
func NewManagedDeploymentManager(deploymentsClient appsv1.DeploymentInterface) *ManagedDeployment {
return &ManagedDeployment{Client: deploymentsClient}
}
func main() {
// To prevent tick shifting, rather than using time.Sleep I've decided to go
// with time.Tick. This has its own downsides: the underlying ticker can never
// be stopped, so it would leak if main returned early.
tick := time.Tick(10 * time.Second)
tickCount := 0
// Set up the client; it might be better to use interfaces here.
clientset, err := clientSetup()
if err != nil {
log.Fatal(err)
}
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
ManagedDeployment := NewManagedDeploymentManager(deploymentsClient)
// Retrieve the latest version of Deployment
getErr := ManagedDeployment.Get()
if getErr != nil {
log.Fatal(fmt.Errorf("Failed to get latest version of \"orders-deployment\" Deployment: %v", getErr))
}
if ManagedDeployment.ReplicaCount <= 0 {
log.Fatal(fmt.Errorf("Replica count is equal %d, this means the service is still deploying or there is an issue", ManagedDeployment.ReplicaCount))
}
// Get process count from managed-service
oldOrdersCount, err := GetManagedOrdersCount()
if err != nil {
log.Fatal(err)
}
// In 10-second intervals, update the managed-deployment replicas if needed.
for range tick {
// Update replica count
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
tickCount++ // just for logging purposes
// Retrieve the latest version of Deployment before attempting to
// run update. RetryOnConflict uses exponential backoff to avoid
// exhausting the apiserver
getErr := ManagedDeployment.Get()
if getErr != nil {
return fmt.Errorf("Failed to get latest version of \"orders-deployment\" Deployment: %v", getErr)
}
// Get current process count from managed-service
currentOrdersCount, err := GetManagedOrdersCount()
if err != nil {
return err
}
// Find how many new orders were created in the last 10 seconds
ordersIn10Seconds := currentOrdersCount - oldOrdersCount
// Orders per instance (note that this is integer division)
ordersPerInstance := ordersIn10Seconds / ManagedDeployment.ReplicaCount
// Remember the current count for the comparison in the next tick
oldOrdersCount = currentOrdersCount
if ordersPerInstance >= 5 && ManagedDeployment.ReplicaCount <= 3 {
// Compute the new replica count; we don't mutate ReplicaCount directly
// (e.g. with ++) so the field only changes once the update succeeds.
oneMore := ManagedDeployment.ReplicaCount + 1
updateErr := ManagedDeployment.UpdateReplicas(oneMore)
if updateErr == nil {
fmt.Println(" -> Replica count changed to",
oneMore)
}
return updateErr
} else if ordersPerInstance < 5 && ManagedDeployment.ReplicaCount > 3 {
oneLess := ManagedDeployment.ReplicaCount - 1
updateErr := ManagedDeployment.UpdateReplicas(oneLess)
if updateErr == nil {
fmt.Println(" -> Replica count changed to",
oneLess)
}
return updateErr
}
return nil
})
if retryErr != nil {
panic(fmt.Errorf("Update failed: %v", retryErr))
}
}
// end of loop
}
// Get is used to fetch the new state and update the ManagedDeployment.
func (c *ManagedDeployment) Get() error {
result, getErr := c.Client.Get(context.TODO(),
"managed-deployment",
metav1.GetOptions{})
if getErr != nil {
return getErr
}
replicaCount := *result.Spec.Replicas
c.State = result
c.ReplicaCount = replicaCount
return nil
}
// UpdateReplicas is used to change the number of deployment replicas to the
// provided count
func (c *ManagedDeployment) UpdateReplicas(count int32) error {
c.State.Spec.Replicas = int32Ptr(count)
_, updateErr := c.Client.Update(context.TODO(),
c.State,
metav1.UpdateOptions{})
if updateErr != nil {
return updateErr
}
c.ReplicaCount = count
fmt.Println("REPLICA COUNT UPDATE", c.ReplicaCount)
return nil
}
// ManagedHost is the host:port of the managed-service, taken from the
// MANAGED_SERVICE_HOST environment variable set in the deployment manifest.
var ManagedHost = os.Getenv("MANAGED_SERVICE_HOST")

// GetManagedOrdersCount asks the managed-service API for the current orders
// count. It decodes the JSON response body and returns the number.
func GetManagedOrdersCount() (int32, error) {
url := "http://" + ManagedHost + "/orders"
r := strings.NewReader("")
req, clientErr := http.NewRequest("GET", url, r)
if clientErr != nil {
return int32(0),
fmt.Errorf("Prepare request : %v", clientErr)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, reqErr := client.Do(req)
if reqErr != nil {
return int32(0),
fmt.Errorf("Failed to query the process count : %v", reqErr)
}
defer resp.Body.Close()
// Decode the data
var cResp OrdersResponse
if jsonErr := json.NewDecoder(resp.Body).Decode(&cResp); jsonErr != nil {
return int32(0), fmt.Errorf("Error parsing process count json : %v", jsonErr)
}
return cResp.Orders, nil
}
// int32Ptr is used to create the pointer necessary for the replica count
func int32Ptr(i int32) *int32 { return &i }
// runningInDocker checks whether /.dockerenv is present, meaning the process
// is running in a container. I'm aware that this is not a deterministic check
// for all setups, but it works fine for this example minikube app.
func runningInDocker() bool {
if _, err := os.Stat("/.dockerenv"); err == nil {
return true
} else if os.IsNotExist(err) {
return false
} else {
// Schrödinger's cat: this might be a container or not :)
return false
}
}
// clientSetup returns the appropriate clientset, with config taken either
// from ~/.kube/config or from the in-cluster API configuration
func clientSetup() (*kubernetes.Clientset, error) {
if runningInDocker() {
return setupDockerClusterClient()
} else {
return setupLocalClient()
}
}
// setupLocalClient builds the clientset from the local minikube ~/.kube/config
func setupLocalClient() (*kubernetes.Clientset, error) {
var ns string
flag.StringVar(&ns, "namespace", "", "namespace")
// Bootstrap k8s configuration from local Kubernetes config file
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
log.Println("Using kubeconfig file: ", kubeconfig)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return &kubernetes.Clientset{}, err
}
// Create a clientset not targeting a specific API version
clientset, err := kubernetes.NewForConfig(config)
return clientset, err
}
// setupDockerClusterClient builds the clientset from the in-cluster REST API configuration.
func setupDockerClusterClient() (*kubernetes.Clientset, error) {
log.Println("Running inside container")
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
return &kubernetes.Clientset{}, err
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
return clientset, err
}