From d1faaae1939b5ef551fa4b6a2299e38093b4d210 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Mon, 4 Sep 2023 10:50:37 +0200 Subject: [PATCH 1/8] feat(batch/jobs): API scaffolding Signed-off-by: Mateusz Urbanek --- cmd/k2d.go | 2 + internal/adapter/container_utils.go | 10 ++ internal/adapter/converter/job.go | 12 ++ internal/adapter/job.go | 181 +++++++++++++++++++++++++ internal/adapter/types/labels.go | 3 + internal/api/apis/apigroups_list.go | 9 ++ internal/api/apis/apis.go | 22 +++ internal/api/apis/batch/batch.go | 56 ++++++++ internal/api/apis/batch/jobs/create.go | 37 +++++ internal/api/apis/batch/jobs/delete.go | 24 ++++ internal/api/apis/batch/jobs/get.go | 29 ++++ internal/api/apis/batch/jobs/jobs.go | 74 ++++++++++ internal/api/apis/batch/jobs/list.go | 24 ++++ internal/api/apis/batch/jobs/patch.go | 67 +++++++++ 14 files changed, 550 insertions(+) create mode 100644 internal/adapter/converter/job.go create mode 100644 internal/adapter/job.go create mode 100644 internal/api/apis/batch/batch.go create mode 100644 internal/api/apis/batch/jobs/create.go create mode 100644 internal/api/apis/batch/jobs/delete.go create mode 100644 internal/api/apis/batch/jobs/get.go create mode 100644 internal/api/apis/batch/jobs/jobs.go create mode 100644 internal/api/apis/batch/jobs/list.go create mode 100644 internal/api/apis/batch/jobs/patch.go diff --git a/cmd/k2d.go b/cmd/k2d.go index 7718f57..2d55f43 100644 --- a/cmd/k2d.go +++ b/cmd/k2d.go @@ -176,6 +176,8 @@ func main() { container.Add(apis.APIs()) // /apis/apps container.Add(apis.Apps()) + // /apis/batch + container.Add(apis.Batch()) // /apis/events.k8s.io container.Add(apis.Events()) // /apis/authorization.k8s.io diff --git a/internal/adapter/container_utils.go b/internal/adapter/container_utils.go index 02bbba6..92c98a7 100644 --- a/internal/adapter/container_utils.go +++ b/internal/adapter/container_utils.go @@ -21,6 +21,7 @@ import ( k2dtypes "github.com/portainer/k2d/internal/adapter/types" 
+// - jobSpec: Holds the batchv1.JobSpec object representing the desired state of the associated Job.
diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go new file mode 100644 index 0000000..03b8694 --- /dev/null +++ b/internal/adapter/converter/job.go @@ -0,0 +1,12 @@ +package converter + +import ( + "github.com/docker/docker/api/types" + batchv1 "k8s.io/api/batch/v1" +) + +// ConvertContainerToPod converts a given Docker container into a Kubernetes Pod object. +// TODO: needs full description +func (converter *DockerAPIConverter) ConvertContainerToJob(container types.Container) batchv1.Job { + panic("unimplemented") +} diff --git a/internal/adapter/job.go b/internal/adapter/job.go new file mode 100644 index 0000000..7307df9 --- /dev/null +++ b/internal/adapter/job.go @@ -0,0 +1,181 @@ +package adapter + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "github.com/docker/docker/api/types" + "github.com/portainer/k2d/internal/adapter/errors" + "github.com/portainer/k2d/internal/adapter/filters" + k2dtypes "github.com/portainer/k2d/internal/adapter/types" + "github.com/portainer/k2d/internal/k8s" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type JobLogOptions struct { + Timestamps bool + Follow bool + Tail string +} + +func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, job *batchv1.Job) error { + opts := ContainerCreationOptions{ + containerName: job.Name, + namespace: job.Namespace, + jobSpec: job.Spec, + labels: job.Labels, + } + + if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("unable to marshal job: %w", err) + } + job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = string(jobData) + } + + opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] + + return adapter.createContainerFromJobSpec(ctx, opts) +} + +// The GetJob implementation is using a filtered list approach as the Docker API 
provide different response types +// when inspecting a container and listing containers. +// The logic used to build a job from a container is based on the type returned by the list operation (types.Container) +// and not the inspect operation (types.ContainerJSON). +// This is because using the inspect operation everywhere would be more expensive overall. +func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, namespace string) (*batchv1.Job, error) { + filter := filters.ByPod(namespace, jobName) // NOTE: I am not sure about this, should work? + containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) + if err != nil { + return nil, fmt.Errorf("unable to list containers: %w", err) + } + + var container *types.Container + + containerName := buildContainerName(jobName, namespace) + for _, cntr := range containers { + if cntr.Names[0] == "/"+containerName { + container = &cntr + break + } + } + + if container == nil { + adapter.logger.Errorf("unable to find container for job %s in namespace %s", jobName, namespace) + return nil, errors.ErrResourceNotFound + } + + job, err := adapter.buildJobFromContainer(*container) + if err != nil { + return nil, fmt.Errorf("unable to get job: %w", err) + } + + versionedJob := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "v1", + }, + } + + err = adapter.ConvertK8SResource(job, &versionedJob) + if err != nil { + return nil, fmt.Errorf("unable to convert internal object to versioned object: %w", err) + } + + return &versionedJob, nil +} + +func (adapter *KubeDockerAdapter) GetJobLogs(ctx context.Context, namespace string, jobName string, opts JobLogOptions) (io.ReadCloser, error) { + containerName := buildContainerName(jobName, namespace) + container, err := adapter.cli.ContainerInspect(ctx, containerName) + if err != nil { + return nil, fmt.Errorf("unable to inspect container: %w", err) + } + + return adapter.cli.ContainerLogs(ctx, 
container.ID, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Timestamps: opts.Timestamps, + Follow: opts.Follow, + Tail: opts.Tail, + }) +} + +func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace string) (*metav1.Table, error) { + jobList, err := adapter.listJobs(ctx, namespace) + if err != nil { + return &metav1.Table{}, fmt.Errorf("unable to list jobs: %w", err) + } + + return k8s.GenerateTable(&jobList) +} + +func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { + jobList, err := adapter.listJobs(ctx, namespace) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to list jobs: %w", err) + } + + versionedJobList := batchv1.JobList{ + TypeMeta: metav1.TypeMeta{ + Kind: "JobList", + APIVersion: "v1", + }, + } + + err = adapter.ConvertK8SResource(&jobList, &versionedJobList) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) + } + + return versionedJobList, nil +} + +func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batchv1.Job, error) { + job := adapter.converter.ConvertContainerToJob(container) + + if container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] != "" { + internalJobSpecData := container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] + jobSpec := batchv1.JobSpec{} + + err := json.Unmarshal([]byte(internalJobSpecData), &jobSpec) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal job spec: %w", err) + } + + job.Spec = jobSpec + } + + return &job, nil +} + +func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { + filter := filters.ByNamespace(namespace) + containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to list containers: %w", err) + } + + jobs := 
[]batchv1.Job{} + + for _, container := range containers { + job, err := adapter.buildJobFromContainer(container) + if err != nil { + return batchv1.JobList{}, fmt.Errorf("unable to get jobs: %w", err) + } + + jobs = append(jobs, *job) + } + + return batchv1.JobList{ + TypeMeta: metav1.TypeMeta{ + Kind: "JobList", + APIVersion: "v1", + }, + Items: jobs, + }, nil +} diff --git a/internal/adapter/types/labels.go b/internal/adapter/types/labels.go index 34932b4..64b619b 100644 --- a/internal/adapter/types/labels.go +++ b/internal/adapter/types/labels.go @@ -35,6 +35,9 @@ const ( // StorageTypeLabelKey is the key used to store the storage type in the labels of a system configmap or a Docker volume // It is used to differentiate between persistent volumes and config maps when listing volumes StorageTypeLabelKey = "storage.k2d.io/type" + + // JobLastAppliedConfigLabelKey is the key used to store the job specific last applied configuration in the container labels + JobLastAppliedConfigLabelKey = "job.k2d.io/last-applied-configuration" ) const ( diff --git a/internal/api/apis/apigroups_list.go b/internal/api/apis/apigroups_list.go index 13b7e4d..d196feb 100644 --- a/internal/api/apis/apigroups_list.go +++ b/internal/api/apis/apigroups_list.go @@ -21,6 +21,15 @@ func ListAPIGroups(r *restful.Request, w *restful.Response) { }, }, }, + { + Name: "batch", + Versions: []metav1.GroupVersionForDiscovery{ + { + GroupVersion: "batch/v1", + Version: "v1", + }, + }, + }, { Name: "events.k8s.io", Versions: []metav1.GroupVersionForDiscovery{ diff --git a/internal/api/apis/apis.go b/internal/api/apis/apis.go index 15a8ed4..51416af 100644 --- a/internal/api/apis/apis.go +++ b/internal/api/apis/apis.go @@ -5,6 +5,7 @@ import ( "github.com/portainer/k2d/internal/adapter" "github.com/portainer/k2d/internal/api/apis/apps" "github.com/portainer/k2d/internal/api/apis/authorization.k8s.io" + "github.com/portainer/k2d/internal/api/apis/batch" 
"github.com/portainer/k2d/internal/api/apis/events.k8s.io" "github.com/portainer/k2d/internal/api/apis/storage.k8s.io" "github.com/portainer/k2d/internal/controller" @@ -13,6 +14,7 @@ import ( type ( ApisAPI struct { apps apps.AppsService + batch batch.BatchService events events.EventsService authorization authorization.AuthorizationService storage storage.StorageService @@ -22,6 +24,7 @@ type ( func NewApisAPI(adapter *adapter.KubeDockerAdapter, operations chan controller.Operation) *ApisAPI { return &ApisAPI{ apps: apps.NewAppsService(operations, adapter), + batch: batch.NewBatchService(operations, adapter), events: events.NewEventsService(adapter), authorization: authorization.NewAuthorizationService(), storage: storage.NewStorageService(adapter), @@ -116,3 +119,22 @@ func (api ApisAPI) Apps() *restful.WebService { api.apps.RegisterAppsAPI(routes) return routes } + +// /apis/batch +func (api ApisAPI) Batch() *restful.WebService { + routes := new(restful.WebService). + Path("/apis/batch"). + Consumes(restful.MIME_JSON, "application/yml", "application/json-patch+json", "application/merge-patch+json", "application/strategic-merge-patch+json"). + Produces(restful.MIME_JSON) + + // which versions are served by this api + routes.Route(routes.GET(""). + To(api.batch.GetAPIVersions)) + + // which resources are available under /apis/batch/v1 + routes.Route(routes.GET("/v1"). 
+ To(api.batch.ListAPIResources)) + + api.batch.RegisterBatchAPI(routes) + return routes +} diff --git a/internal/api/apis/batch/batch.go b/internal/api/apis/batch/batch.go new file mode 100644 index 0000000..df77131 --- /dev/null +++ b/internal/api/apis/batch/batch.go @@ -0,0 +1,56 @@ +package batch + +import ( + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/adapter" + "github.com/portainer/k2d/internal/api/apis/batch/jobs" + "github.com/portainer/k2d/internal/controller" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type BatchService struct { + jobs jobs.JobService +} + +func NewBatchService(operations chan controller.Operation, adapter *adapter.KubeDockerAdapter) BatchService { + return BatchService{ + jobs: jobs.NewJobService(adapter, operations), + } +} + +func (svc BatchService) GetAPIVersions(r *restful.Request, w *restful.Response) { + apiVersion := metav1.APIVersions{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIVersions", + }, + Versions: []string{"batch/v1"}, + } + + w.WriteAsJson(apiVersion) +} + +func (svc BatchService) ListAPIResources(r *restful.Request, w *restful.Response) { + resourceList := metav1.APIResourceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "APIResourceList", + APIVersion: "v1", + }, + GroupVersion: "batch/v1", + APIResources: []metav1.APIResource{ + { + Kind: "Job", + SingularName: "", + Name: "jobs", + Verbs: []string{"create", "list", "delete", "get", "patch"}, + Namespaced: true, + }, + }, + } + + w.WriteAsJson(resourceList) +} + +func (svc BatchService) RegisterBatchAPI(routes *restful.WebService) { + // jobs + svc.jobs.RegisterJobAPI(routes) +} diff --git a/internal/api/apis/batch/jobs/create.go b/internal/api/apis/batch/jobs/create.go new file mode 100644 index 0000000..9180944 --- /dev/null +++ b/internal/api/apis/batch/jobs/create.go @@ -0,0 +1,37 @@ +package jobs + +import ( + "fmt" + "net/http" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + 
"github.com/portainer/k2d/internal/controller" + "github.com/portainer/k2d/internal/types" + httputils "github.com/portainer/k2d/pkg/http" + batchv1 "k8s.io/api/batch/v1" +) + +func (svc JobService) CreateJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + + job := &batchv1.Job{} + + err := httputils.ParseJSONBody(r.Request, &job) + if err != nil { + utils.HttpError(r, w, http.StatusBadRequest, fmt.Errorf("unable to parse request body: %w", err)) + return + } + + job.Namespace = namespace + + dryRun := r.QueryParameter("dryRun") != "" + if dryRun { + w.WriteAsJson(job) + return + } + + svc.operations <- controller.NewOperation(job, controller.MediumPriorityOperation, r.HeaderParameter(types.RequestIDHeader)) + + w.WriteAsJson(job) +} diff --git a/internal/api/apis/batch/jobs/delete.go b/internal/api/apis/batch/jobs/delete.go new file mode 100644 index 0000000..5c87e12 --- /dev/null +++ b/internal/api/apis/batch/jobs/delete.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "net/http" + + "github.com/emicklei/go-restful/v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (svc JobService) DeleteJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + svc.adapter.DeleteContainer(r.Request.Context(), jobName, namespace) + + w.WriteAsJson(metav1.Status{ + TypeMeta: metav1.TypeMeta{ + Kind: "Status", + APIVersion: "v1", + }, + Status: "Success", + Code: http.StatusOK, + }) +} diff --git a/internal/api/apis/batch/jobs/get.go b/internal/api/apis/batch/jobs/get.go new file mode 100644 index 0000000..f62bb55 --- /dev/null +++ b/internal/api/apis/batch/jobs/get.go @@ -0,0 +1,29 @@ +package jobs + +import ( + "errors" + "fmt" + "net/http" + + "github.com/emicklei/go-restful/v3" + adaptererr "github.com/portainer/k2d/internal/adapter/errors" + "github.com/portainer/k2d/internal/api/utils" +) + +func (svc JobService) GetJob(r *restful.Request, w 
*restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + job, err := svc.adapter.GetJob(r.Request.Context(), jobName, namespace) + if err != nil { + if errors.Is(err, adaptererr.ErrResourceNotFound) { + w.WriteHeader(http.StatusNotFound) + return + } + + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to get job: %w", err)) + return + } + + w.WriteAsJson(job) +} diff --git a/internal/api/apis/batch/jobs/jobs.go b/internal/api/apis/batch/jobs/jobs.go new file mode 100644 index 0000000..e2103f5 --- /dev/null +++ b/internal/api/apis/batch/jobs/jobs.go @@ -0,0 +1,74 @@ +package jobs + +import ( + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/adapter" + "github.com/portainer/k2d/internal/controller" +) + +type JobService struct { + adapter *adapter.KubeDockerAdapter + operations chan controller.Operation +} + +func NewJobService(adapter *adapter.KubeDockerAdapter, operations chan controller.Operation) JobService { + return JobService{ + adapter: adapter, + operations: operations, + } +} + +func (svc JobService) RegisterJobAPI(ws *restful.WebService) { + jobGVKExtension := map[string]string{ + "group": "batch", + "kind": "Job", + "version": "v1", + } + + ws.Route(ws.POST("/v1/jobs"). + To(svc.CreateJob). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string"))) + + ws.Route(ws.POST("/v1/namespaces/{namespace}/jobs"). + To(svc.CreateJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string"))) + + ws.Route(ws.GET("/v1/jobs"). + To(svc.ListJobs)) + + ws.Route(ws.GET("/v1/namespaces/{namespace}/jobs"). + To(svc.ListJobs). + Param(ws.PathParameter("namespace", "namespace name").DataType("string"))) + + ws.Route(ws.DELETE("/v1/jobs/{name}"). + To(svc.DeleteJob). 
+ Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.DELETE("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.DeleteJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.GET("/v1/jobs/{name}"). + To(svc.GetJob). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.GET("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.GetJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string"))) + + ws.Route(ws.PATCH("/v1/jobs/{name}"). + To(svc.PatchJob). + Param(ws.PathParameter("name", "name of the job").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string")). + AddExtension("x-kubernetes-group-version-kind", jobGVKExtension)) + + ws.Route(ws.PATCH("/v1/namespaces/{namespace}/jobs/{name}"). + To(svc.PatchJob). + Param(ws.PathParameter("namespace", "namespace name").DataType("string")). + Param(ws.PathParameter("name", "name of the job").DataType("string")). + Param(ws.QueryParameter("dryRun", "when present, indicates that modifications should not be persisted").DataType("string")). 
+ AddExtension("x-kubernetes-group-version-kind", jobGVKExtension)) +} diff --git a/internal/api/apis/batch/jobs/list.go b/internal/api/apis/batch/jobs/list.go new file mode 100644 index 0000000..178d777 --- /dev/null +++ b/internal/api/apis/batch/jobs/list.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "context" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (svc JobService) ListJobs(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + + utils.ListResources( + r, + w, + func(ctx context.Context) (interface{}, error) { + return svc.adapter.ListJobs(ctx, namespace) + }, + func(ctx context.Context) (*metav1.Table, error) { + return svc.adapter.GetJobTable(ctx, namespace) + }, + ) +} diff --git a/internal/api/apis/batch/jobs/patch.go b/internal/api/apis/batch/jobs/patch.go new file mode 100644 index 0000000..2f5e5ae --- /dev/null +++ b/internal/api/apis/batch/jobs/patch.go @@ -0,0 +1,67 @@ +package jobs + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/emicklei/go-restful/v3" + "github.com/portainer/k2d/internal/api/utils" + "github.com/portainer/k2d/internal/controller" + "github.com/portainer/k2d/internal/types" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" +) + +func (svc JobService) PatchJob(r *restful.Request, w *restful.Response) { + namespace := r.PathParameter("namespace") + jobName := r.PathParameter("name") + + patch, err := io.ReadAll(r.Request.Body) + if err != nil { + utils.HttpError(r, w, http.StatusBadRequest, fmt.Errorf("unable to parse request body: %w", err)) + return + } + + job, err := svc.adapter.GetJob(r.Request.Context(), jobName, namespace) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to get job: %w", err)) + return + } + + if job == nil { + w.WriteHeader(http.StatusNotFound) + return + } + + data, err := 
json.Marshal(job) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to marshal job: %w", err)) + return + } + + mergedData, err := strategicpatch.StrategicMergePatch(data, patch, batchv1.Job{}) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to apply patch: %w", err)) + return + } + + updatedJob := &batchv1.Job{} + + err = json.Unmarshal(mergedData, updatedJob) + if err != nil { + utils.HttpError(r, w, http.StatusInternalServerError, fmt.Errorf("unable to unmarshal job: %w", err)) + return + } + + dryRun := r.QueryParameter("dryRun") != "" + if dryRun { + w.WriteAsJson(updatedJob) + return + } + + svc.operations <- controller.NewOperation(updatedJob, controller.MediumPriorityOperation, r.HeaderParameter(types.RequestIDHeader)) + + w.WriteAsJson(updatedJob) +} From ab304c85e077d228b2bbf5119a74c6f408f1257b Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Mon, 4 Sep 2023 16:09:12 +0200 Subject: [PATCH 2/8] fix(batch/job): fixed issue with invalid version Signed-off-by: Mateusz Urbanek --- Makefile | 2 +- internal/adapter/container_utils.go | 77 ++++++++++++++++++++++++++++- internal/adapter/converter/job.go | 4 +- internal/adapter/converter/pod.go | 54 ++++++++++++++++++++ internal/adapter/job.go | 23 ++++----- 5 files changed, 145 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index fe4dbc8..dcbb82f 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # go-workbench Makefile template v1.1.0 # For a list of valid GOOS and GOARCH values, see: https://gist.github.com/asukakenji/f15ba7e588ac42795f421b48b8aede63 # Note: these can be overriden on the command line e.g. 
`make PLATFORM= ARCH=` -PLATFORM="$(shell go env GOOS)" +PLATFORM="linux" ARCH="$(shell go env GOARCH)" ARM="" VERSION="latest" diff --git a/internal/adapter/container_utils.go b/internal/adapter/container_utils.go index 92c98a7..47ab5b4 100644 --- a/internal/adapter/container_utils.go +++ b/internal/adapter/container_utils.go @@ -24,6 +24,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/core" ) @@ -310,7 +311,81 @@ func (adapter *KubeDockerAdapter) createContainerFromPodSpec(ctx context.Context // createContainerFromJobSpec orchestrates the creation of a Docker container based on a given Kubernetes PodSpec. // TODO: needs full description func (adapter *KubeDockerAdapter) createContainerFromJobSpec(ctx context.Context, options ContainerCreationOptions) error { - panic("unimplemented") + if options.lastAppliedConfiguration != "" { + options.labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] = options.lastAppliedConfiguration + } + + internalJobSpec := batch.JobSpec{} + err := adapter.ConvertK8SResource(&options.jobSpec, &internalJobSpec) + if err != nil { + return fmt.Errorf("unable to convert versioned pod spec to internal pod spec: %w", err) + } + + internalJobSpecData, err := json.Marshal(internalJobSpec) + if err != nil { + return fmt.Errorf("unable to marshal internal pod spec: %w", err) + } + options.labels[k2dtypes.PodLastAppliedConfigLabelKey] = string(internalJobSpecData) + options.labels[k2dtypes.NamespaceLabelKey] = options.namespace + options.labels[k2dtypes.WorkloadNameLabelKey] = options.containerName + options.labels[k2dtypes.NetworkNameLabelKey] = buildNetworkName(options.namespace) + + containerCfg, err := adapter.converter.ConvertJobSpecToContainerConfiguration(internalJobSpec, options.namespace, options.labels) + if err != nil { + return fmt.Errorf("unable to build container configuration from pod spec: %w", err) + } + 
containerCfg.ContainerName = buildContainerName(options.containerName, options.namespace) + + existingContainer, err := adapter.getContainer(ctx, containerCfg.ContainerName) + if err != nil { + return fmt.Errorf("unable to inspect container: %w", err) + } + + if existingContainer != nil { + if options.lastAppliedConfiguration == existingContainer.Config.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] { + adapter.logger.Infof("container with the name %s already exists with the same configuration. The update will be skipped", containerCfg.ContainerName) + return nil + } + + adapter.logger.Infof("container with the name %s already exists with a different configuration. The container will be recreated", containerCfg.ContainerName) + + if existingContainer.Config.Labels[k2dtypes.ServiceLastAppliedConfigLabelKey] != "" { + options.labels[k2dtypes.ServiceLastAppliedConfigLabelKey] = existingContainer.Config.Labels[k2dtypes.ServiceLastAppliedConfigLabelKey] + } + + err := adapter.cli.ContainerRemove(ctx, existingContainer.ID, types.ContainerRemoveOptions{Force: true}) + if err != nil { + return fmt.Errorf("unable to remove container: %w", err) + } + } + + registryAuth, err := adapter.getRegistryCredentials(options.podSpec, options.namespace, containerCfg.ContainerConfig.Image) + if err != nil { + return fmt.Errorf("unable to get registry credentials: %w", err) + } + + out, err := adapter.cli.ImagePull(ctx, containerCfg.ContainerConfig.Image, types.ImagePullOptions{ + RegistryAuth: registryAuth, + }) + if err != nil { + return fmt.Errorf("unable to pull %s image: %w", containerCfg.ContainerConfig.Image, err) + } + defer out.Close() + + io.Copy(os.Stdout, out) + + containerCreateResponse, err := adapter.cli.ContainerCreate(ctx, + containerCfg.ContainerConfig, + containerCfg.HostConfig, + containerCfg.NetworkConfig, + nil, + containerCfg.ContainerName, + ) + if err != nil { + return fmt.Errorf("unable to create container: %w", err) + } + + return 
adapter.cli.ContainerStart(ctx, containerCreateResponse.ID, types.ContainerStartOptions{}) } // DeleteContainer attempts to remove a Docker container based on its name and associated namespace. diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index 03b8694..6280d55 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -2,11 +2,11 @@ package converter import ( "github.com/docker/docker/api/types" - batchv1 "k8s.io/api/batch/v1" + "k8s.io/kubernetes/pkg/apis/batch" ) // ConvertContainerToPod converts a given Docker container into a Kubernetes Pod object. // TODO: needs full description -func (converter *DockerAPIConverter) ConvertContainerToJob(container types.Container) batchv1.Job { +func (converter *DockerAPIConverter) ConvertContainerToJob(container types.Container) batch.Job { panic("unimplemented") } diff --git a/internal/adapter/converter/pod.go b/internal/adapter/converter/pod.go index af6b2e1..78648cd 100644 --- a/internal/adapter/converter/pod.go +++ b/internal/adapter/converter/pod.go @@ -14,6 +14,7 @@ import ( "github.com/portainer/k2d/internal/adapter/naming" k2dtypes "github.com/portainer/k2d/internal/adapter/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/core" ) @@ -199,6 +200,59 @@ func (converter *DockerAPIConverter) ConvertPodSpecToContainerConfiguration(spec }, nil } +// ConvertJobSpecToContainerConfiguration converts a Kubernetes JobSpec into a Docker ContainerConfiguration. 
+func (converter *DockerAPIConverter) ConvertJobSpecToContainerConfiguration(spec batch.JobSpec, namespace string, labels map[string]string) (ContainerConfiguration, error) { + podSpec := spec.Template.Spec + containerSpec := podSpec.Containers[0] + + containerConfig := &container.Config{ + Image: containerSpec.Image, + Labels: labels, + Env: []string{ + fmt.Sprintf("KUBERNETES_SERVICE_HOST=%s", converter.k2dServerConfiguration.ServerIpAddr), + fmt.Sprintf("KUBERNETES_SERVICE_PORT=%d", converter.k2dServerConfiguration.ServerPort), + }, + } + + hostConfig := &container.HostConfig{ + ExtraHosts: []string{ + fmt.Sprintf("kubernetes.default.svc:%s", converter.k2dServerConfiguration.ServerIpAddr), + }, + } + + if err := converter.setServiceAccountTokenAndCACert(hostConfig); err != nil { + return ContainerConfiguration{}, err + } + + if err := converter.setHostPorts(containerConfig, hostConfig, containerSpec.Ports); err != nil { + return ContainerConfiguration{}, err + } + + if err := converter.setEnvVars(namespace, containerConfig, containerSpec.Env, containerSpec.EnvFrom); err != nil { + return ContainerConfiguration{}, err + } + + setCommandAndArgs(containerConfig, containerSpec.Command, containerSpec.Args) + setRestartPolicy(hostConfig, podSpec.RestartPolicy) + setSecurityContext(containerConfig, hostConfig, podSpec.SecurityContext, containerSpec.SecurityContext) + converter.setResourceRequirements(hostConfig, containerSpec.Resources) + + if err := converter.setVolumeMounts(namespace, hostConfig, podSpec.Volumes, containerSpec.VolumeMounts); err != nil { + return ContainerConfiguration{}, err + } + + networkName := labels[k2dtypes.NetworkNameLabelKey] + return ContainerConfiguration{ + ContainerConfig: containerConfig, + HostConfig: hostConfig, + NetworkConfig: &network.NetworkingConfig{ + EndpointsConfig: map[string]*network.EndpointSettings{ + networkName: {}, + }, + }, + }, nil +} + // setResourceRequirements configures the Docker container's resource constraints 
based on the provided core.ResourceRequirements. // It receives a Docker HostConfig and a Kubernetes ResourceRequirements. func (converter *DockerAPIConverter) setResourceRequirements(hostConfig *container.HostConfig, resources core.ResourceRequirements) { diff --git a/internal/adapter/job.go b/internal/adapter/job.go index 7307df9..8688a66 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -13,6 +13,7 @@ import ( "github.com/portainer/k2d/internal/k8s" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/batch" ) type JobLogOptions struct { @@ -114,13 +115,13 @@ func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace str return k8s.GenerateTable(&jobList) } -func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { +func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batch.JobList, error) { jobList, err := adapter.listJobs(ctx, namespace) if err != nil { - return batchv1.JobList{}, fmt.Errorf("unable to list jobs: %w", err) + return batch.JobList{}, fmt.Errorf("unable to list jobs: %w", err) } - versionedJobList := batchv1.JobList{ + versionedJobList := batch.JobList{ TypeMeta: metav1.TypeMeta{ Kind: "JobList", APIVersion: "v1", @@ -129,18 +130,18 @@ func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string err = adapter.ConvertK8SResource(&jobList, &versionedJobList) if err != nil { - return batchv1.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) + return batch.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) } return versionedJobList, nil } -func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batchv1.Job, error) { +func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batch.Job, error) { job := 
adapter.converter.ConvertContainerToJob(container) if container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] != "" { internalJobSpecData := container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] - jobSpec := batchv1.JobSpec{} + jobSpec := batch.JobSpec{} err := json.Unmarshal([]byte(internalJobSpecData), &jobSpec) if err != nil { @@ -153,25 +154,25 @@ func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Containe return &job, nil } -func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { +func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batch.JobList, error) { filter := filters.ByNamespace(namespace) containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) if err != nil { - return batchv1.JobList{}, fmt.Errorf("unable to list containers: %w", err) + return batch.JobList{}, fmt.Errorf("unable to list containers: %w", err) } - jobs := []batchv1.Job{} + jobs := []batch.Job{} for _, container := range containers { job, err := adapter.buildJobFromContainer(container) if err != nil { - return batchv1.JobList{}, fmt.Errorf("unable to get jobs: %w", err) + return batch.JobList{}, fmt.Errorf("unable to get jobs: %w", err) } jobs = append(jobs, *job) } - return batchv1.JobList{ + return batch.JobList{ TypeMeta: metav1.TypeMeta{ Kind: "JobList", APIVersion: "v1", From bb63c4dffac0d2f70a88631df01b05f16a9fa023 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Tue, 5 Sep 2023 20:53:58 +0200 Subject: [PATCH 3/8] refactor: remove invalid methods Treat this as a checkpoint, I need to start over. The valid implementation is to write the job similarly to the deployment, instead of pod. 
Signed-off-by: Mateusz Urbanek --- internal/adapter/container_utils.go | 83 ----------------------------- internal/adapter/converter/job.go | 11 ---- internal/adapter/converter/pod.go | 54 ------------------- internal/adapter/job.go | 41 -------------- 4 files changed, 189 deletions(-) diff --git a/internal/adapter/container_utils.go b/internal/adapter/container_utils.go index 47ab5b4..a63eaeb 100644 --- a/internal/adapter/container_utils.go +++ b/internal/adapter/container_utils.go @@ -21,10 +21,8 @@ import ( k2dtypes "github.com/portainer/k2d/internal/adapter/types" "github.com/portainer/k2d/internal/k8s" "github.com/portainer/k2d/pkg/maputils" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/core" ) @@ -175,7 +173,6 @@ type ContainerCreationOptions struct { lastAppliedConfiguration string namespace string podSpec corev1.PodSpec - jobSpec batchv1.JobSpec } // getContainer inspects the specified container and returns its details in the form of a pointer to a types.ContainerJSON object. @@ -308,86 +305,6 @@ func (adapter *KubeDockerAdapter) createContainerFromPodSpec(ctx context.Context return adapter.cli.ContainerStart(ctx, containerCreateResponse.ID, types.ContainerStartOptions{}) } -// createContainerFromJobSpec orchestrates the creation of a Docker container based on a given Kubernetes PodSpec. 
-// TODO: needs full description -func (adapter *KubeDockerAdapter) createContainerFromJobSpec(ctx context.Context, options ContainerCreationOptions) error { - if options.lastAppliedConfiguration != "" { - options.labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] = options.lastAppliedConfiguration - } - - internalJobSpec := batch.JobSpec{} - err := adapter.ConvertK8SResource(&options.jobSpec, &internalJobSpec) - if err != nil { - return fmt.Errorf("unable to convert versioned pod spec to internal pod spec: %w", err) - } - - internalJobSpecData, err := json.Marshal(internalJobSpec) - if err != nil { - return fmt.Errorf("unable to marshal internal pod spec: %w", err) - } - options.labels[k2dtypes.PodLastAppliedConfigLabelKey] = string(internalJobSpecData) - options.labels[k2dtypes.NamespaceLabelKey] = options.namespace - options.labels[k2dtypes.WorkloadNameLabelKey] = options.containerName - options.labels[k2dtypes.NetworkNameLabelKey] = buildNetworkName(options.namespace) - - containerCfg, err := adapter.converter.ConvertJobSpecToContainerConfiguration(internalJobSpec, options.namespace, options.labels) - if err != nil { - return fmt.Errorf("unable to build container configuration from pod spec: %w", err) - } - containerCfg.ContainerName = buildContainerName(options.containerName, options.namespace) - - existingContainer, err := adapter.getContainer(ctx, containerCfg.ContainerName) - if err != nil { - return fmt.Errorf("unable to inspect container: %w", err) - } - - if existingContainer != nil { - if options.lastAppliedConfiguration == existingContainer.Config.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] { - adapter.logger.Infof("container with the name %s already exists with the same configuration. The update will be skipped", containerCfg.ContainerName) - return nil - } - - adapter.logger.Infof("container with the name %s already exists with a different configuration. 
The container will be recreated", containerCfg.ContainerName) - - if existingContainer.Config.Labels[k2dtypes.ServiceLastAppliedConfigLabelKey] != "" { - options.labels[k2dtypes.ServiceLastAppliedConfigLabelKey] = existingContainer.Config.Labels[k2dtypes.ServiceLastAppliedConfigLabelKey] - } - - err := adapter.cli.ContainerRemove(ctx, existingContainer.ID, types.ContainerRemoveOptions{Force: true}) - if err != nil { - return fmt.Errorf("unable to remove container: %w", err) - } - } - - registryAuth, err := adapter.getRegistryCredentials(options.podSpec, options.namespace, containerCfg.ContainerConfig.Image) - if err != nil { - return fmt.Errorf("unable to get registry credentials: %w", err) - } - - out, err := adapter.cli.ImagePull(ctx, containerCfg.ContainerConfig.Image, types.ImagePullOptions{ - RegistryAuth: registryAuth, - }) - if err != nil { - return fmt.Errorf("unable to pull %s image: %w", containerCfg.ContainerConfig.Image, err) - } - defer out.Close() - - io.Copy(os.Stdout, out) - - containerCreateResponse, err := adapter.cli.ContainerCreate(ctx, - containerCfg.ContainerConfig, - containerCfg.HostConfig, - containerCfg.NetworkConfig, - nil, - containerCfg.ContainerName, - ) - if err != nil { - return fmt.Errorf("unable to create container: %w", err) - } - - return adapter.cli.ContainerStart(ctx, containerCreateResponse.ID, types.ContainerStartOptions{}) -} - // DeleteContainer attempts to remove a Docker container based on its name and associated namespace. // The container name is fully qualified by appending the namespace to it using the buildContainerName function. // This function forcefully removes the container, regardless of whether it is running or not. 
diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index 6280d55..89f617e 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -1,12 +1 @@ package converter - -import ( - "github.com/docker/docker/api/types" - "k8s.io/kubernetes/pkg/apis/batch" -) - -// ConvertContainerToPod converts a given Docker container into a Kubernetes Pod object. -// TODO: needs full description -func (converter *DockerAPIConverter) ConvertContainerToJob(container types.Container) batch.Job { - panic("unimplemented") -} diff --git a/internal/adapter/converter/pod.go b/internal/adapter/converter/pod.go index 78648cd..af6b2e1 100644 --- a/internal/adapter/converter/pod.go +++ b/internal/adapter/converter/pod.go @@ -14,7 +14,6 @@ import ( "github.com/portainer/k2d/internal/adapter/naming" k2dtypes "github.com/portainer/k2d/internal/adapter/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/core" ) @@ -200,59 +199,6 @@ func (converter *DockerAPIConverter) ConvertPodSpecToContainerConfiguration(spec }, nil } -// ConvertJobSpecToContainerConfiguration converts a Kubernetes JobSpec into a Docker ContainerConfiguration. 
-func (converter *DockerAPIConverter) ConvertJobSpecToContainerConfiguration(spec batch.JobSpec, namespace string, labels map[string]string) (ContainerConfiguration, error) { - podSpec := spec.Template.Spec - containerSpec := podSpec.Containers[0] - - containerConfig := &container.Config{ - Image: containerSpec.Image, - Labels: labels, - Env: []string{ - fmt.Sprintf("KUBERNETES_SERVICE_HOST=%s", converter.k2dServerConfiguration.ServerIpAddr), - fmt.Sprintf("KUBERNETES_SERVICE_PORT=%d", converter.k2dServerConfiguration.ServerPort), - }, - } - - hostConfig := &container.HostConfig{ - ExtraHosts: []string{ - fmt.Sprintf("kubernetes.default.svc:%s", converter.k2dServerConfiguration.ServerIpAddr), - }, - } - - if err := converter.setServiceAccountTokenAndCACert(hostConfig); err != nil { - return ContainerConfiguration{}, err - } - - if err := converter.setHostPorts(containerConfig, hostConfig, containerSpec.Ports); err != nil { - return ContainerConfiguration{}, err - } - - if err := converter.setEnvVars(namespace, containerConfig, containerSpec.Env, containerSpec.EnvFrom); err != nil { - return ContainerConfiguration{}, err - } - - setCommandAndArgs(containerConfig, containerSpec.Command, containerSpec.Args) - setRestartPolicy(hostConfig, podSpec.RestartPolicy) - setSecurityContext(containerConfig, hostConfig, podSpec.SecurityContext, containerSpec.SecurityContext) - converter.setResourceRequirements(hostConfig, containerSpec.Resources) - - if err := converter.setVolumeMounts(namespace, hostConfig, podSpec.Volumes, containerSpec.VolumeMounts); err != nil { - return ContainerConfiguration{}, err - } - - networkName := labels[k2dtypes.NetworkNameLabelKey] - return ContainerConfiguration{ - ContainerConfig: containerConfig, - HostConfig: hostConfig, - NetworkConfig: &network.NetworkingConfig{ - EndpointsConfig: map[string]*network.EndpointSettings{ - networkName: {}, - }, - }, - }, nil -} - // setResourceRequirements configures the Docker container's resource constraints 
based on the provided core.ResourceRequirements. // It receives a Docker HostConfig and a Kubernetes ResourceRequirements. func (converter *DockerAPIConverter) setResourceRequirements(hostConfig *container.HostConfig, resources core.ResourceRequirements) { diff --git a/internal/adapter/job.go b/internal/adapter/job.go index 8688a66..6f5edb3 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -2,14 +2,12 @@ package adapter import ( "context" - "encoding/json" "fmt" "io" "github.com/docker/docker/api/types" "github.com/portainer/k2d/internal/adapter/errors" "github.com/portainer/k2d/internal/adapter/filters" - k2dtypes "github.com/portainer/k2d/internal/adapter/types" "github.com/portainer/k2d/internal/k8s" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -22,27 +20,6 @@ type JobLogOptions struct { Tail string } -func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, job *batchv1.Job) error { - opts := ContainerCreationOptions{ - containerName: job.Name, - namespace: job.Namespace, - jobSpec: job.Spec, - labels: job.Labels, - } - - if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { - jobData, err := json.Marshal(job) - if err != nil { - return fmt.Errorf("unable to marshal job: %w", err) - } - job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = string(jobData) - } - - opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] - - return adapter.createContainerFromJobSpec(ctx, opts) -} - // The GetJob implementation is using a filtered list approach as the Docker API provide different response types // when inspecting a container and listing containers. 
// The logic used to build a job from a container is based on the type returned by the list operation (types.Container) @@ -136,24 +113,6 @@ func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string return versionedJobList, nil } -func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batch.Job, error) { - job := adapter.converter.ConvertContainerToJob(container) - - if container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] != "" { - internalJobSpecData := container.Labels[k2dtypes.JobLastAppliedConfigLabelKey] - jobSpec := batch.JobSpec{} - - err := json.Unmarshal([]byte(internalJobSpecData), &jobSpec) - if err != nil { - return nil, fmt.Errorf("unable to unmarshal job spec: %w", err) - } - - job.Spec = jobSpec - } - - return &job, nil -} - func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batch.JobList, error) { filter := filters.ByNamespace(namespace) containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) From f82cf733576a93c21c42b4a03c10d064e50d6546 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Thu, 7 Sep 2023 20:05:20 +0200 Subject: [PATCH 4/8] refactor: workload pattern Signed-off-by: Mateusz Urbanek --- internal/adapter/container_utils.go | 2 - internal/adapter/converter/job.go | 40 ++++++++ internal/adapter/filters/filters.go | 40 ++++++++ internal/adapter/job.go | 138 ++++++++++++++++++---------- internal/adapter/types/labels.go | 4 + 5 files changed, 171 insertions(+), 53 deletions(-) diff --git a/internal/adapter/container_utils.go b/internal/adapter/container_utils.go index a63eaeb..02bbba6 100644 --- a/internal/adapter/container_utils.go +++ b/internal/adapter/container_utils.go @@ -165,8 +165,6 @@ func (adapter *KubeDockerAdapter) buildContainerConfigurationFromExistingContain // This is used to ensure that the container is created in the correct network. 
// - podSpec: Holds the corev1.PodSpec object representing the desired state of the associated Pod. // This includes configurations like the container image, environment variables, and volume mounts. -// - jobSpec: Holds the batchv1.PodSpec object representing the desired state of the associated Job. -// This includes configurations like the container image, environment variables, and volume mounts. type ContainerCreationOptions struct { containerName string labels map[string]string diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index 89f617e..99b8a31 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -1 +1,41 @@ package converter + +import ( + "time" + + "github.com/docker/docker/api/types" + k2dtypes "github.com/portainer/k2d/internal/adapter/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/apis/batch" +) + +func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, container types.Container) { + job.TypeMeta = metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + } + + job.ObjectMeta.CreationTimestamp = metav1.NewTime(time.Unix(container.Created, 0)) + if job.ObjectMeta.Annotations == nil { + job.ObjectMeta.Annotations = make(map[string]string) + } + + job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] + + // containerState := container.State + + // // if the number of replicas isn't set in the job, set it to 1 + // if job.Spec.Replicas == 0 { + // job.Spec.Replicas = 1 + // } + + // job.Status.Replicas = 1 + + // if containerState == "running" { + // job.Status.UpdatedReplicas = 1 + // job.Status.ReadyReplicas = 1 + // job.Status.AvailableReplicas = 1 + // } else { + // job.Status.UnavailableReplicas = 1 + // } +} diff --git a/internal/adapter/filters/filters.go b/internal/adapter/filters/filters.go index 12d66cc..43af0f7 100644 --- 
a/internal/adapter/filters/filters.go +++ b/internal/adapter/filters/filters.go @@ -26,6 +26,26 @@ func AllDeployments(namespace string) filters.Args { return filter } +// AllJobs creates a Docker filter argument for Kubernetes Jobs within a given namespace. +// The function filters Docker resources based on the Workload and Namespace labels, specifically for Jobs. +// +// Parameters: +// - namespace: The Kubernetes namespace to filter by. +// +// Returns: +// - filters.Args: A Docker filter object that can be used to filter Docker API calls based on the namespace and Workload type labels. +// +// Usage Example: +// +// filter := AllJobs("default") +// // Now 'filter' can be used in Docker API calls to filter Job resources in the 'default' Kubernetes namespace. +func AllJobs(namespace string) filters.Args { + filter := filters.NewArgs() + filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadLabelKey, types.JobWorkloadType)) + filter.Add("label", fmt.Sprintf("%s=%s", types.NamespaceLabelKey, namespace)) + return filter +} + // AllNamespaces creates a Docker filter argument that targets resources labeled with a Kubernetes namespace. // This function uses the types.NamespaceLabelKey constant as the base label key to filter Docker resources. // @@ -81,6 +101,26 @@ func ByDeployment(namespace, deploymentName string) filters.Args { return filter } +// ByJob creates a Docker filter argument for a specific Kubernetes Job within a given namespace. +// The function builds upon the AllJobs filter by further narrowing down the filter to match a specific Job name. +// +// Parameters: +// - namespace: The Kubernetes namespace to filter by. +// - jobName: The name of the specific Kubernetes Job to filter by. +// +// Returns: +// - filters.Args: A Docker filter object that can be used to filter Docker API calls based on the namespace and Job name labels. 
+// +// Usage Example: +// +// filter := ByJob("default", "my-job") +// // Now 'filter' can be used in Docker API calls to filter resources in the 'default' Kubernetes namespace that are part of 'my-job'. +func ByJob(namespace, jobName string) filters.Args { + filter := AllJobs(namespace) + filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadNameLabelKey, jobName)) + return filter +} + // ByNamespace creates a Docker filter argument to target all Docker resources within a specific Kubernetes namespace. // If an empty string is provided, it will return a filter that targets all namespaces. // diff --git a/internal/adapter/job.go b/internal/adapter/job.go index 6f5edb3..c73569d 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -2,52 +2,77 @@ package adapter import ( "context" + "encoding/json" "fmt" - "io" "github.com/docker/docker/api/types" - "github.com/portainer/k2d/internal/adapter/errors" + adaptererr "github.com/portainer/k2d/internal/adapter/errors" "github.com/portainer/k2d/internal/adapter/filters" + k2dtypes "github.com/portainer/k2d/internal/adapter/types" "github.com/portainer/k2d/internal/k8s" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/apis/batch" ) -type JobLogOptions struct { - Timestamps bool - Follow bool - Tail string +func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, job *batchv1.Job) error { + opts := ContainerCreationOptions{ + containerName: job.Name, + namespace: job.Namespace, + podSpec: job.Spec.Template.Spec, + labels: job.Spec.Template.Labels, + } + + opts.labels[k2dtypes.WorkloadLabelKey] = k2dtypes.JobWorkloadType + + if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("unable to marshal job: %w", err) + } + job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = string(jobData) + } + + // kubectl create job does not pass the 
last-applied-configuration annotation + // so we need to add it manually + if job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] == "" { + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("unable to marshal job: %w", err) + } + opts.labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] = string(jobData) + } + + opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] + + return adapter.createContainerFromPodSpec(ctx, opts) } -// The GetJob implementation is using a filtered list approach as the Docker API provide different response types -// when inspecting a container and listing containers. -// The logic used to build a job from a container is based on the type returned by the list operation (types.Container) -// and not the inspect operation (types.ContainerJSON). -// This is because using the inspect operation everywhere would be more expensive overall. -func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, namespace string) (*batchv1.Job, error) { - filter := filters.ByPod(namespace, jobName) // NOTE: I am not sure about this, should work? 
+func (adapter *KubeDockerAdapter) getContainerFromJobName(ctx context.Context, jobName, namespace string) (types.Container, error) { + filter := filters.ByJob(namespace, jobName) containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) if err != nil { - return nil, fmt.Errorf("unable to list containers: %w", err) + return types.Container{}, fmt.Errorf("unable to list containers: %w", err) } - var container *types.Container + if len(containers) == 0 { + return types.Container{}, adaptererr.ErrResourceNotFound + } - containerName := buildContainerName(jobName, namespace) - for _, cntr := range containers { - if cntr.Names[0] == "/"+containerName { - container = &cntr - break - } + if len(containers) > 1 { + return types.Container{}, fmt.Errorf("multiple containers were found with the associated job name %s", jobName) } - if container == nil { - adapter.logger.Errorf("unable to find container for job %s in namespace %s", jobName, namespace) - return nil, errors.ErrResourceNotFound + return containers[0], nil +} + +func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, namespace string) (*batchv1.Job, error) { + container, err := adapter.getContainerFromJobName(ctx, jobName, namespace) + if err != nil { + return nil, fmt.Errorf("unable to get container from job name: %w", err) } - job, err := adapter.buildJobFromContainer(*container) + job, err := adapter.buildJobFromContainer(container) if err != nil { return nil, fmt.Errorf("unable to get job: %w", err) } @@ -55,7 +80,7 @@ func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, na versionedJob := batchv1.Job{ TypeMeta: metav1.TypeMeta{ Kind: "Job", - APIVersion: "v1", + APIVersion: "batch/v1", }, } @@ -67,22 +92,6 @@ func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, na return &versionedJob, nil } -func (adapter *KubeDockerAdapter) GetJobLogs(ctx context.Context, namespace string, jobName string, 
opts JobLogOptions) (io.ReadCloser, error) { - containerName := buildContainerName(jobName, namespace) - container, err := adapter.cli.ContainerInspect(ctx, containerName) - if err != nil { - return nil, fmt.Errorf("unable to inspect container: %w", err) - } - - return adapter.cli.ContainerLogs(ctx, container.ID, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Timestamps: opts.Timestamps, - Follow: opts.Follow, - Tail: opts.Tail, - }) -} - func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace string) (*metav1.Table, error) { jobList, err := adapter.listJobs(ctx, namespace) if err != nil { @@ -92,29 +101,54 @@ func (adapter *KubeDockerAdapter) GetJobTable(ctx context.Context, namespace str return k8s.GenerateTable(&jobList) } -func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batch.JobList, error) { +func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string) (batchv1.JobList, error) { jobList, err := adapter.listJobs(ctx, namespace) if err != nil { - return batch.JobList{}, fmt.Errorf("unable to list jobs: %w", err) + return batchv1.JobList{}, fmt.Errorf("unable to list jobs: %w", err) } - versionedJobList := batch.JobList{ + versionedJobList := batchv1.JobList{ TypeMeta: metav1.TypeMeta{ Kind: "JobList", - APIVersion: "v1", + APIVersion: "batch/v1", }, } err = adapter.ConvertK8SResource(&jobList, &versionedJobList) if err != nil { - return batch.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) + return batchv1.JobList{}, fmt.Errorf("unable to convert internal JobList to versioned JobList: %w", err) } return versionedJobList, nil } +func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batch.Job, error) { + if container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] == "" { + return nil, fmt.Errorf("unable to build job, missing %s label on container %s", 
k2dtypes.WorkloadLastAppliedConfigLabelKey, container.Names[0]) + } + + jobData := container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] + + versionedJob := batchv1.Job{} + + err := json.Unmarshal([]byte(jobData), &versionedJob) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal versioned job: %w", err) + } + + job := batch.Job{} + err = adapter.ConvertK8SResource(&versionedJob, &job) + if err != nil { + return nil, fmt.Errorf("unable to convert versioned job spec to internal job spec: %w", err) + } + + adapter.converter.UpdateJobFromContainerInfo(&job, container) + + return &job, nil +} + func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string) (batch.JobList, error) { - filter := filters.ByNamespace(namespace) + filter := filters.AllJobs(namespace) containers, err := adapter.cli.ContainerList(ctx, types.ContainerListOptions{All: true, Filters: filter}) if err != nil { return batch.JobList{}, fmt.Errorf("unable to list containers: %w", err) @@ -125,16 +159,18 @@ func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string for _, container := range containers { job, err := adapter.buildJobFromContainer(container) if err != nil { - return batch.JobList{}, fmt.Errorf("unable to get jobs: %w", err) + return batch.JobList{}, fmt.Errorf("unable to get job: %w", err) } - jobs = append(jobs, *job) + if job != nil { + jobs = append(jobs, *job) + } } return batch.JobList{ TypeMeta: metav1.TypeMeta{ Kind: "JobList", - APIVersion: "v1", + APIVersion: "batch/v1", }, Items: jobs, }, nil diff --git a/internal/adapter/types/labels.go b/internal/adapter/types/labels.go index 64b619b..b9531d6 100644 --- a/internal/adapter/types/labels.go +++ b/internal/adapter/types/labels.go @@ -61,4 +61,8 @@ const ( // DeploymentWorkloadType is the label value used to identify a Deployment workload // It is stored on a container as a label and used to filter containers when listing deployments DeploymentWorkloadType = "deployment" + 
+ // JobWorkloadType is the label value used to identify a Job workload + // It is stored on a container as a label and used to filter containers when listing jobs + JobWorkloadType = "job" ) From 2b01c553f95567358160458a000628280ac9e020 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Fri, 8 Sep 2023 11:35:03 +0200 Subject: [PATCH 5/8] fix(batch/job): partial starting and listing Signed-off-by: Mateusz Urbanek --- Makefile | 2 +- internal/adapter/converter/converter.go | 2 ++ internal/adapter/converter/job.go | 32 ++++++++++++------------- internal/adapter/converter/pod.go | 2 ++ internal/adapter/job.go | 4 ++++ internal/controller/controller.go | 13 ++++++++++ 6 files changed, 38 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index dcbb82f..fe4dbc8 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # go-workbench Makefile template v1.1.0 # For a list of valid GOOS and GOARCH values, see: https://gist.github.com/asukakenji/f15ba7e588ac42795f421b48b8aede63 # Note: these can be overriden on the command line e.g. 
`make PLATFORM= ARCH=` -PLATFORM="linux" +PLATFORM="$(shell go env GOOS)" ARCH="$(shell go env GOARCH)" ARM="" VERSION="latest" diff --git a/internal/adapter/converter/converter.go b/internal/adapter/converter/converter.go index fa11e37..fa74d64 100644 --- a/internal/adapter/converter/converter.go +++ b/internal/adapter/converter/converter.go @@ -2,6 +2,7 @@ package converter import ( + containertypes "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/portainer/k2d/internal/adapter/store" @@ -22,6 +23,7 @@ type DockerAPIConverter struct { // ContainerConfiguration is a wrapper around the Docker API container configuration type ContainerConfiguration struct { ContainerName string + ContainerState *containertypes.ContainerState ContainerConfig *container.Config HostConfig *container.HostConfig NetworkConfig *network.NetworkingConfig diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index 99b8a31..d92911c 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -22,20 +22,20 @@ func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] - // containerState := container.State - - // // if the number of replicas isn't set in the job, set it to 1 - // if job.Spec.Replicas == 0 { - // job.Spec.Replicas = 1 - // } - - // job.Status.Replicas = 1 - - // if containerState == "running" { - // job.Status.UpdatedReplicas = 1 - // job.Status.ReadyReplicas = 1 - // job.Status.AvailableReplicas = 1 - // } else { - // job.Status.UnavailableReplicas = 1 - // } + containerState := container.State + + job.Status.Active = 0 + + if containerState == "running" { + job.Status.Active = 1 + } else { + // TODO: handle completion status? 
+ job.Status.Failed = 1 + } + + // TODO: handle duration? + // /containers//json ? This will allow getting: + // - State.ExitCode + // - State.StartedAt + // - State.FinishedAt } diff --git a/internal/adapter/converter/pod.go b/internal/adapter/converter/pod.go index af6b2e1..f7977e2 100644 --- a/internal/adapter/converter/pod.go +++ b/internal/adapter/converter/pod.go @@ -110,6 +110,8 @@ func (converter *DockerAPIConverter) ConvertContainerToPod(container types.Conta }, } } else { + // TODO: handle exited containers, so Jobs can determine the status + // /containers//json ? This will allow getting Exit Code. pod.Status.Phase = core.PodUnknown // this is to mark the pod's condition as unknown diff --git a/internal/adapter/job.go b/internal/adapter/job.go index c73569d..4c4b10e 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -23,6 +23,10 @@ func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, jo labels: job.Spec.Template.Labels, } + if opts.labels == nil { + opts.labels = make(map[string]string) + } + opts.labels[k2dtypes.WorkloadLabelKey] = k2dtypes.JobWorkloadType if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { diff --git a/internal/controller/controller.go b/internal/controller/controller.go index bc4d903..687f85c 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" ) @@ -192,6 +193,13 @@ func (controller *OperationController) processOperation(op Operation) { "request_id", op.RequestID, ) } + case *batchv1.Job: + err := controller.createJob(op) + if err != nil { + controller.logger.Errorw("unable to create job", + "error", err, + ) + } case *corev1.ConfigMap: err := controller.createConfigMap(op) if err != nil { @@ -240,6 +248,11 @@ func (controller *OperationController) createDeployment(op Operation) error { return 
controller.adapter.CreateContainerFromDeployment(context.TODO(), deployment) } +func (controller *OperationController) createJob(op Operation) error { + job := op.Operation.(*batchv1.Job) + return controller.adapter.CreateContainerFromJob(context.TODO(), job) +} + func (controller *OperationController) createService(op Operation) error { service := op.Operation.(*corev1.Service) return controller.adapter.CreateContainerFromService(context.TODO(), service) From e36c9ec7fe771618f3a360eeaea7e281403625dd Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Sun, 17 Sep 2023 15:57:58 +0200 Subject: [PATCH 6/8] fix: added inspect Signed-off-by: Mateusz Urbanek --- internal/adapter/converter/converter.go | 2 -- internal/adapter/converter/job.go | 10 ++++++++-- internal/adapter/job.go | 13 +++++++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/adapter/converter/converter.go b/internal/adapter/converter/converter.go index fa74d64..fa11e37 100644 --- a/internal/adapter/converter/converter.go +++ b/internal/adapter/converter/converter.go @@ -2,7 +2,6 @@ package converter import ( - containertypes "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/portainer/k2d/internal/adapter/store" @@ -23,7 +22,6 @@ type DockerAPIConverter struct { // ContainerConfiguration is a wrapper around the Docker API container configuration type ContainerConfiguration struct { ContainerName string - ContainerState *containertypes.ContainerState ContainerConfig *container.Config HostConfig *container.HostConfig NetworkConfig *network.NetworkingConfig diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index d92911c..494f54f 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -9,7 +9,7 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" ) -func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, 
container types.Container) { +func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, container types.Container, json types.ContainerJSON) { job.TypeMeta = metav1.TypeMeta{ Kind: "Job", APIVersion: "batch/v1", @@ -30,7 +30,12 @@ func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, job.Status.Active = 1 } else { // TODO: handle completion status? - job.Status.Failed = 1 + if json.State.ExitCode == 0 { + job.Status.Succeeded = 1 + } else { + job.Status.Failed = 1 + } + } // TODO: handle duration? @@ -38,4 +43,5 @@ func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, // - State.ExitCode // - State.StartedAt // - State.FinishedAt + job.Status.CompletionTime.Time, _ = time.Parse("", json.State.FinishedAt) } diff --git a/internal/adapter/job.go b/internal/adapter/job.go index 4c4b10e..73fee92 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -76,7 +76,7 @@ func (adapter *KubeDockerAdapter) GetJob(ctx context.Context, jobName string, na return nil, fmt.Errorf("unable to get container from job name: %w", err) } - job, err := adapter.buildJobFromContainer(container) + job, err := adapter.buildJobFromContainer(ctx, container) if err != nil { return nil, fmt.Errorf("unable to get job: %w", err) } @@ -126,7 +126,7 @@ func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string return versionedJobList, nil } -func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Container) (*batch.Job, error) { +func (adapter *KubeDockerAdapter) buildJobFromContainer(ctx context.Context, container types.Container) (*batch.Job, error) { if container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] == "" { return nil, fmt.Errorf("unable to build job, missing %s label on container %s", k2dtypes.WorkloadLastAppliedConfigLabelKey, container.Names[0]) } @@ -146,7 +146,12 @@ func (adapter *KubeDockerAdapter) buildJobFromContainer(container types.Containe return nil, 
fmt.Errorf("unable to convert versioned job spec to internal job spec: %w", err) } - adapter.converter.UpdateJobFromContainerInfo(&job, container) + containerInspect, err := adapter.cli.ContainerInspect(ctx, container.ID) + if err != nil { + return nil, fmt.Errorf("unable to inspect the container: %w", err) + } + + adapter.converter.UpdateJobFromContainerInfo(&job, container, containerInspect) return &job, nil } @@ -161,7 +166,7 @@ func (adapter *KubeDockerAdapter) listJobs(ctx context.Context, namespace string jobs := []batch.Job{} for _, container := range containers { - job, err := adapter.buildJobFromContainer(container) + job, err := adapter.buildJobFromContainer(ctx, container) if err != nil { return batch.JobList{}, fmt.Errorf("unable to get job: %w", err) } From b56299dc12f72aa47a3b1e2be253b9e2351b96d4 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Sat, 30 Sep 2023 00:22:04 +0200 Subject: [PATCH 7/8] fix: rebase refactors Signed-off-by: Mateusz Urbanek --- internal/adapter/converter/job.go | 2 +- internal/adapter/filters/filters.go | 4 ++-- internal/adapter/job.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index 494f54f..b4ce213 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -20,7 +20,7 @@ func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, job.ObjectMeta.Annotations = make(map[string]string) } - job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] + job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] = container.Labels[k2dtypes.LastAppliedConfigLabelKey] containerState := container.State diff --git a/internal/adapter/filters/filters.go b/internal/adapter/filters/filters.go index 43af0f7..2e07f68 100644 --- a/internal/adapter/filters/filters.go +++ 
b/internal/adapter/filters/filters.go @@ -41,8 +41,8 @@ func AllDeployments(namespace string) filters.Args { // // Now 'filter' can be used in Docker API calls to filter Job resources in the 'default' Kubernetes namespace. func AllJobs(namespace string) filters.Args { filter := filters.NewArgs() - filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadLabelKey, types.JobWorkloadType)) - filter.Add("label", fmt.Sprintf("%s=%s", types.NamespaceLabelKey, namespace)) + filter.Add("label", fmt.Sprintf("%s=%s", types.WorkloadTypeLabelKey, types.JobWorkloadType)) + filter.Add("label", fmt.Sprintf("%s=%s", types.NamespaceNameLabelKey, namespace)) return filter } diff --git a/internal/adapter/job.go b/internal/adapter/job.go index 73fee92..5ad258d 100644 --- a/internal/adapter/job.go +++ b/internal/adapter/job.go @@ -27,7 +27,7 @@ func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, jo opts.labels = make(map[string]string) } - opts.labels[k2dtypes.WorkloadLabelKey] = k2dtypes.JobWorkloadType + opts.labels[k2dtypes.WorkloadTypeLabelKey] = k2dtypes.JobWorkloadType if job.Labels["app.kubernetes.io/managed-by"] == "Helm" { jobData, err := json.Marshal(job) @@ -44,7 +44,7 @@ func (adapter *KubeDockerAdapter) CreateContainerFromJob(ctx context.Context, jo if err != nil { return fmt.Errorf("unable to marshal job: %w", err) } - opts.labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] = string(jobData) + opts.labels[k2dtypes.LastAppliedConfigLabelKey] = string(jobData) } opts.lastAppliedConfiguration = job.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] @@ -127,11 +127,11 @@ func (adapter *KubeDockerAdapter) ListJobs(ctx context.Context, namespace string } func (adapter *KubeDockerAdapter) buildJobFromContainer(ctx context.Context, container types.Container) (*batch.Job, error) { - if container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] == "" { - return nil, fmt.Errorf("unable to build job, missing %s label on container %s", 
k2dtypes.WorkloadLastAppliedConfigLabelKey, container.Names[0]) + if container.Labels[k2dtypes.LastAppliedConfigLabelKey] == "" { + return nil, fmt.Errorf("unable to build job, missing %s label on container %s", k2dtypes.LastAppliedConfigLabelKey, container.Names[0]) } - jobData := container.Labels[k2dtypes.WorkloadLastAppliedConfigLabelKey] + jobData := container.Labels[k2dtypes.LastAppliedConfigLabelKey] versionedJob := batchv1.Job{} From e4d2b0341204e2574e5a3e20b61845b7ee911da1 Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Sat, 30 Sep 2023 01:37:37 +0200 Subject: [PATCH 8/8] feat: added status time --- internal/adapter/adapter.go | 4 ++++ internal/adapter/converter/job.go | 25 ++++++++++++++++--------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/adapter/adapter.go b/internal/adapter/adapter.go index 0c277c7..898f1c7 100644 --- a/internal/adapter/adapter.go +++ b/internal/adapter/adapter.go @@ -18,6 +18,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/apis/apps" appsv1 "k8s.io/kubernetes/pkg/apis/apps/v1" + "k8s.io/kubernetes/pkg/apis/batch" + batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" "k8s.io/kubernetes/pkg/apis/core" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/apis/storage" @@ -196,6 +198,8 @@ func initConversionScheme() *runtime.Scheme { apps.AddToScheme(scheme) appsv1.AddToScheme(scheme) + batch.AddToScheme(scheme) + batchv1.AddToScheme(scheme) core.AddToScheme(scheme) corev1.AddToScheme(scheme) storage.AddToScheme(scheme) diff --git a/internal/adapter/converter/job.go b/internal/adapter/converter/job.go index b4ce213..7eefb49 100644 --- a/internal/adapter/converter/job.go +++ b/internal/adapter/converter/job.go @@ -24,24 +24,31 @@ func (converter *DockerAPIConverter) UpdateJobFromContainerInfo(job *batch.Job, containerState := container.State + startTime, _ := time.Parse(time.RFC3339Nano, json.State.StartedAt) + + metaStartTime := &metav1.Time{ + Time: startTime, + } + + 
job.Status.StartTime = metaStartTime + job.Status.Active = 0 if containerState == "running" { job.Status.Active = 1 } else { - // TODO: handle completion status? if json.State.ExitCode == 0 { job.Status.Succeeded = 1 + + completionTime, _ := time.Parse(time.RFC3339Nano, json.State.FinishedAt) + + metaCompletionTime := &metav1.Time{ + Time: completionTime, + } + + job.Status.CompletionTime = metaCompletionTime } else { job.Status.Failed = 1 } - } - - // TODO: handle duration? - // /containers//json ? This will allow getting: - // - State.ExitCode - // - State.StartedAt - // - State.FinishedAt - job.Status.CompletionTime.Time, _ = time.Parse("", json.State.FinishedAt) }