Build an operator with kopf

In this guide we will build a controller using kr8s and kopf to read the operating system information from a Pod and add that metadata as labels.

Kopf is an excellent framework for building event driven controllers and it can work hand-in-hand with kr8s when you want to interact with Kubernetes resources directly.

Controller

To create a controller with kopf we are going to create a Python file called controller.py and implement a single event-handler that will be called on new and existing Pods.

Creating an event handler

To build our event-handler we are doing to define a function and use the @kopf.on.resume and @kopf.on.create decorators to set up listeners to trigger our function.

  • @kopf.on.create("pods") will trigger our function when a new Pod is created.

  • @kopf.on.resume("pods") will run on all existing Pods when the controller starts.

# controller.py
import kopf


@kopf.on.resume("pods")
@kopf.on.create("pods")
def add_os_labels(body, logger, **kwargs):
    pass
# controller.py
import kopf


@kopf.on.resume("pods")
@kopf.on.create("pods")
async def add_os_labels(body, logger, **kwargs):
    pass

Get a kr8s object for the resource being handled

Now we can import kr8s and convert the body of the resource that kopf gives us into a Pod object.

# controller.py
import kopf
import kr8s
from kr8s.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
def add_os_labels(body, logger, **kwargs):
    pod = Pod(body)
# controller.py
import kopf
import kr8s
from kr8s.asyncio.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
async def add_os_labels(body, logger, **kwargs):
    pod = await Pod(body)

Error checking

Now we can use kr8s to do some basic error checking:

  • If the Pod does not exist it may have been deleted while our handler was being called, so we can just return.

  • If the os-release/id label already exists for the Pod we can skip over applying it. The controller probably already applied this and then restarted at some point.

  • If the Pod is not ready yet we can tell kopf to try again later by raising a kopf.TemporaryError exception.

# controller.py
import kopf
import kr8s
from kr8s.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
def add_os_labels(body, logger, **kwargs):
    pod = Pod(body)

    # Pod was deleted while trying to add OS labels, give up
    if not pod.exists():
        return

    # Pod already has OS labels, skip
    if "os-release/id" in pod.labels:
        return

    # Pod is not ready yet, retry in a bit
    if not pod.ready():
        raise kopf.TemporaryError(f"Pod {pod.name} is not ready yet", delay=10)

    # TODO Add OS labels
# controller.py
import kopf
import kr8s
from kr8s.asyncio.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
async def add_os_labels(body, logger, **kwargs):
    pod = await Pod(body)

    # Pod was deleted while trying to add OS labels, give up
    if not await pod.exists():
        return

    # Pod already has OS labels, skip
    if "os-release/id" in pod.labels:
        return

    # Pod is not ready yet, retry in a bit
    if not await pod.ready():
        raise kopf.TemporaryError(f"Pod {pod.name} is not ready yet", delay=10)

    # TODO Add OS labels

Get info from /etc/os-release

Now we can use Pod.exec() to get the operating system info from our Pod by calling cat /etc/os-release. If we can’t do this then either cat or /etc/os-release may be missing from the image, in which case we can set the os-release/id to unknown and return.

# controller.py
...

    # Get OS info
    try:
        output = pod.exec(["cat", "/etc/os-release"])
        os_info = output.stdout.decode()
    except kr8s.ExecError:
        logger.error(
            f"Failed to exec in pod {pod.name}, "
            "either cat is not included in the image or /etc/os-release is missing."
        )
        pod.label({"os-release/id": "unknown"})
        return

# controller.py
...

    # Get OS info
    try:
        output = await pod.exec(["cat", "/etc/os-release"])
        os_info = output.stdout.decode()
    except kr8s.ExecError:
        logger.error(
            f"Failed to exec in pod {pod.name}, "
            "either cat is not included in the image or /etc/os-release is missing."
        )
        await pod.label({"os-release/id": "unknown"})
        return

Clean up labels and apply them

Lastly we can convert the contents of /etc/os-release into a dictionary of labels and apply them.

Note

Kubernetes has some constraints around valid keys/values in labels so we will need to clean things up a little with the Regex library re.

# controller.py
import re

...

    # Clean OS labels
    labels = {}
    for label in os_info.splitlines():
        key, value = label.split("=")
        key = f"os-release/{key.lower().replace('_', '-')}"
        # Kubernetes only accepts label values with alphanumeric characters,
        # dots, dashes and underscores and a maximum length of 63 characters.
        value = re.sub("[^0-9a-zA-Z_.-]+", "", value)[:63]
        labels[key] = value

    # Apply OS labels
    pod.label(labels)
# controller.py
import re

...

    # Clean OS labels
    labels = {}
    for label in os_info.splitlines():
        key, value = label.split("=")
        key = f"os-release/{key.lower().replace('_', '-')}"
        # Kubernetes only accepts label values with alphanumeric characters,
        # dots, dashes and underscores and a maximum length of 63 characters.
        value = re.sub("[^0-9a-zA-Z_.-]+", "", value)[:63]
        labels[key] = value

    # Apply OS labels
    await pod.label(labels)

