Skip to content

Commit

Permalink
Recreate job always if generation has changed
Browse files Browse the repository at this point in the history
Co-authored-by: Corentin Néau <tan.neau@suse.com>
Co-authored-by: Xavi Garcia <xavi.garcia@suse.com>
  • Loading branch information
3 people committed Jan 16, 2024
1 parent bd27fe7 commit fda1f18
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 52 deletions.
51 changes: 49 additions & 2 deletions integrationtests/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,56 @@ var _ = Describe("GitJob controller", func() {
gitJobName = "force-deletion"
})

It("Verifies that the Job is deleted", func() {
It("Verifies that the Job is recreated", func() {
Eventually(func() bool {
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitJobNamespace}, &job))
newJob := &batchv1.Job{}
_ = k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitJobNamespace}, newJob)

return string(job.UID) != string(newJob.UID)
}).Should(BeTrue())
})
})

// Scenario: the GitJob spec is edited after its Job already exists. The
// container args are changed, which (per the original test author's note)
// changes the GitJob's Generation field; the spec verifies that a new Job
// with the same name but a different UID shows up afterwards.
When("User performs an update in a Job argument", func() {
	var (
		gitJob     v1.GitJob
		gitJobName string
		job        batchv1.Job
		jobName    string
	)

	BeforeEach(func() {
		gitJobName = "simulate-arg-update"
	})

	JustBeforeEach(func() {
		gitJob = createGitJob(gitJobName)
		Expect(k8sClient.Create(ctx, &gitJob)).ToNot(HaveOccurred())
		Expect(simulateGitPollerUpdatingCommitInStatus(gitJob, commit)).ToNot(HaveOccurred())

		// The Job name is derived only from the GitJob name, repo and commit,
		// so it is computed once instead of on every poll.
		jobName = name.SafeConcatName(gitJobName, name.Hex(repo+commit, 5))

		// Wait for the initial Job and keep it in `job`; its UID is the
		// baseline the spec compares against.
		Eventually(func() error {
			return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitJobNamespace}, &job)
		}).ShouldNot(HaveOccurred())

		// Change the args parameter, this will change the Generation field.
		// This simulates changing fleet apply parameters.
		// NOTE(review): assumes createGitJob always defines at least one
		// container; Containers[0] panics otherwise.
		Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
			var gitJobFromCluster v1.GitJob
			err := k8sClient.Get(ctx, types.NamespacedName{Name: gitJob.Name, Namespace: gitJob.Namespace}, &gitJobFromCluster)
			if err != nil {
				return err
			}
			gitJobFromCluster.Spec.JobSpec.Template.Spec.Containers[0].Args = []string{"-v"}

			return k8sClient.Update(ctx, &gitJobFromCluster)
		})).ToNot(HaveOccurred())
	})

	It("Verifies that the Job is recreated", func() {
		Eventually(func() bool {
			newJob := &batchv1.Job{}
			// A failed lookup (e.g. the old Job was deleted and the new one
			// does not exist yet) must not count as "recreated": without this
			// check the empty UID of newJob would differ from job.UID and the
			// assertion would pass even if the Job never came back.
			if err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitJobNamespace}, newJob); err != nil {
				return false
			}

			return string(job.UID) != string(newJob.UID)
		}).Should(BeTrue())
	})
})
Expand Down
87 changes: 37 additions & 50 deletions pkg/controller/gitjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if err := r.createJob(ctx, &gitJob); err != nil {
return ctrl.Result{}, fmt.Errorf("error creating job: %v", err)
}
} else {
if err = r.updateStatus(ctx, &gitJob, &job); err != nil {
return ctrl.Result{}, fmt.Errorf("error updating gitjob status: %v", err)
}
} else if gitJob.Status.Commit != "" {
if err = r.deleteJobIfNeeded(ctx, &gitJob, &job); err != nil {
return ctrl.Result{}, fmt.Errorf("error deleting job: %v", err)
}
}

