Skip to content

Commit

Permalink
Merge branch 'master' into preemption_reason_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesMurkin authored Nov 14, 2024
2 parents 3c760f6 + 00c123a commit c5ec732
Show file tree
Hide file tree
Showing 19 changed files with 232 additions and 160 deletions.
2 changes: 1 addition & 1 deletion client/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "armada_client"
version = "0.3.5"
version = "0.4.6"
description = "Armada gRPC API python client"
readme = "README.md"
requires-python = ">=3.7"
Expand Down
10 changes: 5 additions & 5 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions internal/scheduler/database/query/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, vali
SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2;

-- name: UpdateJobPriorityByJobSet :exec
UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3;
UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false;

-- name: MarkJobsCancelRequestedBySetAndQueuedState :exec
UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]);
UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]) and cancelled = false and succeeded = false and failed = false;

-- name: MarkJobsSucceededById :exec
UPDATE jobs SET succeeded = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]);

-- name: MarkJobsCancelRequestedById :exec
UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]);
UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false;

-- name: MarkJobsCancelledById :exec
UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]);
Expand All @@ -29,7 +29,7 @@ UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]);
UPDATE jobs SET failed = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]);

-- name: UpdateJobPriorityById :exec
UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]);
UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false;

-- name: SelectInitialRuns :many
SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial LIMIT $2;
Expand All @@ -44,7 +44,7 @@ SELECT run_id FROM runs;
SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial;

-- name: MarkJobRunsPreemptRequestedByJobId :exec
UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]);
UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false;

-- name: MarkJobRunsSucceededById :exec
UPDATE runs SET succeeded = true WHERE run_id = ANY(sqlc.arg(run_ids)::text[]);
Expand Down
92 changes: 52 additions & 40 deletions internal/scheduler/floatingresources/floating_resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,55 @@ import (
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type FloatingResourceTypes struct {
zeroFloatingResources schedulerobjects.ResourceList
pools map[string]*floatingResourcePool
rlFactory *internaltypes.ResourceListFactory
floatingResourceLimitsByPool map[string]internaltypes.ResourceList
}

type floatingResourcePool struct {
totalResources schedulerobjects.ResourceList
func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
err := validate(config)
if err != nil {
return nil, err
}

floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{}
for _, fr := range config {
for _, poolConfig := range fr.Pools {
floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add(
rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}),
)
}
}

return &FloatingResourceTypes{
floatingResourceLimitsByPool: floatingResourceLimitsByPool,
}, nil
}

func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
func validate(config []configuration.FloatingResourceConfig) error {
floatingResourceNamesSeen := map[string]bool{}
for _, c := range config {
if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
return nil, fmt.Errorf("duplicate floating resource %s", c.Name)
if _, exists := floatingResourceNamesSeen[c.Name]; exists {
return fmt.Errorf("duplicate floating resource %s", c.Name)
}
zeroFloatingResources.Resources[c.Name] = resource.Quantity{}
floatingResourceNamesSeen[c.Name] = true
}

pools := map[string]*floatingResourcePool{}
for _, fr := range config {
poolNamesSeen := map[string]bool{}
for _, poolConfig := range fr.Pools {
pool, exists := pools[poolConfig.Name]
if !exists {
pool = &floatingResourcePool{
totalResources: zeroFloatingResources.DeepCopy(),
}
pools[poolConfig.Name] = pool
}
existing := pool.totalResources.Resources[fr.Name]
if existing.Cmp(resource.Quantity{}) != 0 {
return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name)
if _, exists := poolNamesSeen[poolConfig.Name]; exists {
return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name)
}
pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy()
poolNamesSeen[poolConfig.Name] = true
}
}

return &FloatingResourceTypes{
zeroFloatingResources: zeroFloatingResources,
pools: pools,
rlFactory: rlFactory,
}, nil
return nil
}

func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
available := frt.GetTotalAvailableForPool(poolName)
if available.AllZero() {
return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
}
Expand All @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern
}

func (frt *FloatingResourceTypes) AllPools() []string {
result := maps.Keys(frt.pools)
result := maps.Keys(frt.floatingResourceLimitsByPool)
slices.Sort(result)
return result
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList {
pool, exists := frt.pools[poolName]
if !exists {
return frt.zeroFloatingResources.DeepCopy()
func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList {
limits, ok := frt.floatingResourceLimitsByPool[poolName]
if !ok {
return internaltypes.ResourceList{}
}
return pool.totalResources.DeepCopy()
return limits
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity {
limits := frt.GetTotalAvailableForPool(poolName)
result := map[string]resource.Quantity{}
for _, res := range limits.GetResources() {
if res.Type != internaltypes.Floating {
continue
}
result[res.Name] = res.Value
}
return result
}

func (frt *FloatingResourceTypes) SummaryString() string {
if len(frt.zeroFloatingResources.Resources) == 0 {
if len(frt.floatingResourceLimitsByPool) == 0 {
return "none"
}
return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
poolSummaries := []string{}
for _, poolName := range frt.AllPools() {
poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName]))
}
return strings.Join(poolSummaries, " ")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) {
assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())
zero := resource.Quantity{}
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "gpu",
Quantity: resource.MustParse("300"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
}, {
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
cpuPool := sut.GetTotalAvailableForPool("cpu")
assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
gpuPool := sut.GetTotalAvailableForPool("gpu")
assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
notFound := sut.GetTotalAvailableForPool("some-invalid-value")
assert.True(t, notFound.IsEmpty())
}

func TestGetTotalAvailableForPoolAsMap(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI),
}, cpuPool)

gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI),
}, gpuPool)

notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value")
assert.Equal(t, map[string]resource.Quantity{}, notFound)
}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions internal/scheduler/internaltypes/node_factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package internaltypes

import (
"fmt"

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
Expand Down Expand Up @@ -51,3 +53,22 @@ func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*No
f.resourceListFactory,
)
}

func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobjects.Executor, errorLogger func(string)) []*Node {
result := []*Node{}
for _, executor := range executors {
for _, node := range executor.GetNodes() {
if executor.Id != node.Executor {
errorLogger(fmt.Sprintf("Executor name mismatch: %q != %q", node.Executor, executor.Id))
continue
}
itNode, err := f.FromSchedulerObjectsNode(node)
if err != nil {
errorLogger(fmt.Sprintf("Invalid node %s: %v", node.Name, err))
continue
}
result = append(result, itNode)
}
}
return result
}
20 changes: 11 additions & 9 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type ResourceList struct {
}

type Resource struct {
Name string
Value int64
Scale k8sResource.Scale
Type ResourceType
Name string
RawValue int64
Value k8sResource.Quantity
Scale k8sResource.Scale
Type ResourceType
}

func (rl ResourceList) Equal(other ResourceList) bool {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q
return k8sResource.Quantity{}
}

return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index])
return *rl.asQuantity(index)
}

func (rl ResourceList) GetResources() []Resource {
Expand All @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource {
result := make([]Resource, len(rl.resources))
for i, q := range rl.resources {
result[i] = Resource{
Name: rl.factory.indexToName[i],
Value: q,
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
Name: rl.factory.indexToName[i],
RawValue: q,
Value: *rl.asQuantity(i),
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
}
}
return result
Expand Down
Loading

0 comments on commit c5ec732

Please sign in to comment.