From 1793c945bdf864e598ee006caf57bba27495a415 Mon Sep 17 00:00:00 2001
From: Robert Smith
Date: Thu, 14 Nov 2024 12:28:08 +0000
Subject: [PATCH] Scheduler: refactor floating resources to use internaltypes

Signed-off-by: Robert Smith
---
 .../floating_resource_types.go               | 92 +++++++++++--------
 .../floating_resource_types_test.go          | 82 ++++++++++++++---
 .../scheduler/internaltypes/resource_list.go | 20 ++--
 .../internaltypes/resource_list_test.go      | 17 ++--
 internal/scheduler/metrics.go                |  2 +-
 internal/scheduler/metrics/cycle_metrics.go  |  2 +-
 .../preempting_queue_scheduler_test.go       |  4 +-
 .../scheduler/scheduling/scheduling_algo.go  |  6 +-
 8 files changed, 151 insertions(+), 74 deletions(-)

diff --git a/internal/scheduler/floatingresources/floating_resource_types.go b/internal/scheduler/floatingresources/floating_resource_types.go
index 3b023e0c891..e56dea92553 100644
--- a/internal/scheduler/floatingresources/floating_resource_types.go
+++ b/internal/scheduler/floatingresources/floating_resource_types.go
@@ -10,55 +10,55 @@ import (
     "github.com/armadaproject/armada/internal/common/maps"
     "github.com/armadaproject/armada/internal/scheduler/configuration"
     "github.com/armadaproject/armada/internal/scheduler/internaltypes"
-    "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 )

 type FloatingResourceTypes struct {
-    zeroFloatingResources schedulerobjects.ResourceList
-    pools                 map[string]*floatingResourcePool
-    rlFactory             *internaltypes.ResourceListFactory
+    floatingResourceLimitsByPool map[string]internaltypes.ResourceList
 }

-type floatingResourcePool struct {
-    totalResources schedulerobjects.ResourceList
+func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
+    err := validate(config)
+    if err != nil {
+        return nil, err
+    }
+
+    floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{}
+    for _, fr := range config {
+        for _, poolConfig := range fr.Pools {
+            floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add(
+                rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}),
+            )
+        }
+    }
+
+    return &FloatingResourceTypes{
+        floatingResourceLimitsByPool: floatingResourceLimitsByPool,
+    }, nil
 }

-func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
-    zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
+func validate(config []configuration.FloatingResourceConfig) error {
+    floatingResourceNamesSeen := map[string]bool{}
     for _, c := range config {
-        if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
-            return nil, fmt.Errorf("duplicate floating resource %s", c.Name)
+        if _, exists := floatingResourceNamesSeen[c.Name]; exists {
+            return fmt.Errorf("duplicate floating resource %s", c.Name)
         }
-        zeroFloatingResources.Resources[c.Name] = resource.Quantity{}
+        floatingResourceNamesSeen[c.Name] = true
     }

-    pools := map[string]*floatingResourcePool{}
     for _, fr := range config {
+        poolNamesSeen := map[string]bool{}
         for _, poolConfig := range fr.Pools {
-            pool, exists := pools[poolConfig.Name]
-            if !exists {
-                pool = &floatingResourcePool{
-                    totalResources: zeroFloatingResources.DeepCopy(),
-                }
-                pools[poolConfig.Name] = pool
-            }
-            existing := pool.totalResources.Resources[fr.Name]
-            if existing.Cmp(resource.Quantity{}) != 0 {
-                return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name)
+            if _, exists := poolNamesSeen[poolConfig.Name]; exists {
+                return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name)
             }
-            pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy()
+            poolNamesSeen[poolConfig.Name] = true
         }
     }
-
-    return &FloatingResourceTypes{
-        zeroFloatingResources: zeroFloatingResources,
-        pools:                 pools,
-        rlFactory:             rlFactory,
-    }, nil
+    return nil
 }

 func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
-    available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
+    available := frt.GetTotalAvailableForPool(poolName)
     if available.AllZero() {
         return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
     }
@@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern
 }

 func (frt *FloatingResourceTypes) AllPools() []string {
-    result := maps.Keys(frt.pools)
+    result := maps.Keys(frt.floatingResourceLimitsByPool)
     slices.Sort(result)
     return result
 }

-func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList {
-    pool, exists := frt.pools[poolName]
-    if !exists {
-        return frt.zeroFloatingResources.DeepCopy()
+func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList {
+    limits, ok := frt.floatingResourceLimitsByPool[poolName]
+    if !ok {
+        return internaltypes.ResourceList{}
     }
-    return pool.totalResources.DeepCopy()
+    return limits
 }

-func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
-    return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
+func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity {
+    limits := frt.GetTotalAvailableForPool(poolName)
+    result := map[string]resource.Quantity{}
+    for _, res := range limits.GetResources() {
+        if res.Type != internaltypes.Floating {
+            continue
+        }
+        result[res.Name] = res.Value
+    }
+    return result
 }

 func (frt *FloatingResourceTypes) SummaryString() string {
-    if len(frt.zeroFloatingResources.Resources) == 0 {
+    if len(frt.floatingResourceLimitsByPool) == 0 {
         return "none"
     }
-    return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
+    poolSummaries := []string{}
+    for _, poolName := range frt.AllPools() {
+        poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName]))
+    }
+    return strings.Join(poolSummaries, " ")
 }
diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go
index 76ce183b624..4bb5e82fb63 100644
--- a/internal/scheduler/floatingresources/floating_resource_types_test.go
+++ b/internal/scheduler/floatingresources/floating_resource_types_test.go
@@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) {
     assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
 }

-func TestGetTotalAvailableForPool(t *testing.T) {
-    sut := makeSut(t, makeRlFactory())
-    zero := resource.Quantity{}
-    assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
-    assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
-    assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
+func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) {
+    cfg := []configuration.FloatingResourceConfig{
+        {
+            Name: "floating-resource-1",
+            Pools: []configuration.FloatingResourcePoolConfig{
+                {
+                    Name:     "cpu",
+                    Quantity: resource.MustParse("200"),
+                },
+            },
+        },
+        {
+            Name: "floating-resource-1",
+            Pools: []configuration.FloatingResourcePoolConfig{
+                {
+                    Name:     "gpu",
+                    Quantity: resource.MustParse("300"),
+                },
+            },
+        },
+    }
+
+    frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
+    assert.Nil(t, frt)
+    assert.NotNil(t, err)
 }

-func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
+func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) {
+    cfg := []configuration.FloatingResourceConfig{
+        {
+            Name: "floating-resource-1",
+            Pools: []configuration.FloatingResourcePoolConfig{
+                {
+                    Name:     "cpu",
+                    Quantity: resource.MustParse("200"),
+                }, {
+                    Name:     "cpu",
+                    Quantity: resource.MustParse("200"),
+                },
+            },
+        },
+    }
+
+    frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
+    assert.Nil(t, frt)
+    assert.NotNil(t, err)
+}
+
+func TestGetTotalAvailableForPool(t *testing.T) {
     sut := makeSut(t, makeRlFactory())

-    cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
+    cpuPool := sut.GetTotalAvailableForPool("cpu")
     assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
     assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

-    gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
+    gpuPool := sut.GetTotalAvailableForPool("gpu")
     assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
     assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

-    notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
-    assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
-    assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
+    notFound := sut.GetTotalAvailableForPool("some-invalid-value")
+    assert.True(t, notFound.IsEmpty())
+}
+
+func TestGetTotalAvailableForPoolAsMap(t *testing.T) {
+    sut := makeSut(t, makeRlFactory())
+
+    cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu")
+    assert.Equal(t, map[string]resource.Quantity{
+        "floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI),
+        "floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI),
+    }, cpuPool)
+
+    gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu")
+    assert.Equal(t, map[string]resource.Quantity{
+        "floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI),
+        "floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI),
+    }, gpuPool)
+
+    notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value")
+    assert.Equal(t, map[string]resource.Quantity{}, notFound)
 }

 func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go
index dda39551103..63772bbd8ea 100644
--- a/internal/scheduler/internaltypes/resource_list.go
+++ b/internal/scheduler/internaltypes/resource_list.go
@@ -24,10 +24,11 @@ type ResourceList struct {
 }

 type Resource struct {
-    Name  string
-    Value int64
-    Scale k8sResource.Scale
-    Type  ResourceType
+    Name     string
+    RawValue int64
+    Value    k8sResource.Quantity
+    Scale    k8sResource.Scale
+    Type     ResourceType
 }

 func (rl ResourceList) Equal(other ResourceList) bool {
@@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q
         return k8sResource.Quantity{}
     }

-    return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index])
+    return *rl.asQuantity(index)
 }

@@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource {
     result := make([]Resource, len(rl.resources))
     for i, q := range rl.resources {
         result[i] = Resource{
-            Name:  rl.factory.indexToName[i],
-            Value: q,
-            Scale: rl.factory.scales[i],
-            Type:  rl.factory.types[i],
+            Name:     rl.factory.indexToName[i],
+            RawValue: q,
+            Value:    *rl.asQuantity(i),
+            Scale:    rl.factory.scales[i],
+            Type:     rl.factory.types[i],
         }
     }
     return result
diff --git a/internal/scheduler/internaltypes/resource_list_test.go b/internal/scheduler/internaltypes/resource_list_test.go
index 1138b6225b2..39fa9c6c1e2 100644
--- a/internal/scheduler/internaltypes/resource_list_test.go
+++ b/internal/scheduler/internaltypes/resource_list_test.go
@@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) {
     a := testResourceList(factory, "1", "1Gi")

     expected := []Resource{
-        {Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
-        {Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
-        {Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
-        {Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes},
-        {Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating},
-        {Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating},
+        {Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
+        {Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
+        {Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
+        {Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes},
+        {Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating},
+        {Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating},
     }
+
+    for i, r := range expected {
+        expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale)
+    }
+
     assert.Equal(t, expected, a.GetResources())
 }
diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go
index 0c0e7e7752d..af9d932fe64 100644
--- a/internal/scheduler/metrics.go
+++ b/internal/scheduler/metrics.go
@@ -415,7 +415,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
     }

     for _, pool := range c.floatingResourceTypes.AllPools() {
-        totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool)
+        totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)}
         clusterKey := clusterMetricKey{
             cluster: "floating",
             pool:    pool,
diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go
index b6fdfa22a91..c2f32ad60e7 100644
--- a/internal/scheduler/metrics/cycle_metrics.go
+++ b/internal/scheduler/metrics/cycle_metrics.go
@@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
             m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))

             for _, r := range s.EvictedResources.GetResources() {
-                m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value))
+                m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
             }
         }
     }
diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
index eb0a3bb05ad..7102c63a8ef 100644
--- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
+++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go
@@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) {
     for nodeId, node := range result.AffectedNodesById {
         for _, p := range priorities {
             for _, r := range node.AllocatableByPriority[p].GetResources() {
-                assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId)
+                assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId)
             }
         }
     }
@@ -2202,7 +2202,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
             for node := it.NextNode(); node != nil; node = it.NextNode() {
                 for _, p := range priorities {
                     for _, r := range node.AllocatableByPriority[p].GetResources() {
-                        assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId())
+                        assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId())
                     }
                 }
             }
diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go
index 01ad8e37773..77e98caba89 100644
--- a/internal/scheduler/scheduling/scheduling_algo.go
+++ b/internal/scheduler/scheduling/scheduling_algo.go
@@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule(
         ctx.Infof("Scheduling on pool %s with capacity %s %s",
             pool,
             fsctx.nodeDb.TotalKubernetesResources().String(),
-            l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(),
+            l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(),
         )

         start := time.Now()
@@ -277,7 +277,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
         }

         totalResources := nodeDb.TotalKubernetesResources()
-        totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name))
+        totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name))

         schedulingContext, err := l.constructSchedulingContext(
             pool.Name,
@@ -528,7 +528,7 @@ func (l *FairSchedulingAlgo) SchedulePool(
     pool string,
 ) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
     totalResources := fsctx.nodeDb.TotalKubernetesResources()
-    totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool))
+    totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool))
     constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues))
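
The sketch below is not part of the patch; it is a minimal usage example of the refactored floatingresources API as it appears in this diff. It assumes a *internaltypes.ResourceListFactory is already constructed elsewhere (its setup is not shown here), and the helper name checkFloatingResources, the resource names, and the quantities are illustrative only.

package example

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"

    "github.com/armadaproject/armada/internal/scheduler/configuration"
    "github.com/armadaproject/armada/internal/scheduler/floatingresources"
    "github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

// checkFloatingResources is a hypothetical helper sketching the refactored flow:
// config -> FloatingResourceTypes -> per-pool internaltypes.ResourceList -> WithinLimits.
func checkFloatingResources(rlFactory *internaltypes.ResourceListFactory) error {
    // One floating resource limited per pool, mirroring the shape used in the tests above.
    cfg := []configuration.FloatingResourceConfig{
        {
            Name: "external-storage-connections",
            Pools: []configuration.FloatingResourcePoolConfig{
                {Name: "cpu", Quantity: resource.MustParse("100")},
                {Name: "gpu", Quantity: resource.MustParse("50")},
            },
        },
    }

    frt, err := floatingresources.NewFloatingResourceTypes(cfg, rlFactory)
    if err != nil {
        return err // e.g. duplicate resource name or duplicate pool in the config
    }

    // Per-pool limits now come back as an internaltypes.ResourceList rather than
    // a schedulerobjects.ResourceList.
    limits := frt.GetTotalAvailableForPool("cpu")
    fmt.Println("cpu pool floating resource limits:", limits.String())

    // WithinLimits compares an allocation (also an internaltypes.ResourceList)
    // against the configured per-pool limits.
    allocated := rlFactory.FromNodeProto(map[string]resource.Quantity{
        "external-storage-connections": resource.MustParse("10"),
    })
    ok, msg := frt.WithinLimits("cpu", allocated)
    if !ok {
        return fmt.Errorf("allocation rejected: %s", msg)
    }
    return nil
}

The intent of the change is visible here: callers receive internaltypes.ResourceList directly from GetTotalAvailableForPool, while code that still needs schedulerobjects-style quantity maps (such as the metrics collector) goes through GetTotalAvailableForPoolAsMap.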