Orchestration

At its core, the AirTrafficController (ATC) was designed to enable the deployment of packages as custom resources in your cluster.

Beyond this, it includes a set of primitive features that together enable flexible and robust resource orchestration.

Classic Kubernetes application management tools, such as Helm and Kustomize, have offered little support for orchestration.
Here, we define orchestration as the ability to express relationships between resources in a release or package, specifically in terms of order, coordination, and state.

The reason for this gap is historical: packaging tools like Helm and Kustomize are client-side manifest generators.
They encourage the idea that Kubernetes applications are simply flat lists of YAML manifests.
Even server-side deployment tools like ArgoCD or FluxCD are limited in this regard, as they share the same worldview and ultimately deploy packages through these formats.

Note
Tools like Helm and ArgoCD support limited forms of orchestration, such as Helm pre/post-install hooks or ArgoCD sync waves.
However, these approaches are shallow: they don’t persist throughout the lifetime of the application.

Traditionally, if you needed intelligent, reactive, and orchestrated application deployment strategies, the solution was to build a custom operator.

While valid, this approach comes with technical challenges, as well as development and maintenance costs that not every team or organization is ready to take on.

Yoke does not suffer from this issue as it operates with a completely different model, one where applications are defined by executable code. It lets you focus on application orchestration by writing a program that:

  1. Reads the input (the desired state of the custom resource).
  2. Reads live state from the cluster.
  3. Updates the resource’s status.
  4. Emits the desired resources to be applied to the cluster.

If the above sounds like the textbook definition of a controller to you, that’s because it is. The ATC is a controller whose job is to reconcile desired package state. That state is defined by executing WASM modules provided by the user. In this way, the WASM module stands in for the classic reconciliation loop.

Classically, a flight is just a program that reads input over stdin and returns Kubernetes resources over stdout. This sounds static, but because the ATC runs the flight inside a control loop, the flight can evolve the desired state of the package/instance over its lifetime. The sketch below illustrates the contract.
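Here is a rough sketch of that contract. This is not code from the project: the Backend type and its fields are hypothetical, but the I/O pattern mirrors the full examples further down.

package main

import (
  "encoding/json"
  "fmt"
  "os"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/util/yaml"

  "github.com/yokecd/yoke/pkg/flight"
)

// Backend is a hypothetical custom resource type used only for this sketch.
type Backend struct {
  metav1.TypeMeta
  metav1.ObjectMeta `json:"metadata"`

  Spec struct {
    Replicas int `json:"replicas"`
  } `json:"spec"`

  Status struct {
    Msg string `json:"msg"`
  } `json:"status,omitzero"`
}

func main() {
  // 1. Read the input: the desired state of the custom resource, passed over stdin.
  var backend Backend
  if err := yaml.NewYAMLToJSONDecoder(os.Stdin).Decode(&backend); err != nil {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
  }

  // 2. Read live state from the cluster as needed (see the wasi/k8s package in the examples below).
  // 3. Update the resource's status.
  backend.Status.Msg = "reconciling"

  // 4. Emit the desired resources over stdout to be applied to the cluster.
  if err := json.NewEncoder(os.Stdout).Encode(flight.Resources{&backend}); err != nil {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
  }
}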

Our instance is re-evaluated whenever either of the following conditions is met:

  • a resource managed by our instance is updated or deleted, including changes to its status properties.
  • an external resource we look up is created, updated, or deleted, including changes to its status properties.

Finally, we can report the status of our instance by including our top-level custom resource in our output with the status properties we desire. This is a special case: the top-level instance resource is optional in the package's desired state, and including it serves only as an instruction for the ATC to update its status.


The following examples are written in Go to take advantage of the Kubernetes WASI SDK provided by the yoke project to look up cluster state.

If an SDK does not exist in your preferred WASM-compatible language, please open an issue to help track its development.

This example represents a custom resource called Pipeline. Instances of this resource run multiple jobs sequentially, and the state of the current job is reported back to the Pipeline resource.

When working with Airways, the first thing to do is create a type representing our custom resource.

type Pipeline struct {
  metav1.TypeMeta
  metav1.ObjectMeta `json:"metadata"`

  Spec struct {
    // ... Your fields ...
  } `json:"spec,omitzero"`

  Status struct {
    // Your status fields.
    // We will use a simple msg for our example, but it can be whatever you wish it to be.
    Msg string `json:"msg"`
  } `json:"status,omitzero"`
}

We can then use this type in our flight implementation, and also to generate our Airway definition and its OpenAPI schema.

package main

import (
  "encoding/json"
  "os"
  "reflect"

  apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

  "github.com/yokecd/yoke/pkg/apis/airway/v1alpha1"
  "github.com/yokecd/yoke/pkg/openapi"
)

func main() {
  json.NewEncoder(os.Stdout).Encode(v1alpha1.Airway{
    TypeMeta: metav1.TypeMeta{
      APIVersion: v1alpha1.AirwayGVR().GroupVersion().Identifier(),
      Kind:       v1alpha1.KindAirway,
    },
    ObjectMeta: metav1.ObjectMeta{
      Name: "pipelines.examples.com",
    },
    Spec: v1alpha1.AirwaySpec{
      WasmURLs: v1alpha1.WasmURLs{
        // The URL where your wasm module will be hosted.
        Flight: "oci://registry/repo:tag",
      },
      // In order to be able to fetch state we enable cluster access.
      ClusterAccess: true,
      // The Airway needs to be dynamic in order to be re-evaluated when sub-resources are updated/created.
      Mode: v1alpha1.AirwayModeDynamic,
      Template: apiextensionsv1.CustomResourceDefinitionSpec{
        Group: "examples.com",
        Names: apiextensionsv1.CustomResourceDefinitionNames{
          Plural:   "pipelines",
          Singular: "pipeline",
          Kind:     "Pipeline",
        },
        Scope: apiextensionsv1.NamespaceScoped,
        Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
          {
            Name:    "v1",
            Served:  true,
            Storage: true,
            Schema: &apiextensionsv1.CustomResourceValidation{
              // Build the openapi definition from our CustomResource type.
              OpenAPIV3Schema: openapi.SchemaFrom(reflect.TypeFor[Pipeline]()),
            },
          },
        },
      },
    },
  })
}
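With the Airway definition in hand, a typical workflow is to compile the flight to WebAssembly, push it to the OCI URL referenced by WasmURLs.Flight, and apply the generated Airway. The paths and layout below are illustrative assumptions, not prescribed by the project:

# Compile the flight program to wasm using Go's wasip1 target.
GOOS=wasip1 GOARCH=wasm go build -o flight.wasm ./flight

# After pushing flight.wasm to your registry, generate and apply the Airway.
go run ./airway | kubectl apply -f -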

Now all that remains is to write the module that will determine our desired state for our Pipeline resource.

package main

import (
  "encoding/json"
  "fmt"
  "os"
  "slices"

  batchv1 "k8s.io/api/batch/v1"
  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/util/yaml"

  "github.com/yokecd/yoke/pkg/flight"
  "github.com/yokecd/yoke/pkg/flight/wasi/k8s"
)

func main() {
  if err := run(); err != nil {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
  }
}

func run() error {
  var pipeline Pipeline
  if err := yaml.NewYAMLToJSONDecoder(os.Stdin).Decode(&pipeline); err != nil {
    return fmt.Errorf("failed to decode stdin into pipeline: %w", err)
  }

  resources := flight.Resources{&pipeline}

  for _, job := range []*batchv1.Job{
    {
      TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
      ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-one"},
      Spec: batchv1.JobSpec{
        // TODO
      },
    },
    {
      TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
      ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-two"},
      Spec: batchv1.JobSpec{
        // TODO
      },
    },
    {
      TypeMeta:   metav1.TypeMeta{APIVersion: batchv1.SchemeGroupVersion.Identifier(), Kind: "Job"},
      ObjectMeta: metav1.ObjectMeta{Name: pipeline.Name + "-three"},
      Spec: batchv1.JobSpec{
        // TODO
      },
    },
  } {
    // Add the current job to the desired package state.
    resources = append(resources, job)

    // Check whether the current job has completed.
    // If not, write the state as-is to stdout and wait for the next job update to trigger re-evaluation.
    // Remember that any resource that is part of the instance will trigger re-evaluation of the instance
    // on any update/deletion event.
    ok, err := hasJobCompleted(&pipeline, job)
    if err != nil {
      return fmt.Errorf("failed to check whether job %s has completed: %w", job.Name, err)
    }
    if !ok {
      return json.NewEncoder(os.Stdout).Encode(resources)
    }
  }

  pipeline.Status.Msg = "all jobs have completed"

  return json.NewEncoder(os.Stdout).Encode(resources)
}

// isJobStatus reports whether the job currently has the given condition with status True.
func isJobStatus(job *batchv1.Job, typ batchv1.JobConditionType) bool {
  return job != nil && slices.ContainsFunc(job.Status.Conditions, func(condition batchv1.JobCondition) bool {
    return condition.Type == typ && condition.Status == corev1.ConditionTrue
  })
}

func hasJobCompleted(pipeline *Pipeline, job *batchv1.Job) (ok bool, err error) {
  live, err := k8s.LookupResource(job)
  if err != nil && !k8s.IsErrNotFound(err) {
    return false, fmt.Errorf("failed to lookup job %s: %w", job.Name, err)
  }
  if isJobStatus(live, batchv1.JobFailed) {
    pipeline.Status.Msg = fmt.Sprintf("job %s failed", job.Name)
    return false, nil
  }
  if !isJobStatus(live, batchv1.JobComplete) {
    pipeline.Status.Msg = fmt.Sprintf("waiting for job %s to complete", job.Name)
    return false, nil
  }
  return true, nil
}

The above program returns an updated desired state for the instance on every re-evaluation, advancing one job at a time, as illustrated below:

(Diagram: pipeline example flow)
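To exercise the example end to end, you could apply an instance of the Pipeline resource. A minimal, hypothetical manifest (the name and namespace are illustrative):

apiVersion: examples.com/v1
kind: Pipeline
metadata:
  name: example
  namespace: default
spec: {}

Given the module above, the ATC would create example-one, re-evaluate the package as the job's status changes, move on to example-two and example-three, and finally report "all jobs have completed" through the instance's status.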

Sometimes we may want our application to wait for other resources to be created before moving on to later deployment phases.

Note that today this scenario is typically handled via eventual consistency, allowing certain resources or workloads to sit in CrashLoopBackOff until all of their dependencies exist.

The following example creates a SQL database instance using Crossplane and waits for its connection secret to be created before attempting to create a Deployment.

To begin, as before, we start with a custom resource type. In this example we will model a generic application.

type App struct {
  metav1.TypeMeta
  metav1.ObjectMeta `json:"metadata"`

  Spec struct {
    // Your props
  } `json:"spec"`

  Status struct {
    // For the example we will use a simple message for our App status.
    // You can include any data you see fit.
    Msg string `json:"msg"`
  } `json:"status,omitzero"`
}

And follow it up by defining its corresponding Airway.

package main

import (
  "encoding/json"
  "os"
  "reflect"

  apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

  "github.com/yokecd/yoke/pkg/apis/airway/v1alpha1"
  "github.com/yokecd/yoke/pkg/openapi"
)

func main() {
  json.NewEncoder(os.Stdout).Encode(v1alpha1.Airway{
    TypeMeta: metav1.TypeMeta{
      APIVersion: v1alpha1.AirwayGVR().GroupVersion().Identifier(),
      Kind:       v1alpha1.KindAirway,
    },
    ObjectMeta: metav1.ObjectMeta{
      Name: "apps.examples.com",
    },
    Spec: v1alpha1.AirwaySpec{
      WasmURLs: v1alpha1.WasmURLs{
        // The URL where your wasm module will be hosted.
        Flight: "oci://registry/repo:tag",
      },
      // In order to be able to fetch state we enable cluster access.
      ClusterAccess: true,
      // We know that our application will depend on external secrets generated by Crossplane.
      // By default no external resources can be looked up. To enable this lookup we use resource access matchers.
      // For more info on resource matchers: https://yokecd.github.io/docs/concepts/cluster-access/#resource-access-matchers
      ResourceAccessMatchers: []string{"Secret"},
      // The Airway needs to be dynamic in order to be re-evaluated when sub-resources are updated/created.
      Mode: v1alpha1.AirwayModeDynamic,
      Template: apiextensionsv1.CustomResourceDefinitionSpec{
        Group: "examples.com",
        Names: apiextensionsv1.CustomResourceDefinitionNames{
          Plural:   "apps",
          Singular: "app",
          Kind:     "App",
        },
        Scope: apiextensionsv1.NamespaceScoped,
        Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
          {
            Name:    "v1",
            Served:  true,
            Storage: true,
            Schema: &apiextensionsv1.CustomResourceValidation{
              // Build the openapi definition from our CustomResource type.
              OpenAPIV3Schema: openapi.SchemaFrom(reflect.TypeFor[App]()),
            },
          },
        },
      },
    },
  })
}

Next we build the logic for our module.

package main

import (
  "encoding/json"
  "fmt"
  "os"
  "slices"

  databasev1beta1 "github.com/crossplane-contrib/provider-gcp/apis/database/v1beta1"
  commonv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
  appsv1 "k8s.io/api/apps/v1"
  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/util/yaml"

  "github.com/yokecd/yoke/pkg/flight"
  "github.com/yokecd/yoke/pkg/flight/wasi/k8s"
)

func main() {
  if err := run(); err != nil {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
  }
}

func run() error {
  var app App
  if err := yaml.NewYAMLToJSONDecoder(os.Stdin).Decode(&app); err != nil {
    return fmt.Errorf("failed to decode stdin into App instance: %w", err)
  }

  // Include the app itself in the final json result. This allows us to set its status.
  // This resource is special, as the ATC knows it is the identity of the top-level parent.
  // You cannot set status properties for other resources.
  resources := flight.Resources{&app}

  // Create the database instance using the crossplane project.
  // Most fields are omitted for brevity.
  database := databasev1beta1.CloudSQLInstance{
    TypeMeta: metav1.TypeMeta{
      APIVersion: databasev1beta1.SchemeGroupVersion.Identifier(),
      Kind:       "CloudSQLInstance",
    },
    ObjectMeta: metav1.ObjectMeta{
      Name: app.Name,
    },
    Spec: databasev1beta1.CloudSQLInstanceSpec{
      ForProvider: databasev1beta1.CloudSQLInstanceParameters{
        // Your cloud instance parameters from your application.
      },
      ResourceSpec: commonv1.ResourceSpec{
        WriteConnectionSecretToReference: &commonv1.SecretReference{
          Name:      app.Name,
          Namespace: app.Namespace,
        },
      },
    },
  }

  resources = append(resources, &database)

  secretIdentifier := k8s.ResourceIdentifier{
    Name:       database.Spec.WriteConnectionSecretToReference.Name,
    Namespace:  database.Spec.WriteConnectionSecretToReference.Namespace,
    ApiVersion: "v1",
    Kind:       "Secret",
  }

  // Although we have not added this secret to our package's state, any external resource that we look up
  // is automatically tracked, and our instance will be re-evaluated on any create/update/delete event
  // affecting this resource.
  secret, err := k8s.Lookup[corev1.Secret](secretIdentifier)
  if err != nil {
    if k8s.IsErrNotFound(err) {
      app.Status.Msg = "waiting for connection secret to be created"
      return json.NewEncoder(os.Stdout).Encode(resources)
    }
    return fmt.Errorf("failed to fetch connection secret: %w", err)
  }

  // We create the deployment only after the secret has been created.
  deployment := &appsv1.Deployment{
    TypeMeta: metav1.TypeMeta{
      APIVersion: appsv1.SchemeGroupVersion.Identifier(),
      Kind:       "Deployment",
    },
    ObjectMeta: metav1.ObjectMeta{
      Name: app.Name,
    },
    Spec: appsv1.DeploymentSpec{
      Template: corev1.PodTemplateSpec{
        Spec: corev1.PodSpec{
          Containers: []corev1.Container{
            {
              // All other fields are omitted for brevity.
              // For this example we are just interested in demonstrating
              // that we can wait for the secret to exist before using it.
              EnvFrom: []corev1.EnvFromSource{
                {
                  SecretRef: &corev1.SecretEnvSource{
                    LocalObjectReference: corev1.LocalObjectReference{Name: secret.Name},
                  },
                },
              },
            },
          },
        },
      },
    },
  }

  resources = append(resources, deployment)

  // As a final task, we can report on the status of the deployment.
  liveDeployment, err := k8s.LookupResource(deployment)
  if err != nil && !k8s.IsErrNotFound(err) {
    return fmt.Errorf("failed to lookup deployment: %w", err)
  }

  ready := liveDeployment != nil && slices.ContainsFunc(liveDeployment.Status.Conditions, func(cond appsv1.DeploymentCondition) bool {
    return cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue
  })
  if ready {
    app.Status.Msg = "application deployed and ready"
  } else {
    app.Status.Msg = "waiting for application to become ready"
  }

  return json.NewEncoder(os.Stdout).Encode(resources)
}
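Once the flight module is pushed and the Airway applied, creating an App instance lets you watch its status converge through the messages set above (the resource name here is illustrative):

kubectl get app example -o jsonpath='{.status.msg}'
# e.g. "waiting for connection secret to be created", then "waiting for application to become ready",
# and finally "application deployed and ready".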