Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/k8s_client/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (c *Client) ApplyManifest(
switch result.Operation {
case manifest.OperationCreate:
_, applyErr = c.CreateResource(ctx, newManifest)
if applyErr != nil && apierrors.IsAlreadyExists(applyErr) {
// Resource was created by a concurrent process between our Get and Create.
// Treat as a successful no-op rather than an error.
c.log.Debugf(ctx, "Resource %s/%s already exists (concurrent create), treating as skip", gvk.Kind, name)
result.Operation = manifest.OperationSkip
result.Reason = "already exists (concurrent create)"
applyErr = nil
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for this change

case manifest.OperationUpdate:
// Preserve resourceVersion and UID from existing for update
Expand Down
24 changes: 0 additions & 24 deletions internal/k8s_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ func (c *Client) CreateResource(ctx context.Context, obj *unstructured.Unstructu
namespace := obj.GetNamespace()
name := obj.GetName()

c.log.Infof(ctx, "Creating resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

err := c.client.Create(ctx, obj)
if err != nil {
if apierrors.IsAlreadyExists(err) {
Expand All @@ -136,15 +134,11 @@ func (c *Client) CreateResource(ctx context.Context, obj *unstructured.Unstructu
Err: err,
}
}

c.log.Infof(ctx, "Successfully created resource: %s/%s", gvk.Kind, name)
return obj, nil
}

// GetResource retrieves a specific Kubernetes resource by GVK, namespace, and name
func (c *Client) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, _ transport_client.TransportContext) (*unstructured.Unstructured, error) {
c.log.Infof(ctx, "Getting resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)

Expand All @@ -168,8 +162,6 @@ func (c *Client) GetResource(ctx context.Context, gvk schema.GroupVersionKind, n
Err: err,
}
}

c.log.Infof(ctx, "Successfully retrieved resource: %s/%s", gvk.Kind, name)
return obj, nil
}

Expand All @@ -182,8 +174,6 @@ func (c *Client) GetResource(ctx context.Context, gvk schema.GroupVersionKind, n
//
// For more flexible discovery (including by-name lookup), use DiscoverResources() instead.
func (c *Client) ListResources(ctx context.Context, gvk schema.GroupVersionKind, namespace string, labelSelector string) (*unstructured.UnstructuredList, error) {
c.log.Infof(ctx, "Listing resources: %s (namespace: %s, labelSelector: %s)", gvk.Kind, namespace, labelSelector)

list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)

Expand Down Expand Up @@ -215,7 +205,6 @@ func (c *Client) ListResources(ctx context.Context, gvk schema.GroupVersionKind,
}
}

c.log.Infof(ctx, "Successfully listed resources: %s (found %d items)", gvk.Kind, len(list.Items))
return list, nil
}

Expand Down Expand Up @@ -245,8 +234,6 @@ func (c *Client) UpdateResource(ctx context.Context, obj *unstructured.Unstructu
namespace := obj.GetNamespace()
name := obj.GetName()

c.log.Infof(ctx, "Updating resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

err := c.client.Update(ctx, obj)
if err != nil {
if apierrors.IsConflict(err) {
Expand All @@ -261,15 +248,11 @@ func (c *Client) UpdateResource(ctx context.Context, obj *unstructured.Unstructu
Err: err,
}
}

c.log.Infof(ctx, "Successfully updated resource: %s/%s", gvk.Kind, name)
return obj, nil
}

// DeleteResource deletes a Kubernetes resource
func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string) error {
c.log.Infof(ctx, "Deleting resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetNamespace(namespace)
Expand All @@ -278,7 +261,6 @@ func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind
err := c.client.Delete(ctx, obj)
if err != nil {
if apierrors.IsNotFound(err) {
c.log.Infof(ctx, "Resource already deleted: %s/%s", gvk.Kind, name)
return nil
}
return &apperrors.K8sOperationError{
Expand All @@ -290,8 +272,6 @@ func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind
Err: err,
}
}

c.log.Infof(ctx, "Successfully deleted resource: %s/%s", gvk.Kind, name)
return nil
}

Expand Down Expand Up @@ -321,8 +301,6 @@ func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind
// patchData := []byte(`{"metadata":{"labels":{"new-label":"value"}}}`)
// patched, err := client.PatchResource(ctx, gvk, "default", "my-cm", patchData)
func (c *Client) PatchResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, patchData []byte) (*unstructured.Unstructured, error) {
c.log.Infof(ctx, "Patching resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

// Parse patch data to validate JSON
var patchObj map[string]interface{}
if err := json.Unmarshal(patchData, &patchObj); err != nil {
Expand Down Expand Up @@ -355,8 +333,6 @@ func (c *Client) PatchResource(ctx context.Context, gvk schema.GroupVersionKind,
}
}

c.log.Infof(ctx, "Successfully patched resource: %s/%s", gvk.Kind, name)

// Get the updated resource to return
return c.GetResource(ctx, gvk, namespace, name, nil)
}
2 changes: 0 additions & 2 deletions internal/maestro_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,6 @@ func (c *Client) ApplyResource(
// Set namespace to consumer name
work.Namespace = consumerName

c.log.Infof(ctx, "Applying ManifestWork %s/%s", consumerName, work.Name)

// Apply the ManifestWork (create or update with generation comparison)
result, err := c.ApplyManifestWork(ctx, consumerName, work)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions internal/maestro_client/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func (c *Client) CreateManifestWork(
ctx = logger.WithLogField(ctx, "manifestwork", work.Name)
ctx = logger.WithObservedGeneration(ctx, manifest.GetGeneration(work.ObjectMeta))

c.log.WithFields(map[string]interface{}{
"manifests": len(work.Spec.Workload.Manifests),
}).Debug(ctx, "Creating ManifestWork")

// Set namespace to consumer name (required by Maestro)
work.Namespace = consumerName

Expand All @@ -68,7 +64,6 @@ func (c *Client) CreateManifestWork(
consumerName, work.Name, err)
}

c.log.Info(ctx, "Created ManifestWork")
return created, nil
}

Expand All @@ -81,8 +76,6 @@ func (c *Client) GetManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Getting ManifestWork")

work, err := c.workClient.ManifestWorks(consumerName).Get(ctx, workName, metav1.GetOptions{})
if err != nil {
// Return not found error without wrapping for callers to check
Expand All @@ -106,8 +99,6 @@ func (c *Client) PatchManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Patching ManifestWork")

patched, err := c.workClient.ManifestWorks(consumerName).Patch(
ctx,
workName,
Expand All @@ -120,7 +111,6 @@ func (c *Client) PatchManifestWork(
consumerName, workName, err)
}

c.log.Info(ctx, "Patched ManifestWork")
return patched, nil
}

Expand All @@ -133,20 +123,16 @@ func (c *Client) DeleteManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Deleting ManifestWork")

err := c.workClient.ManifestWorks(consumerName).Delete(ctx, workName, metav1.DeleteOptions{})
if err != nil {
// Ignore not found errors (already deleted)
if apierrors.IsNotFound(err) {
c.log.Debug(ctx, "ManifestWork already deleted")
return nil
}
return apperrors.MaestroError("failed to delete ManifestWork %s/%s: %v",
consumerName, workName, err)
}

c.log.Info(ctx, "Deleted ManifestWork")
return nil
}

Expand All @@ -158,10 +144,6 @@ func (c *Client) ListManifestWorks(
) (*workv1.ManifestWorkList, error) {
ctx = logger.WithMaestroConsumer(ctx, consumerName)

c.log.WithFields(map[string]interface{}{
"labelSelector": labelSelector,
}).Debug(ctx, "Listing ManifestWorks")

opts := metav1.ListOptions{}
if labelSelector != "" {
opts.LabelSelector = labelSelector
Expand All @@ -173,9 +155,6 @@ func (c *Client) ListManifestWorks(
consumerName, err)
}

c.log.WithFields(map[string]interface{}{
"count": len(list.Items),
}).Debug(ctx, "Listed ManifestWorks")
return list, nil
}

Expand Down Expand Up @@ -217,8 +196,6 @@ func (c *Client) ApplyManifestWork(
ctx = logger.WithLogField(ctx, "manifestwork", manifestWork.Name)
ctx = logger.WithObservedGeneration(ctx, newGeneration)

c.log.Debug(ctx, "Applying ManifestWork")

// Check if ManifestWork exists
existing, err := c.GetManifestWork(ctx, consumerName, manifestWork.Name)
exists := err == nil
Expand Down Expand Up @@ -307,8 +284,6 @@ func (c *Client) DiscoverManifest(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Discovering manifests in ManifestWork")

// Get the ManifestWork
work, err := c.GetManifestWork(ctx, consumerName, workName)
if err != nil {
Expand All @@ -329,10 +304,6 @@ func (c *Client) DiscoverManifest(
consumerName, workName, err)
}

c.log.WithFields(map[string]interface{}{
"found": len(list.Items),
}).Debug(ctx, "Discovered manifests in ManifestWork")

return list, nil
}

Expand Down