if err = r.updateStatus(ctx, &gitJob, &job); err != nil {
return ctrl.Result{}, fmt.Errorf("error updating gitjob status: %v", err)
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -162,56 +163,47 @@ func (r *GitJobReconciler) updateStatus(ctx context.Context, gitJob *v1.GitJob,
return err
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var gitJobFomCluster v1.GitJob
gitJob.Status.JobStatus = result.Status.String()
for _, con := range result.Conditions {
condition.Cond(con.Type.String()).SetStatus(gitJob, string(con.Status))
condition.Cond(con.Type.String()).SetMessageIfBlank(gitJob, con.Message)
condition.Cond(con.Type.String()).Reason(gitJob, con.Reason)
}

err := r.Get(ctx, types.NamespacedName{Name: gitJob.Name, Namespace: gitJob.Namespace}, &gitJobFomCluster)
if result.Status == status.FailedStatus {
selector := labels.SelectorFromSet(labels.Set{
"job-name": job.Name,
})
var podList corev1.PodList
err := r.Client.List(ctx, &podList, &client.ListOptions{LabelSelector: selector})
if err != nil {
return err
}

gitJobFomCluster.Status.JobStatus = result.Status.String()
for _, con := range result.Conditions {
condition.Cond(con.Type.String()).SetStatus(&gitJobFomCluster, string(con.Status))
condition.Cond(con.Type.String()).SetMessageIfBlank(&gitJobFomCluster, con.Message)
condition.Cond(con.Type.String()).Reason(&gitJobFomCluster, con.Reason)
}

if result.Status == status.FailedStatus {
selector := labels.SelectorFromSet(labels.Set{
"job-name": job.Name,
})
var podList corev1.PodList
err := r.Client.List(ctx, &podList, &client.ListOptions{LabelSelector: selector})
if err != nil {
return err
}
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
})
terminationMessage := result.Message
if len(podList.Items) > 0 {
for _, podStatus := range podList.Items[len(podList.Items)-1].Status.ContainerStatuses {
if podStatus.Name != "step-git-source" && podStatus.State.Terminated != nil {
terminationMessage += podStatus.State.Terminated.Message
}
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
})
terminationMessage := result.Message
if len(podList.Items) > 0 {
for _, podStatus := range podList.Items[len(podList.Items)-1].Status.ContainerStatuses {
if podStatus.Name != "step-git-source" && podStatus.State.Terminated != nil {
terminationMessage += podStatus.State.Terminated.Message
}
}
kstatus.SetError(&gitJobFomCluster, terminationMessage)
}
kstatus.SetError(gitJob, terminationMessage)
}

if result.Status == status.CurrentStatus {
if strings.Contains(result.Message, "Job Completed") {
gitJobFomCluster.Status.LastExecutedCommit = job.Annotations["commit"]
}
kstatus.SetActive(&gitJobFomCluster)
if result.Status == status.CurrentStatus {
if strings.Contains(result.Message, "Job Completed") {
gitJob.Status.LastExecutedCommit = job.Annotations["commit"]
}
kstatus.SetActive(gitJob)
}

gitJobFomCluster.Status.ObservedGeneration = gitJobFomCluster.Generation
gitJobFomCluster.Status.LastSyncedTime = metav1.Now()
gitJob.Status.ObservedGeneration = gitJob.Generation
gitJob.Status.LastSyncedTime = metav1.Now()

return r.Status().Update(ctx, &gitJobFomCluster)
})
return r.Status().Update(ctx, gitJob)
}

func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitJob *v1.GitJob, job *batchv1.Job) error {
Expand All @@ -224,8 +216,8 @@ func (r *GitJobReconciler) deleteJobIfNeeded(ctx context.Context, gitJob *v1.Git
}
}

// if the job failed, e.g. because a helm registry was unreachable, delete the old job
if isJobError(gitJob) && gitJob.Generation != gitJob.Status.ObservedGeneration {
// k8s Jobs are immutable. Recreate the job if the GitJob Spec has changed.
if gitJob.Generation != gitJob.Status.ObservedGeneration {
r.Log.Info("job deletion triggered because of generation has changed, and it was in an error state")
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) {
return err
Expand Down Expand Up @@ -400,11 +392,6 @@ func (r *GitJobReconciler) generateInitContainer(ctx context.Context, obj *v1.Gi
}, nil
}

// isJobError reports whether obj is in the error state that the job
// controller records through kstatus.SetError: the Reconciling condition is
// false, the Stalled condition is true, and the recorded job status equals
// the string form of status.FailedStatus. All three must hold.
func isJobError(obj *v1.GitJob) bool {
	if !kstatus.Reconciling.IsFalse(obj) {
		return false
	}
	if !kstatus.Stalled.IsTrue(obj) {
		return false
	}

	failed := status.FailedStatus.String()
	return obj.Status.JobStatus == failed
}

func proxyEnvVars() []corev1.EnvVar {
var envVars []corev1.EnvVar
for _, envVar := range []string{"HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY"} {
Expand Down

0 comments on commit fda1f18

Please sign in to comment.