Complete example

Now we can put this all together into our complete controller.py.

# controller.py
import re

import kopf
import kr8s
from kr8s.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
def add_os_labels(body, logger, **kwargs):
    pod = Pod(body)

    # Pod was deleted while trying to add OS labels, give up
    if not pod.exists():
        return

    # Pod already has OS labels, skip
    if "os-release/id" in pod.labels:
        return

    # Pod is not ready yet, retry in a bit
    if not pod.ready():
        raise kopf.TemporaryError(f"Pod {pod.name} is not ready yet", delay=10)

    # Get OS info
    try:
        output = pod.exec(["cat", "/etc/os-release"])
        os_info = output.stdout.decode()
    except kr8s.ExecError:
        logger.error(
            f"Failed to exec in pod {pod.name}, "
            "either cat is not included in the image or /etc/os-release is missing."
        )
        pod.label({"os-release/id": "unknown"})
        return

    # Clean OS labels
    labels = {}
    for label in os_info.splitlines():
        key, value = label.split("=")
        key = f"os-release/{key.lower().replace('_', '-')}"
        # Kubernetes only accepts label values with alphanumeric characters,
        # dots, dashes and underscores and a maximum length of 63 characters.
        value = re.sub("[^0-9a-zA-Z_.-]+", "", value)[:63]
        labels[key] = value

    # Apply OS labels
    pod.label(labels)
# controller.py
import re

import kopf
import kr8s
from kr8s.asyncio.objects import Pod


@kopf.on.resume("pods")
@kopf.on.create("pods")
async def add_os_labels(body, logger, **kwargs):
    pod = await Pod(body)

    # Pod was deleted while trying to add OS labels, give up
    if not await pod.exists():
        return

    # Pod already has OS labels, skip
    if "os-release/id" in pod.labels:
        return

    # Pod is not ready yet, retry in a bit
    if not await pod.ready():
        raise kopf.TemporaryError(f"Pod {pod.name} is not ready yet", delay=10)

    # Get OS info
    try:
        output = await pod.exec(["cat", "/etc/os-release"])
        os_info = output.stdout.decode()
    except kr8s.ExecError:
        logger.error(
            f"Failed to exec in pod {pod.name}, "
            "either cat is not included in the image or /etc/os-release is missing."
        )
        await pod.label({"os-release/id": "unknown"})
        return

    # Clean OS labels
    labels = {}
    for label in os_info.splitlines():
        key, value = label.split("=")
        key = f"os-release/{key.lower().replace('_', '-')}"
        # Kubernetes only accepts label values with alphanumeric characters,
        # dots, dashes and underscores and a maximum length of 63 characters.
        value = re.sub("[^0-9a-zA-Z_.-]+", "", value)[:63]
        labels[key] = value

    # Apply OS labels
    await pod.label(labels)

That’s it! Our simple controller is implemented in less than 50 lines of Python and thanks to kopf and kr8s working hand-in-hand is very readable.

Running

You can run the controller locally for development using the kopf module.

$ python -m kopf run controller.py
kopf._core.engines.a [INFO    ] Initial authentication has been initiated.
kopf._core.engines.a [INFO    ] Initial authentication has finished.
...

Testing

Now let’s open a new Python session and create a Pod with Pod.gen() to test things out.

>>> from kr8s.objects import Pod
>>> pod = Pod.gen(name="test-pod", image="nginx:latest")
>>> pod.create()

Now if we watch the logs of our controller we can see things happening.

$ python -m kopf run controller.py
...
[ERROR   ] [default/test-pod] Handler 'add_os_labels' failed temporarily: Pod test-pod is not ready yet
[ERROR   ] [default/test-pod] Handler 'add_os_labels' failed temporarily: Pod test-pod is not ready yet
[INFO    ] [default/test-pod] Handler 'add_os_labels' succeeded.
[INFO    ] [default/test-pod] Creation is processed: 1 succeeded; 0 failed.

Then in our Python session we can refresh the object and have a look at the labels.

>>> pod.refresh()
>>> print(pod.labels)
{'os-release/bug-report-url': 'httpsbugs.debian.org',
 'os-release/home-url': 'httpswww.debian.org',
 'os-release/id': 'debian',
 'os-release/name': 'DebianGNULinux',
 'os-release/pretty-name': 'DebianGNULinux12bookworm',
 'os-release/support-url': 'httpswww.debian.orgsupport',
 'os-release/version': '12bookworm',
 'os-release/version-codename': 'bookworm',
 'os-release/version-id': '12'}

Deploying

Once you are finished developing and are ready to deploy your controller to your Kubernetes cluster persistently follow the kopf documentation on packaging and deploying your controller. Just be sure to pip install kr8s in the container too!

FROM python:3.11
ADD controller.py /src
RUN pip install kopf kr8s
CMD kopf run /src/controller.py --verbose