Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: refactor floating resources to use internaltypes #4047

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 52 additions & 40 deletions internal/scheduler/floatingresources/floating_resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,55 @@ import (
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type FloatingResourceTypes struct {
zeroFloatingResources schedulerobjects.ResourceList
pools map[string]*floatingResourcePool
rlFactory *internaltypes.ResourceListFactory
floatingResourceLimitsByPool map[string]internaltypes.ResourceList
}

type floatingResourcePool struct {
totalResources schedulerobjects.ResourceList
func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
err := validate(config)
if err != nil {
return nil, err
}

floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{}
for _, fr := range config {
for _, poolConfig := range fr.Pools {
floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add(
rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}),
)
}
}

return &FloatingResourceTypes{
floatingResourceLimitsByPool: floatingResourceLimitsByPool,
}, nil
}

func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
func validate(config []configuration.FloatingResourceConfig) error {
floatingResourceNamesSeen := map[string]bool{}
for _, c := range config {
if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
return nil, fmt.Errorf("duplicate floating resource %s", c.Name)
if _, exists := floatingResourceNamesSeen[c.Name]; exists {
return fmt.Errorf("duplicate floating resource %s", c.Name)
}
zeroFloatingResources.Resources[c.Name] = resource.Quantity{}
floatingResourceNamesSeen[c.Name] = true
}

pools := map[string]*floatingResourcePool{}
for _, fr := range config {
poolNamesSeen := map[string]bool{}
for _, poolConfig := range fr.Pools {
pool, exists := pools[poolConfig.Name]
if !exists {
pool = &floatingResourcePool{
totalResources: zeroFloatingResources.DeepCopy(),
}
pools[poolConfig.Name] = pool
}
existing := pool.totalResources.Resources[fr.Name]
if existing.Cmp(resource.Quantity{}) != 0 {
return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name)
if _, exists := poolNamesSeen[poolConfig.Name]; exists {
return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name)
}
pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy()
poolNamesSeen[poolConfig.Name] = true
}
}

return &FloatingResourceTypes{
zeroFloatingResources: zeroFloatingResources,
pools: pools,
rlFactory: rlFactory,
}, nil
return nil
}

func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
available := frt.GetTotalAvailableForPool(poolName)
if available.AllZero() {
return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
}
Expand All @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern
}

func (frt *FloatingResourceTypes) AllPools() []string {
result := maps.Keys(frt.pools)
result := maps.Keys(frt.floatingResourceLimitsByPool)
slices.Sort(result)
return result
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList {
pool, exists := frt.pools[poolName]
if !exists {
return frt.zeroFloatingResources.DeepCopy()
func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList {
limits, ok := frt.floatingResourceLimitsByPool[poolName]
if !ok {
return internaltypes.ResourceList{}
}
return pool.totalResources.DeepCopy()
return limits
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity {
limits := frt.GetTotalAvailableForPool(poolName)
result := map[string]resource.Quantity{}
for _, res := range limits.GetResources() {
if res.Type != internaltypes.Floating {
continue
}
result[res.Name] = res.Value
}
return result
}

func (frt *FloatingResourceTypes) SummaryString() string {
if len(frt.zeroFloatingResources.Resources) == 0 {
if len(frt.floatingResourceLimitsByPool) == 0 {
return "none"
}
return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
poolSummaries := []string{}
for _, poolName := range frt.AllPools() {
poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName]))
}
return strings.Join(poolSummaries, " ")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) {
assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())
zero := resource.Quantity{}
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "gpu",
Quantity: resource.MustParse("300"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
}, {
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
cpuPool := sut.GetTotalAvailableForPool("cpu")
assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
gpuPool := sut.GetTotalAvailableForPool("gpu")
assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
notFound := sut.GetTotalAvailableForPool("some-invalid-value")
assert.True(t, notFound.IsEmpty())
}

func TestGetTotalAvailableForPoolAsMap(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI),
}, cpuPool)

gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI),
}, gpuPool)

notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value")
assert.Equal(t, map[string]resource.Quantity{}, notFound)
}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
Expand Down
20 changes: 11 additions & 9 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type ResourceList struct {
}

type Resource struct {
Name string
Value int64
Scale k8sResource.Scale
Type ResourceType
Name string
RawValue int64
Value k8sResource.Quantity
Scale k8sResource.Scale
Type ResourceType
}

func (rl ResourceList) Equal(other ResourceList) bool {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q
return k8sResource.Quantity{}
}

return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index])
return *rl.asQuantity(index)
}

func (rl ResourceList) GetResources() []Resource {
Expand All @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource {
result := make([]Resource, len(rl.resources))
for i, q := range rl.resources {
result[i] = Resource{
Name: rl.factory.indexToName[i],
Value: q,
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
Name: rl.factory.indexToName[i],
RawValue: q,
Value: *rl.asQuantity(i),
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
}
}
return result
Expand Down
17 changes: 11 additions & 6 deletions internal/scheduler/internaltypes/resource_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) {
a := testResourceList(factory, "1", "1Gi")

expected := []Resource{
{Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating},
{Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating},
{Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating},
{Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating},
}

for i, r := range expected {
expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale)
}

assert.Equal(t, expected, a.GetResources())
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
}

for _, pool := range c.floatingResourceTypes.AllPools() {
totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool)
totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)}
clusterKey := clusterMetricKey{
cluster: "floating",
pool: pool,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))

for _, r := range s.EvictedResources.GetResources() {
m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value))
m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) {
for nodeId, node := range result.AffectedNodesById {
for _, p := range priorities {
for _, r := range node.AllocatableByPriority[p].GetResources() {
assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId)
assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId)
}
}
}
Expand Down Expand Up @@ -2202,7 +2202,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
for node := it.NextNode(); node != nil; node = it.NextNode() {
for _, p := range priorities {
for _, r := range node.AllocatableByPriority[p].GetResources() {
assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId())
assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/scheduling/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule(
ctx.Infof("Scheduling on pool %s with capacity %s %s",
pool,
fsctx.nodeDb.TotalKubernetesResources().String(),
l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(),
l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(),
)

start := time.Now()
Expand Down Expand Up @@ -277,7 +277,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
}

totalResources := nodeDb.TotalKubernetesResources()
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name))
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name))

schedulingContext, err := l.constructSchedulingContext(
pool.Name,
Expand Down Expand Up @@ -528,7 +528,7 @@ func (l *FairSchedulingAlgo) SchedulePool(
pool string,
) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
totalResources := fsctx.nodeDb.TotalKubernetesResources()
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool))
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool))

constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues))

Expand Down