diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go
index f7b9fc4226f..f9f295e8d05 100644
--- a/internal/scheduler/jobdb/job.go
+++ b/internal/scheduler/jobdb/job.go
@@ -503,7 +503,7 @@ func (job *Job) Tolerations() []v1.Toleration {
 
 // ResourceRequirements returns the resource requirements of the Job
 // KubernetesResourceRequirements below is preferred
-func (job *Job) ResourceRequirements() v1.ResourceRequirements {
+func (job *Job) resourceRequirements() v1.ResourceRequirements {
 	if req := job.PodRequirements(); req != nil {
 		return req.ResourceRequirements
 	}
@@ -831,7 +831,7 @@ func SchedulingKeyFromJob(skg *schedulerobjects.SchedulingKeyGenerator, job *Job
 		job.NodeSelector(),
 		job.Affinity(),
 		job.Tolerations(),
-		job.ResourceRequirements().Requests,
+		job.resourceRequirements().Requests,
 		job.PriorityClassName(),
 	)
 }
diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go
index 3e25deb892b..faa10d50901 100644
--- a/internal/scheduler/metrics/state_metrics.go
+++ b/internal/scheduler/metrics/state_metrics.go
@@ -209,7 +209,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio
 	}
 
 	queue := job.Queue()
-	requests := job.ResourceRequirements().Requests
+	requests := job.AllResourceRequirements()
 	latestRun := job.LatestRun()
 	pool := ""
 	node := ""
@@ -236,7 +236,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio
 
 	// Resource Seconds
 	for _, res := range m.trackedResourceNames {
-		resQty := requests[res]
+		resQty := requests.GetResourceByNameZeroIfMissing(string(res))
 		resSeconds := duration * float64(resQty.MilliValue()) / 1000
 		m.jobStateResourceSecondsByQueue.
 			WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds)
diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go
index 1f23597541b..3ace44245c4 100644
--- a/internal/scheduler/nodedb/nodedb_test.go
+++ b/internal/scheduler/nodedb/nodedb_test.go
@@ -12,7 +12,6 @@ import (
 
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
 	"github.com/armadaproject/armada/internal/common/util"
-	"github.com/armadaproject/armada/internal/scheduler/adapters"
 	schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
 	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/internal/scheduler/jobdb"
@@ -132,8 +131,6 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
 	jobFilter := func(job *jobdb.Job) bool { return true }
 	job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0)
 	request := job.KubernetesResourceRequirements()
-	requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests))
-	assert.Nil(t, err)
 
 	jobId := job.Id()
 
@@ -177,14 +174,14 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
 	assert.True(
 		t,
 		armadamaps.DeepEqual(
-			map[string]internaltypes.ResourceList{jobId: requestInternalRl},
+			map[string]internaltypes.ResourceList{jobId: request},
 			boundNode.AllocatedByJobId,
 		),
 	)
 	assert.True(
 		t,
 		armadamaps.DeepEqual(
-			map[string]internaltypes.ResourceList{jobId: requestInternalRl},
+			map[string]internaltypes.ResourceList{jobId: request},
 			evictedNode.AllocatedByJobId,
 		),
 	)
diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go
index 2b96b335f1b..e92167c3275 100644
--- a/internal/scheduler/scheduling/context/job.go
+++ b/internal/scheduler/scheduling/context/job.go
@@ -218,10 +218,10 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche
 	)
 	resourcesByQueue := armadamaps.MapValues(
 		jobsByQueue,
-		func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
-			rv := schedulerobjects.NewResourceListWithDefaultSize()
+		func(jobs []*jobdb.Job) internaltypes.ResourceList {
+			rv := internaltypes.ResourceList{}
 			for _, job := range jobs {
-				rv.AddV1ResourceList(job.ResourceRequirements().Requests)
+				rv = rv.Add(job.AllResourceRequirements())
 			}
 			return rv
 		},
@@ -247,8 +247,8 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche
 		maps.Keys(jobsByQueue),
 		armadamaps.MapValues(
 			resourcesByQueue,
-			func(rl schedulerobjects.ResourceList) string {
-				return rl.CompactString()
+			func(rl internaltypes.ResourceList) string {
+				return rl.String()
 			},
 		),
 		jobCountPerQueue,
diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
index 7102c63a8ef..f8fdbac4ce8 100644
--- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
+++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
@@ -1920,7 +1920,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 			// Accounting across scheduling rounds.
 			roundByJobId := make(map[string]int)
 			indexByJobId := make(map[string]int)
-			allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
+			allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList)
 			nodeIdByJobId := make(map[string]string)
 			var jobIdsByGangId map[string]map[string]bool
 			var gangIdByJobId map[string]string
@@ -1941,7 +1941,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				)
 			}
 
-			demandByQueue := map[string]schedulerobjects.ResourceList{}
+			demandByQueue := map[string]internaltypes.ResourceList{}
 
 			// Run the scheduler.
 			cordonedNodes := map[int]bool{}
@@ -1978,12 +1978,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 					queuedJobs = append(queuedJobs, job.WithQueued(true))
 					roundByJobId[job.Id()] = i
 					indexByJobId[job.Id()] = j
-					r, ok := demandByQueue[job.Queue()]
-					if !ok {
-						r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-						demandByQueue[job.Queue()] = r
-					}
-					r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+					demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements())
 				}
 			}
 			err = jobDbTxn.Upsert(queuedJobs)
@@ -2005,12 +2000,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 						delete(gangIdByJobId, job.Id())
 						delete(jobIdsByGangId[gangId], job.Id())
 					}
-					r, ok := demandByQueue[job.Queue()]
-					if !ok {
-						r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-						demandByQueue[job.Queue()] = r
-					}
-					r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+					demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements())
 				}
 			}
 		}
@@ -2049,11 +2039,11 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 
 			for queue, priorityFactor := range tc.PriorityFactorByQueue {
 				weight := 1 / priorityFactor
-				queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources)
+				queueDemand := demandByQueue[queue]
 				err := sctx.AddQueueSchedulingContext(
 					queue,
 					weight,
-					internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory),
+					allocatedByQueueAndPriorityClass[queue],
 					queueDemand,
 					queueDemand,
 					limiterByQueue[queue],
@@ -2092,28 +2082,22 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 				job := jctx.Job
 				m := allocatedByQueueAndPriorityClass[job.Queue()]
 				if m == nil {
-					m = make(schedulerobjects.QuantityByTAndResourceType[string])
+					m = make(map[string]internaltypes.ResourceList)
 					allocatedByQueueAndPriorityClass[job.Queue()] = m
 				}
-				m.SubV1ResourceList(
-					job.PriorityClassName(),
-					job.ResourceRequirements().Requests,
-				)
+				m[job.PriorityClassName()] = m[job.PriorityClassName()].Subtract(job.AllResourceRequirements())
 			}
 			for _, jctx := range result.ScheduledJobs {
 				job := jctx.Job
 				m := allocatedByQueueAndPriorityClass[job.Queue()]
 				if m == nil {
-					m = make(schedulerobjects.QuantityByTAndResourceType[string])
+					m = make(map[string]internaltypes.ResourceList)
 					allocatedByQueueAndPriorityClass[job.Queue()] = m
 				}
-				m.AddV1ResourceList(
-					job.PriorityClassName(),
-					job.ResourceRequirements().Requests,
-				)
+				m[job.PriorityClassName()] = m[job.PriorityClassName()].Add(job.AllResourceRequirements())
 			}
 			for queue, qctx := range sctx.QueueSchedulingContexts {
-				m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory)
+				m := allocatedByQueueAndPriorityClass[queue]
 				assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass))
 			}
 
diff --git a/internal/scheduler/simulator/sink/job_writer.go b/internal/scheduler/simulator/sink/job_writer.go
index ccac68dcace..7f4d029d365 100644
--- a/internal/scheduler/simulator/sink/job_writer.go
+++ b/internal/scheduler/simulator/sink/job_writer.go
@@ -4,7 +4,6 @@ import (
 	"os"
 
 	parquetWriter "github.com/xitongsys/parquet-go/writer"
-	v1 "k8s.io/api/core/v1"
 
 	"github.com/armadaproject/armada/internal/common/armadacontext"
 	protoutil "github.com/armadaproject/armada/internal/common/proto"
@@ -77,10 +76,10 @@ func (j *JobWriter) createJobRunRow(st *model.StateTransition) ([]*JobRunRow, er
 		associatedJob := jobsList[i]
 		if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil {
 			// Resource requirements
-			cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU]
-			memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory]
-			ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage]
-			gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"]
+			cpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu")
+			memoryLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("memory")
+			ephemeralStorageLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("ephemeral-storage")
+			gpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("nvidia.com/gpu")
			eventTime := protoutil.ToStdTime(event.Created)
 
 			rows = append(rows, &JobRunRow{
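
The hunks above converge every call site on the same pattern: job.AllResourceRequirements() yields an internaltypes.ResourceList, and GetResourceByNameZeroIfMissing replaces raw v1.ResourceList map indexing. A minimal sketch of that pattern, assuming the Armada module is on the import path; the helper name resourceSeconds is illustrative and not part of this diff:

package example

import (
	"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// resourceSeconds mirrors the state_metrics.go hunk: it converts a job's
// request for one named resource into resource-seconds over a duration.
// GetResourceByNameZeroIfMissing returns a zero-valued quantity when the
// resource is absent, so no map-presence check is needed, unlike the old
// map-based ResourceRequirements().Requests access it replaces.
func resourceSeconds(job *jobdb.Job, resource string, durationSeconds float64) float64 {
	qty := job.AllResourceRequirements().GetResourceByNameZeroIfMissing(resource)
	return durationSeconds * float64(qty.MilliValue()) / 1000
}

The same shape covers the simulator's job_writer.go change, e.g. resourceSeconds(job, "cpu", 60) or a plain GetResourceByNameZeroIfMissing("nvidia.com/gpu") lookup where only the quantity is needed.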