diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index 635a4bea261..5b0952245f3 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "armada_client" -version = "0.3.5" +version = "0.4.6" description = "Armada gRPC API python client" readme = "README.md" requires-python = ">=3.7" diff --git a/internal/scheduler/database/db.go b/internal/scheduler/database/db.go index 5b03f7b80d9..8187a2b0959 100644 --- a/internal/scheduler/database/db.go +++ b/internal/scheduler/database/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.22.0 +// sqlc v1.27.0 package database diff --git a/internal/scheduler/database/executor_repository.go b/internal/scheduler/database/executor_repository.go index 4b02c93a37d..433e21c7389 100644 --- a/internal/scheduler/database/executor_repository.go +++ b/internal/scheduler/database/executor_repository.go @@ -107,7 +107,7 @@ func (r *PostgresExecutorRepository) GetExecutorSettings(ctx *armadacontext.Cont executorSettings := make([]*schedulerobjects.ExecutorSettings, len(results)) for i, result := range results { settings := &schedulerobjects.ExecutorSettings{ - ExecutorId: result.ExecutorId, + ExecutorId: result.ExecutorID, Cordoned: result.Cordoned, CordonReason: result.CordonReason, SetByUser: result.SetByUser, diff --git a/internal/scheduler/database/migrations/017_amend_executor_settings_columns.sql b/internal/scheduler/database/migrations/017_amend_executor_settings_columns.sql new file mode 100644 index 00000000000..3c029d5d55f --- /dev/null +++ b/internal/scheduler/database/migrations/017_amend_executor_settings_columns.sql @@ -0,0 +1,14 @@ +ALTER TABLE executor_settings + ALTER COLUMN cordon_reason TYPE text USING (COALESCE(cordon_reason, '')) + , ALTER COLUMN cordon_reason SET DEFAULT '' + , ALTER COLUMN cordon_reason SET NOT NULL; + +ALTER TABLE executor_settings + ALTER COLUMN set_by_user TYPE text USING (COALESCE(set_by_user, '')) + , ALTER COLUMN set_by_user SET DEFAULT '' + , ALTER COLUMN set_by_user SET NOT NULL; + +ALTER TABLE executor_settings + ALTER COLUMN set_at_time TYPE timestamptz USING (COALESCE(set_at_time, '2024-11-12 09:00:00 UTC'::timestamptz)) + , ALTER COLUMN set_at_time SET DEFAULT NOW() + , ALTER COLUMN set_at_time SET NOT NULL; diff --git a/internal/scheduler/database/models.go b/internal/scheduler/database/models.go index 6e752700812..5b80e9facff 100644 --- a/internal/scheduler/database/models.go +++ b/internal/scheduler/database/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.22.0 +// sqlc v1.27.0 package database @@ -16,6 +16,14 @@ type Executor struct { LastUpdated time.Time `db:"last_updated"` } +type ExecutorSetting struct { + ExecutorID string `db:"executor_id"` + Cordoned bool `db:"cordoned"` + CordonReason string `db:"cordon_reason"` + SetByUser string `db:"set_by_user"` + SetAtTime time.Time `db:"set_at_time"` +} + type Job struct { JobID string `db:"job_id"` JobSet string `db:"job_set"` @@ -80,11 +88,3 @@ type Run struct { Queue string `db:"queue"` Pool string `db:"pool"` } - -type ExecutorSettings struct { - ExecutorId string `db:"executor_id"` - Cordoned bool `db:"cordoned"` - CordonReason string `db:"cordon_reason"` - SetByUser string `db:"set_by_user"` - SetAtTime time.Time `db:"set_at_time"` -} diff --git a/internal/scheduler/database/query.sql.go b/internal/scheduler/database/query.sql.go index 657371fe006..cfb1fb84a40 100644 --- a/internal/scheduler/database/query.sql.go +++ b/internal/scheduler/database/query.sql.go @@ -1,18 +1,15 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.22.0 +// sqlc v1.27.0 // source: query.sql package database import ( "context" - "fmt" "time" "github.com/google/uuid" - - "github.com/armadaproject/armada/pkg/controlplaneevents" ) const countGroup = `-- name: CountGroup :one @@ -26,6 +23,15 @@ func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, err return count, err } +const deleteExecutorSettings = `-- name: DeleteExecutorSettings :exec +DELETE FROM executor_settings WHERE executor_id = $1::text +` + +func (q *Queries) DeleteExecutorSettings(ctx context.Context, executorID string) error { + _, err := q.db.Exec(ctx, deleteExecutorSettings, executorID) + return err +} + const deleteOldMarkers = `-- name: DeleteOldMarkers :exec DELETE FROM markers WHERE created < $1::timestamptz ` @@ -94,7 +100,7 @@ func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []string) er } const markJobRunsPreemptRequestedByJobId = `-- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobRunsPreemptRequestedByJobIdParams struct { @@ -136,7 +142,7 @@ func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []string) } const markJobsCancelRequestedById = `-- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedByIdParams struct { @@ -151,7 +157,7 @@ func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, arg MarkJobsC } const markJobsCancelRequestedBySetAndQueuedState = `-- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedBySetAndQueuedStateParams struct { @@ -201,6 +207,36 @@ func (q *Queries) MarkRunsCancelledByJobId(ctx context.Context, jobIds []string) 
return err } +const selectAllExecutorSettings = `-- name: SelectAllExecutorSettings :many +SELECT executor_id, cordoned, cordon_reason, set_by_user, set_at_time FROM executor_settings +` + +func (q *Queries) SelectAllExecutorSettings(ctx context.Context) ([]ExecutorSetting, error) { + rows, err := q.db.Query(ctx, selectAllExecutorSettings) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ExecutorSetting + for rows.Next() { + var i ExecutorSetting + if err := rows.Scan( + &i.ExecutorID, + &i.Cordoned, + &i.CordonReason, + &i.SetByUser, + &i.SetAtTime, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const selectAllExecutors = `-- name: SelectAllExecutors :many SELECT executor_id, last_request, last_updated FROM executors ` @@ -350,57 +386,6 @@ func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecut return items, nil } -const selectJobsForExecutor = `-- name: SelectJobsForExecutor :many -SELECT jr.run_id, j.queue, j.job_set, j.user_id, j.groups, j.submit_message -FROM runs jr - JOIN jobs j - ON jr.job_id = j.job_id -WHERE jr.executor = $1 - AND jr.run_id NOT IN ($2::text[]) - AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false -` - -type SelectJobsForExecutorParams struct { - Executor string `db:"executor"` - RunIds []string `db:"run_ids"` -} - -type SelectJobsForExecutorRow struct { - RunID string `db:"run_id"` - Queue string `db:"queue"` - JobSet string `db:"job_set"` - UserID string `db:"user_id"` - Groups []byte `db:"groups"` - SubmitMessage []byte `db:"submit_message"` -} - -func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error) { - rows, err := q.db.Query(ctx, selectJobsForExecutor, arg.Executor, arg.RunIds) - if err != nil { - return nil, err - } - defer rows.Close() - var items []SelectJobsForExecutorRow - for rows.Next() { - var i SelectJobsForExecutorRow - if err := rows.Scan( - &i.RunID, - &i.Queue, - &i.JobSet, - &i.UserID, - &i.Groups, - &i.SubmitMessage, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const selectInitialJobs = `-- name: SelectInitialJobs :many SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 AND cancelled = 'false' AND succeeded = 'false' and failed = 'false' ORDER BY serial LIMIT $2 ` @@ -468,57 +453,6 @@ func (q *Queries) SelectInitialJobs(ctx context.Context, arg SelectInitialJobsPa return items, nil } -const selectNewJobs = `-- name: SelectNewJobs :many -SELECT job_id, job_set, queue, user_id, submitted, groups, priority, queued, queued_version, cancel_requested, cancelled, cancel_by_jobset_requested, succeeded, failed, submit_message, scheduling_info, scheduling_info_version, serial, last_modified, validated, pools FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2 -` - -type SelectNewJobsParams struct { - Serial int64 `db:"serial"` - Limit int32 `db:"limit"` -} - -func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error) { - rows, err := q.db.Query(ctx, selectNewJobs, arg.Serial, arg.Limit) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Job - 
for rows.Next() { - var i Job - if err := rows.Scan( - &i.JobID, - &i.JobSet, - &i.Queue, - &i.UserID, - &i.Submitted, - &i.Groups, - &i.Priority, - &i.Queued, - &i.QueuedVersion, - &i.CancelRequested, - &i.Cancelled, - &i.CancelByJobsetRequested, - &i.Succeeded, - &i.Failed, - &i.SubmitMessage, - &i.SchedulingInfo, - &i.SchedulingInfoVersion, - &i.Serial, - &i.LastModified, - &i.Validated, - &i.Pools, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const selectInitialRuns = `-- name: SelectInitialRuns :many SELECT run_id, job_id, created, job_set, executor, node, cancelled, running, succeeded, failed, returned, run_attempted, serial, last_modified, leased_timestamp, pending_timestamp, running_timestamp, terminated_timestamp, scheduled_at_priority, preempted, pending, preempted_timestamp, pod_requirements_overlay, preempt_requested, queue, pool FROM runs WHERE serial > $1 AND job_id = ANY($3::text[]) ORDER BY serial LIMIT $2 ` @@ -576,51 +510,52 @@ func (q *Queries) SelectInitialRuns(ctx context.Context, arg SelectInitialRunsPa return items, nil } -const selectNewRuns = `-- name: SelectNewRuns :many -SELECT run_id, job_id, created, job_set, executor, node, cancelled, running, succeeded, failed, returned, run_attempted, serial, last_modified, leased_timestamp, pending_timestamp, running_timestamp, terminated_timestamp, scheduled_at_priority, preempted, pending, preempted_timestamp, pod_requirements_overlay, preempt_requested, queue, pool FROM runs WHERE serial > $1 ORDER BY serial LIMIT $2 +const selectJobsByExecutorAndQueues = `-- name: SelectJobsByExecutorAndQueues :many +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE jr.executor = $1 + AND j.queue = ANY($2::text[]) + AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false AND jr.preempted = false ` -type SelectNewRunsParams struct { - Serial int64 `db:"serial"` - Limit int32 `db:"limit"` +type SelectJobsByExecutorAndQueuesParams struct { + Executor string `db:"executor"` + Queues []string `db:"queues"` } -func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error) { - rows, err := q.db.Query(ctx, selectNewRuns, arg.Serial, arg.Limit) +func (q *Queries) SelectJobsByExecutorAndQueues(ctx context.Context, arg SelectJobsByExecutorAndQueuesParams) ([]Job, error) { + rows, err := q.db.Query(ctx, selectJobsByExecutorAndQueues, arg.Executor, arg.Queues) if err != nil { return nil, err } defer rows.Close() - var items []Run + var items []Job for rows.Next() { - var i Run + var i Job if err := rows.Scan( - &i.RunID, &i.JobID, - &i.Created, &i.JobSet, - &i.Executor, - &i.Node, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, &i.Cancelled, - &i.Running, + &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.Returned, - &i.RunAttempted, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, &i.Serial, &i.LastModified, - &i.LeasedTimestamp, - &i.PendingTimestamp, - &i.RunningTimestamp, - &i.TerminatedTimestamp, - &i.ScheduledAtPriority, - &i.Preempted, - &i.Pending, - 
&i.PreemptedTimestamp, - &i.PodRequirementsOverlay, - &i.PreemptRequested, - &i.Queue, - &i.Pool, + &i.Validated, + &i.Pools, ); err != nil { return nil, err } @@ -632,51 +567,46 @@ func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([ return items, nil } -const selectNewRunsForJobs = `-- name: SelectNewRunsForJobs :many -SELECT run_id, job_id, created, job_set, executor, node, cancelled, running, succeeded, failed, returned, run_attempted, serial, last_modified, leased_timestamp, pending_timestamp, running_timestamp, terminated_timestamp, scheduled_at_priority, preempted, pending, preempted_timestamp, pod_requirements_overlay, preempt_requested, queue, pool FROM runs WHERE serial > $1 AND job_id = ANY($2::text[]) ORDER BY serial +const selectJobsForExecutor = `-- name: SelectJobsForExecutor :many +SELECT jr.run_id, j.queue, j.job_set, j.user_id, j.groups, j.submit_message +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE jr.executor = $1 + AND jr.run_id NOT IN ($2::text[]) + AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false ` -type SelectNewRunsForJobsParams struct { - Serial int64 `db:"serial"` - JobIds []string `db:"job_ids"` +type SelectJobsForExecutorParams struct { + Executor string `db:"executor"` + RunIds []string `db:"run_ids"` } -func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error) { - rows, err := q.db.Query(ctx, selectNewRunsForJobs, arg.Serial, arg.JobIds) +type SelectJobsForExecutorRow struct { + RunID string `db:"run_id"` + Queue string `db:"queue"` + JobSet string `db:"job_set"` + UserID string `db:"user_id"` + Groups []byte `db:"groups"` + SubmitMessage []byte `db:"submit_message"` +} + +func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error) { + rows, err := q.db.Query(ctx, selectJobsForExecutor, arg.Executor, arg.RunIds) if err != nil { return nil, err } defer rows.Close() - var items []Run + var items []SelectJobsForExecutorRow for rows.Next() { - var i Run + var i SelectJobsForExecutorRow if err := rows.Scan( &i.RunID, - &i.JobID, - &i.Created, - &i.JobSet, - &i.Executor, - &i.Node, - &i.Cancelled, - &i.Running, - &i.Succeeded, - &i.Failed, - &i.Returned, - &i.RunAttempted, - &i.Serial, - &i.LastModified, - &i.LeasedTimestamp, - &i.PendingTimestamp, - &i.RunningTimestamp, - &i.TerminatedTimestamp, - &i.ScheduledAtPriority, - &i.Preempted, - &i.Pending, - &i.PreemptedTimestamp, - &i.PodRequirementsOverlay, - &i.PreemptRequested, &i.Queue, - &i.Pool, + &i.JobSet, + &i.UserID, + &i.Groups, + &i.SubmitMessage, ); err != nil { return nil, err } @@ -688,20 +618,366 @@ func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsFor return items, nil } -const selectRunErrorsById = `-- name: SelectRunErrorsById :many -SELECT run_id, job_id, error FROM job_run_errors WHERE run_id = ANY($1::text[]) +const selectLatestJobRunSerial = `-- name: SelectLatestJobRunSerial :one +SELECT serial FROM runs ORDER BY serial DESC LIMIT 1 ` -// Run errors -func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []string) ([]JobRunError, error) { - rows, err := q.db.Query(ctx, selectRunErrorsById, runIds) - if err != nil { - return nil, err - } - defer rows.Close() - var items []JobRunError - for rows.Next() { - var i JobRunError +func (q *Queries) SelectLatestJobRunSerial(ctx context.Context) (int64, error) { + row := q.db.QueryRow(ctx, selectLatestJobRunSerial) + var serial int64 + 
err := row.Scan(&serial) + return serial, err +} + +const selectLatestJobSerial = `-- name: SelectLatestJobSerial :one +SELECT serial FROM jobs ORDER BY serial DESC LIMIT 1 +` + +func (q *Queries) SelectLatestJobSerial(ctx context.Context) (int64, error) { + row := q.db.QueryRow(ctx, selectLatestJobSerial) + var serial int64 + err := row.Scan(&serial) + return serial, err +} + +const selectLeasedJobsByQueue = `-- name: SelectLeasedJobsByQueue :many +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY($1::text[]) + AND jr.running = false + AND jr.pending = false + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false +` + +func (q *Queries) SelectLeasedJobsByQueue(ctx context.Context, queue []string) ([]Job, error) { + rows, err := q.db.Query(ctx, selectLeasedJobsByQueue, queue) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectNewJobs = `-- name: SelectNewJobs :many +SELECT job_id, job_set, queue, user_id, submitted, groups, priority, queued, queued_version, cancel_requested, cancelled, cancel_by_jobset_requested, succeeded, failed, submit_message, scheduling_info, scheduling_info_version, serial, last_modified, validated, pools FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2 +` + +type SelectNewJobsParams struct { + Serial int64 `db:"serial"` + Limit int32 `db:"limit"` +} + +func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error) { + rows, err := q.db.Query(ctx, selectNewJobs, arg.Serial, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectNewRuns = `-- name: SelectNewRuns :many +SELECT run_id, job_id, created, job_set, executor, node, cancelled, running, succeeded, failed, returned, run_attempted, serial, last_modified, leased_timestamp, pending_timestamp, running_timestamp, terminated_timestamp, scheduled_at_priority, preempted, pending, preempted_timestamp, pod_requirements_overlay, preempt_requested, queue, pool FROM runs WHERE serial > $1 ORDER BY serial LIMIT $2 +` + +type 
SelectNewRunsParams struct { + Serial int64 `db:"serial"` + Limit int32 `db:"limit"` +} + +func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error) { + rows, err := q.db.Query(ctx, selectNewRuns, arg.Serial, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Run + for rows.Next() { + var i Run + if err := rows.Scan( + &i.RunID, + &i.JobID, + &i.Created, + &i.JobSet, + &i.Executor, + &i.Node, + &i.Cancelled, + &i.Running, + &i.Succeeded, + &i.Failed, + &i.Returned, + &i.RunAttempted, + &i.Serial, + &i.LastModified, + &i.LeasedTimestamp, + &i.PendingTimestamp, + &i.RunningTimestamp, + &i.TerminatedTimestamp, + &i.ScheduledAtPriority, + &i.Preempted, + &i.Pending, + &i.PreemptedTimestamp, + &i.PodRequirementsOverlay, + &i.PreemptRequested, + &i.Queue, + &i.Pool, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectNewRunsForJobs = `-- name: SelectNewRunsForJobs :many +SELECT run_id, job_id, created, job_set, executor, node, cancelled, running, succeeded, failed, returned, run_attempted, serial, last_modified, leased_timestamp, pending_timestamp, running_timestamp, terminated_timestamp, scheduled_at_priority, preempted, pending, preempted_timestamp, pod_requirements_overlay, preempt_requested, queue, pool FROM runs WHERE serial > $1 AND job_id = ANY($2::text[]) ORDER BY serial +` + +type SelectNewRunsForJobsParams struct { + Serial int64 `db:"serial"` + JobIds []string `db:"job_ids"` +} + +func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error) { + rows, err := q.db.Query(ctx, selectNewRunsForJobs, arg.Serial, arg.JobIds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Run + for rows.Next() { + var i Run + if err := rows.Scan( + &i.RunID, + &i.JobID, + &i.Created, + &i.JobSet, + &i.Executor, + &i.Node, + &i.Cancelled, + &i.Running, + &i.Succeeded, + &i.Failed, + &i.Returned, + &i.RunAttempted, + &i.Serial, + &i.LastModified, + &i.LeasedTimestamp, + &i.PendingTimestamp, + &i.RunningTimestamp, + &i.TerminatedTimestamp, + &i.ScheduledAtPriority, + &i.Preempted, + &i.Pending, + &i.PreemptedTimestamp, + &i.PodRequirementsOverlay, + &i.PreemptRequested, + &i.Queue, + &i.Pool, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectPendingJobsByQueue = `-- name: SelectPendingJobsByQueue :many +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY($1::text[]) + AND jr.running = false + AND jr.pending = true + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false +` + +func (q *Queries) SelectPendingJobsByQueue(ctx context.Context, queue []string) ([]Job, error) { + rows, err := q.db.Query(ctx, selectPendingJobsByQueue, queue) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + 
&i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectQueuedJobsByQueue = `-- name: SelectQueuedJobsByQueue :many +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools +FROM jobs j +WHERE j.queue = ANY($1::text[]) + AND j.queued = true +` + +func (q *Queries) SelectQueuedJobsByQueue(ctx context.Context, queue []string) ([]Job, error) { + rows, err := q.db.Query(ctx, selectQueuedJobsByQueue, queue) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + &i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectRunErrorsById = `-- name: SelectRunErrorsById :many +SELECT run_id, job_id, error FROM job_run_errors WHERE run_id = ANY($1::text[]) +` + +// Run errors +func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []string) ([]JobRunError, error) { + rows, err := q.db.Query(ctx, selectRunErrorsById, runIds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []JobRunError + for rows.Next() { + var i JobRunError if err := rows.Scan(&i.RunID, &i.JobID, &i.Error); err != nil { return nil, err } @@ -713,6 +989,62 @@ func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []string) ([]J return items, nil } +const selectRunningJobsByQueue = `-- name: SelectRunningJobsByQueue :many +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY($1::text[]) + AND jr.running = true + AND jr.returned = false + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false +` + +func (q *Queries) SelectRunningJobsByQueue(ctx context.Context, queue []string) ([]Job, error) { + rows, err := q.db.Query(ctx, selectRunningJobsByQueue, queue) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Job + for rows.Next() { + var i Job + if err := rows.Scan( + &i.JobID, + &i.JobSet, + &i.Queue, + &i.UserID, + &i.Submitted, + &i.Groups, + &i.Priority, + &i.Queued, + &i.QueuedVersion, + &i.CancelRequested, + &i.Cancelled, + &i.CancelByJobsetRequested, + &i.Succeeded, + &i.Failed, + &i.SubmitMessage, + &i.SchedulingInfo, + &i.SchedulingInfoVersion, + 
&i.Serial, + &i.LastModified, + &i.Validated, + &i.Pools, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const selectUpdatedJobs = `-- name: SelectUpdatedJobs :many SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2 ` @@ -837,7 +1169,7 @@ func (q *Queries) SetTerminatedTime(ctx context.Context, arg SetTerminatedTimePa } const updateJobPriorityById = `-- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) +UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByIdParams struct { @@ -858,7 +1190,7 @@ func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriori } const updateJobPriorityByJobSet = `-- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByJobSetParams struct { @@ -891,8 +1223,14 @@ func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) const upsertExecutorSettings = `-- name: UpsertExecutorSettings :exec INSERT INTO executor_settings (executor_id, cordoned, cordon_reason, set_by_user, set_at_time) -VALUES($1::text, $2::boolean, $3::text, $4::text, $5::timestamptz) -ON CONFLICT (executor_id) DO UPDATE SET (cordoned, cordon_reason, set_by_user, set_at_time) = (excluded.cordoned, excluded.cordon_reason, excluded.set_by_user, excluded.set_at_time)` +VALUES ($1::text, $2::boolean, $3::text, $4::text, $5::timestamptz) +ON CONFLICT (executor_id) DO UPDATE + SET + cordoned = excluded.cordoned, + cordon_reason = excluded.cordon_reason, + set_by_user = excluded.set_by_user, + set_at_time = excluded.set_at_time +` type UpsertExecutorSettingsParams struct { ExecutorID string `db:"executor_id"` @@ -903,235 +1241,12 @@ type UpsertExecutorSettingsParams struct { } func (q *Queries) UpsertExecutorSettings(ctx context.Context, arg UpsertExecutorSettingsParams) error { - _, err := q.db.Exec(ctx, upsertExecutorSettings, arg.ExecutorID, arg.Cordoned, arg.CordonReason, arg.SetByUser, arg.SetAtTime) - return err -} - -const deleteExecutorSettings = `--name DeleteExecutorSettings :exec -DELETE FROM executor_settings WHERE executor_id = $1::text -` - -type DeleteExecutorSettingsParams struct { - ExecutorID string `db:"executor_id"` -} - -func (q *Queries) DeleteExecutorSettings(ctx context.Context, arg DeleteExecutorSettingsParams) error { - _, err := q.db.Exec(ctx, deleteExecutorSettings, arg.ExecutorID) + _, err := q.db.Exec(ctx, upsertExecutorSettings, + arg.ExecutorID, + arg.Cordoned, + arg.CordonReason, + arg.SetByUser, + arg.SetAtTime, + ) return err } - -const selectAllExecutorSettings = `-- name: SelectAllExecutorSettings :many -SELECT executor_id, cordoned, cordon_reason, set_by_user, set_at_time FROM executor_settings -` - -func (q *Queries) SelectAllExecutorSettings(ctx context.Context) ([]ExecutorSettings, error) { - rows, err := q.db.Query(ctx, selectAllExecutorSettings) - if err != nil { - return nil, err - } - defer 
rows.Close() - var items []ExecutorSettings - for rows.Next() { - var i ExecutorSettings - if err := rows.Scan(&i.ExecutorId, &i.Cordoned, &i.CordonReason, &i.SetByUser, &i.SetAtTime); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const selectLatestJobSerial = `-- name: SelectLatestJobSerial :one -SELECT serial FROM jobs ORDER BY serial DESC LIMIT 1 -` - -func (q *Queries) SelectLatestJobSerial(ctx context.Context) (int64, error) { - row := q.db.QueryRow(ctx, selectLatestJobSerial) - var serial int64 - err := row.Scan(&serial) - return serial, err -} - -const selectLatestJobRunSerial = `-- name: SelectLatestJobRunSerial :one -SELECT serial FROM runs ORDER BY serial DESC LIMIT 1 -` - -func (q *Queries) SelectLatestJobRunSerial(ctx context.Context) (int64, error) { - row := q.db.QueryRow(ctx, selectLatestJobRunSerial) - var serial int64 - err := row.Scan(&serial) - return serial, err -} - -const selectJobsByExecutorAndQueues = `-- name: SelectJobsByExecutorAndQueues :many -SELECT j.* -FROM runs jr - JOIN jobs j - ON jr.job_id = j.job_id -WHERE jr.executor = $1 - AND j.queue = ANY($2::text[]) - AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false AND jr.preempted = false -` - -func (q *Queries) SelectAllJobsByExecutorAndQueues(ctx context.Context, executor string, queues []string) ([]Job, error) { - rows, err := q.db.Query(ctx, selectJobsByExecutorAndQueues, executor, queues) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Job - for rows.Next() { - var i Job - if err := rows.Scan( - &i.JobID, - &i.JobSet, - &i.Queue, - &i.UserID, - &i.Submitted, - &i.Groups, - &i.Priority, - &i.Queued, - &i.QueuedVersion, - &i.CancelRequested, - &i.Cancelled, - &i.CancelByJobsetRequested, - &i.Succeeded, - &i.Failed, - &i.SubmitMessage, - &i.SchedulingInfo, - &i.SchedulingInfoVersion, - &i.Serial, - &i.LastModified, - &i.Validated, - &i.Pools, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const selectQueuedJobsByQueue = `-- name: selectQueuedJobsByQueue :many -SELECT j.* -FROM jobs j -WHERE j.queue = ANY($1::text[]) - AND j.queued = true -` - -// a job is in the Leased state if it has a run created for it, but isn't yet assigned by Kubernetes -const selectLeasedJobsByQueue = `-- name: selectLeasedJobsByQueue :many -SELECT j.* -FROM runs jr - JOIN jobs j - ON jr.job_id = j.job_id -WHERE j.queue = ANY($1::text[]) - AND jr.running = false - AND jr.pending = false - AND jr.succeeded = false - AND jr.failed = false - AND jr.cancelled = false - AND jr.preempted = false -` - -const selectPendingJobsByQueue = `-- name: selectPendingJobsByQueue :many -SELECT j.* -FROM runs jr - JOIN jobs j - ON jr.job_id = j.job_id -WHERE j.queue = ANY($1::text[]) - AND jr.running = false - AND jr.pending = true - AND jr.succeeded = false - AND jr.failed = false - AND jr.cancelled = false - AND jr.preempted = false -` - -const selectRunningJobsByQueue = `-- name: selectRunningJobsByQueue :many -SELECT j.* -FROM runs jr - JOIN jobs j - ON jr.job_id = j.job_id -WHERE j.queue = ANY($1::text[]) - AND jr.running = true - AND jr.returned = false - AND jr.succeeded = false - AND jr.failed = false - AND jr.cancelled = false - AND jr.preempted = false -` - -func (q *Queries) SelectAllJobsByQueueAndJobState(ctx context.Context, queue string, jobStates []controlplaneevents.ActiveJobState) 
([]Job, error) { - items := []Job{} - for _, state := range jobStates { - var query string - switch state { - case controlplaneevents.ActiveJobState_QUEUED: - query = selectQueuedJobsByQueue - case controlplaneevents.ActiveJobState_LEASED: - query = selectLeasedJobsByQueue - case controlplaneevents.ActiveJobState_PENDING: - query = selectPendingJobsByQueue - case controlplaneevents.ActiveJobState_RUNNING: - query = selectRunningJobsByQueue - default: - return nil, fmt.Errorf("unknown active job state %+v", state) - } - jobs, err := q.selectAllJobsByQuery(ctx, query, queue) - if err != nil { - return nil, fmt.Errorf("unable to select jobs by queue and job state: %s", err) - } - items = append(items, jobs...) - } - return items, nil -} - -func (q *Queries) selectAllJobsByQuery(ctx context.Context, query string, args ...string) ([]Job, error) { - rows, err := q.db.Query(ctx, query, args) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Job - for rows.Next() { - var i Job - if err := rows.Scan( - &i.JobID, - &i.JobSet, - &i.Queue, - &i.UserID, - &i.Submitted, - &i.Groups, - &i.Priority, - &i.Queued, - &i.QueuedVersion, - &i.CancelRequested, - &i.Cancelled, - &i.CancelByJobsetRequested, - &i.Succeeded, - &i.Failed, - &i.SubmitMessage, - &i.SchedulingInfo, - &i.SchedulingInfoVersion, - &i.Serial, - &i.LastModified, - &i.Validated, - &i.Pools, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - diff --git a/internal/scheduler/database/query/query.sql b/internal/scheduler/database/query/query.sql index eb943fd5e61..ad4da1a55f2 100644 --- a/internal/scheduler/database/query/query.sql +++ b/internal/scheduler/database/query/query.sql @@ -11,16 +11,16 @@ SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, vali SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2; -- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3; +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]); +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsSucceededById :exec UPDATE jobs SET succeeded = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelledById :exec UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); @@ -29,7 +29,7 @@ UPDATE jobs SET cancelled = true WHERE job_id = 
ANY(sqlc.arg(job_ids)::text[]); UPDATE jobs SET failed = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: SelectInitialRuns :many SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial LIMIT $2; @@ -44,7 +44,7 @@ SELECT run_id FROM runs; SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial; -- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobRunsSucceededById :exec UPDATE runs SET succeeded = true WHERE run_id = ANY(sqlc.arg(run_ids)::text[]); @@ -119,3 +119,79 @@ UPDATE runs SET running_timestamp = $1 WHERE run_id = $2; -- name: SetTerminatedTime :exec UPDATE runs SET terminated_timestamp = $1 WHERE run_id = $2; +-- name: UpsertExecutorSettings :exec +INSERT INTO executor_settings (executor_id, cordoned, cordon_reason, set_by_user, set_at_time) +VALUES (@executor_id::text, @cordoned::boolean, @cordon_reason::text, @set_by_user::text, @set_at_time::timestamptz) +ON CONFLICT (executor_id) DO UPDATE + SET + cordoned = excluded.cordoned, + cordon_reason = excluded.cordon_reason, + set_by_user = excluded.set_by_user, + set_at_time = excluded.set_at_time; + +-- name: DeleteExecutorSettings :exec +DELETE FROM executor_settings WHERE executor_id = @executor_id::text; + +-- name: SelectAllExecutorSettings :many +SELECT executor_id, cordoned, cordon_reason, set_by_user, set_at_time FROM executor_settings; + +-- name: SelectLatestJobSerial :one +SELECT serial FROM jobs ORDER BY serial DESC LIMIT 1; + +-- name: SelectLatestJobRunSerial :one +SELECT serial FROM runs ORDER BY serial DESC LIMIT 1; + +-- name: SelectJobsByExecutorAndQueues :many +SELECT j.* +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE jr.executor = @executor + AND j.queue = ANY(@queues::text[]) + AND jr.succeeded = false AND jr.failed = false AND jr.cancelled = false AND jr.preempted = false; + +-- name: SelectQueuedJobsByQueue :many +SELECT j.* +FROM jobs j +WHERE j.queue = ANY(@queue::text[]) + AND j.queued = true; + +-- name: SelectLeasedJobsByQueue :many +SELECT j.* +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY(@queue::text[]) + AND jr.running = false + AND jr.pending = false + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false; + +-- name: SelectPendingJobsByQueue :many +SELECT j.* +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY(@queue::text[]) + AND jr.running = false + AND jr.pending = true + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false; + +-- name: SelectRunningJobsByQueue :many +SELECT j.* +FROM runs jr + JOIN jobs j + ON jr.job_id = j.job_id +WHERE j.queue = ANY(@queue::text[]) + AND jr.running = true + AND 
jr.returned = false + AND jr.succeeded = false + AND jr.failed = false + AND jr.cancelled = false + AND jr.preempted = false; + diff --git a/internal/scheduler/floatingresources/floating_resource_types.go b/internal/scheduler/floatingresources/floating_resource_types.go index 3b023e0c891..e56dea92553 100644 --- a/internal/scheduler/floatingresources/floating_resource_types.go +++ b/internal/scheduler/floatingresources/floating_resource_types.go @@ -10,55 +10,55 @@ import ( "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type FloatingResourceTypes struct { - zeroFloatingResources schedulerobjects.ResourceList - pools map[string]*floatingResourcePool - rlFactory *internaltypes.ResourceListFactory + floatingResourceLimitsByPool map[string]internaltypes.ResourceList } -type floatingResourcePool struct { - totalResources schedulerobjects.ResourceList +func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { + err := validate(config) + if err != nil { + return nil, err + } + + floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{} + for _, fr := range config { + for _, poolConfig := range fr.Pools { + floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add( + rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}), + ) + } + } + + return &FloatingResourceTypes{ + floatingResourceLimitsByPool: floatingResourceLimitsByPool, + }, nil } -func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { - zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))} +func validate(config []configuration.FloatingResourceConfig) error { + floatingResourceNamesSeen := map[string]bool{} for _, c := range config { - if _, exists := zeroFloatingResources.Resources[c.Name]; exists { - return nil, fmt.Errorf("duplicate floating resource %s", c.Name) + if _, exists := floatingResourceNamesSeen[c.Name]; exists { + return fmt.Errorf("duplicate floating resource %s", c.Name) } - zeroFloatingResources.Resources[c.Name] = resource.Quantity{} + floatingResourceNamesSeen[c.Name] = true } - pools := map[string]*floatingResourcePool{} for _, fr := range config { + poolNamesSeen := map[string]bool{} for _, poolConfig := range fr.Pools { - pool, exists := pools[poolConfig.Name] - if !exists { - pool = &floatingResourcePool{ - totalResources: zeroFloatingResources.DeepCopy(), - } - pools[poolConfig.Name] = pool - } - existing := pool.totalResources.Resources[fr.Name] - if existing.Cmp(resource.Quantity{}) != 0 { - return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name) + if _, exists := poolNamesSeen[poolConfig.Name]; exists { + return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name) } - pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy() + poolNamesSeen[poolConfig.Name] = true } } - - return &FloatingResourceTypes{ - zeroFloatingResources: zeroFloatingResources, - pools: pools, - rlFactory: rlFactory, - }, nil + return nil } func (frt *FloatingResourceTypes) WithinLimits(poolName string, 
allocated internaltypes.ResourceList) (bool, string) { - available := frt.GetTotalAvailableForPoolInternalTypes(poolName) + available := frt.GetTotalAvailableForPool(poolName) if available.AllZero() { return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName) } @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern } func (frt *FloatingResourceTypes) AllPools() []string { - result := maps.Keys(frt.pools) + result := maps.Keys(frt.floatingResourceLimitsByPool) slices.Sort(result) return result } -func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList { - pool, exists := frt.pools[poolName] - if !exists { - return frt.zeroFloatingResources.DeepCopy() +func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList { + limits, ok := frt.floatingResourceLimitsByPool[poolName] + if !ok { + return internaltypes.ResourceList{} } - return pool.totalResources.DeepCopy() + return limits } -func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList { - return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources) +func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity { + limits := frt.GetTotalAvailableForPool(poolName) + result := map[string]resource.Quantity{} + for _, res := range limits.GetResources() { + if res.Type != internaltypes.Floating { + continue + } + result[res.Name] = res.Value + } + return result } func (frt *FloatingResourceTypes) SummaryString() string { - if len(frt.zeroFloatingResources.Resources) == 0 { + if len(frt.floatingResourceLimitsByPool) == 0 { return "none" } - return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ") + poolSummaries := []string{} + for _, poolName := range frt.AllPools() { + poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName])) + } + return strings.Join(poolSummaries, " ") } diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go index 76ce183b624..4bb5e82fb63 100644 --- a/internal/scheduler/floatingresources/floating_resource_types_test.go +++ b/internal/scheduler/floatingresources/floating_resource_types_test.go @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) { assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools()) } -func TestGetTotalAvailableForPool(t *testing.T) { - sut := makeSut(t, makeRlFactory()) - zero := resource.Quantity{} - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources) +func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + { + Name: "floating-resource-1", + 
Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "gpu", + Quantity: resource.MustParse("300"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, frt) + assert.NotNil(t, err) } -func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) { +func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, frt) + assert.NotNil(t, err) +} + +func TestGetTotalAvailableForPool(t *testing.T) { sut := makeSut(t, makeRlFactory()) - cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu") + cpuPool := sut.GetTotalAvailableForPool("cpu") assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2")) - gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu") + gpuPool := sut.GetTotalAvailableForPool("gpu") assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2")) - notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value") - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1")) - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2")) + notFound := sut.GetTotalAvailableForPool("some-invalid-value") + assert.True(t, notFound.IsEmpty()) +} + +func TestGetTotalAvailableForPoolAsMap(t *testing.T) { + sut := makeSut(t, makeRlFactory()) + + cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI), + }, cpuPool) + + gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI), + }, gpuPool) + + notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value") + assert.Equal(t, map[string]resource.Quantity{}, notFound) } func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) { diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 8d4526a7b2b..148ede62909 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -1,6 +1,8 @@ package internaltypes import ( + "fmt" + "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -51,3 +53,22 @@ func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*No f.resourceListFactory, ) } + +func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobjects.Executor, errorLogger func(string)) []*Node { + result := []*Node{} + for _, executor := range executors { + for _, node := range executor.GetNodes() { + if executor.Id != node.Executor { + errorLogger(fmt.Sprintf("Executor name mismatch: %q != %q", node.Executor, executor.Id)) + 
continue + } + itNode, err := f.FromSchedulerObjectsNode(node) + if err != nil { + errorLogger(fmt.Sprintf("Invalid node %s: %v", node.Name, err)) + continue + } + result = append(result, itNode) + } + } + return result +} diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go index dda39551103..63772bbd8ea 100644 --- a/internal/scheduler/internaltypes/resource_list.go +++ b/internal/scheduler/internaltypes/resource_list.go @@ -24,10 +24,11 @@ type ResourceList struct { } type Resource struct { - Name string - Value int64 - Scale k8sResource.Scale - Type ResourceType + Name string + RawValue int64 + Value k8sResource.Quantity + Scale k8sResource.Scale + Type ResourceType } func (rl ResourceList) Equal(other ResourceList) bool { @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q return k8sResource.Quantity{} } - return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index]) + return *rl.asQuantity(index) } func (rl ResourceList) GetResources() []Resource { @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource { result := make([]Resource, len(rl.resources)) for i, q := range rl.resources { result[i] = Resource{ - Name: rl.factory.indexToName[i], - Value: q, - Scale: rl.factory.scales[i], - Type: rl.factory.types[i], + Name: rl.factory.indexToName[i], + RawValue: q, + Value: *rl.asQuantity(i), + Scale: rl.factory.scales[i], + Type: rl.factory.types[i], } } return result diff --git a/internal/scheduler/internaltypes/resource_list_test.go b/internal/scheduler/internaltypes/resource_list_test.go index 1138b6225b2..39fa9c6c1e2 100644 --- a/internal/scheduler/internaltypes/resource_list_test.go +++ b/internal/scheduler/internaltypes/resource_list_test.go @@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) { a := testResourceList(factory, "1", "1Gi") expected := []Resource{ - {Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating}, - {Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating}, + {Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating}, + {Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating}, } + + for i, r := range expected { + expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale) + } + assert.Equal(t, expected, a.GetResources()) } diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index f7b9fc4226f..f9f295e8d05 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -503,7 +503,7 @@ func (job *Job) Tolerations() []v1.Toleration { // ResourceRequirements returns the resource requirements of the Job // KubernetesResourceRequirements below is preferred -func (job *Job) ResourceRequirements() v1.ResourceRequirements { +func 
(job *Job) resourceRequirements() v1.ResourceRequirements { if req := job.PodRequirements(); req != nil { return req.ResourceRequirements } @@ -831,7 +831,7 @@ func SchedulingKeyFromJob(skg *schedulerobjects.SchedulingKeyGenerator, job *Job job.NodeSelector(), job.Affinity(), job.Tolerations(), - job.ResourceRequirements().Requests, + job.resourceRequirements().Requests, job.PriorityClassName(), ) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 43d9bbe21b2..af9d932fe64 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -288,12 +288,32 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } txn := c.jobDb.ReadTxn() + + for _, executorSetting := range executorSettings { + if executorSetting.Cordoned { + // We may have settings for executors that don't exist in the repository. + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 1.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } else { + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 0.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } + } + for _, executor := range executors { - // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. - cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ - status: 0.0, - reason: "", - setByUser: "", + if _, statusExists := cordonedStatusByCluster[executor.Id]; !statusExists { + // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. + cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ + status: 0.0, + reason: "", + setByUser: "", + } } for _, node := range executor.Nodes { nodePool := node.GetPool() @@ -305,6 +325,10 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p nodeType: node.ReportingNodeType, } + if _, ok := schedulableNodeCountByCluster[clusterKey]; !ok { + schedulableNodeCountByCluster[clusterKey] = 0 + } + awayClusterKeys := make([]clusterMetricKey, 0, len(awayPools)) for _, ap := range awayPools { awayClusterKeys = append(awayClusterKeys, clusterMetricKey{ @@ -314,14 +338,20 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p }) } - if !node.Unschedulable { - addToResourceListMap(availableResourceByCluster, clusterKey, node.AvailableArmadaResource()) + nodeResources := node.AvailableArmadaResource() + + if !node.Unschedulable && cordonedStatusByCluster[executor.Id].status != 1.0 { schedulableNodeCountByCluster[clusterKey]++ + } else { + // We still want to publish these metrics, just with zeroed values + nodeResources.Zero() + } - // Add available resources to the away cluster pool - for _, awayClusterKey := range awayClusterKeys { - addToResourceListMap(availableResourceByCluster, awayClusterKey, node.AvailableArmadaResource()) - } + addToResourceListMap(availableResourceByCluster, clusterKey, nodeResources) + + // Add available resources to the away cluster pool + for _, awayClusterKey := range awayClusterKeys { + addToResourceListMap(availableResourceByCluster, awayClusterKey, nodeResources) } totalNodeCountByCluster[clusterKey]++ @@ -384,25 +414,8 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } } - for _, executorSetting := range executorSettings { - if executorSetting.Cordoned { - if cordonedValue, ok := 
cordonedStatusByCluster[executorSetting.ExecutorId]; ok { - cordonedValue.status = 1.0 - cordonedValue.reason = executorSetting.CordonReason - cordonedValue.setByUser = executorSetting.SetByUser - } else { - // We may have settings for executors that don't exist in the repository. - cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ - status: 1.0, - reason: executorSetting.CordonReason, - setByUser: executorSetting.SetByUser, - } - } - } - } - for _, pool := range c.floatingResourceTypes.AllPools() { - totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) + totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)} clusterKey := clusterMetricKey{ cluster: "floating", pool: pool, diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go index b6fdfa22a91..c2f32ad60e7 100644 --- a/internal/scheduler/metrics/cycle_metrics.go +++ b/internal/scheduler/metrics/cycle_metrics.go @@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount)) for _, r := range s.EvictedResources.GetResources() { - m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value)) + m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue)) } } } diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index 3e25deb892b..faa10d50901 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -209,7 +209,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio } queue := job.Queue() - requests := job.ResourceRequirements().Requests + requests := job.AllResourceRequirements() latestRun := job.LatestRun() pool := "" node := "" @@ -236,7 +236,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio // Resource Seconds for _, res := range m.trackedResourceNames { - resQty := requests[res] + resQty := requests.GetResourceByNameZeroIfMissing(string(res)) resSeconds := duration * float64(resQty.MilliValue()) / 1000 m.jobStateResourceSecondsByQueue. 
WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds) diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index d576713618f..772101890cc 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -308,9 +308,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { jobDbJobs: []*jobdb.Job{}, executors: []*schedulerobjects.Executor{executor}, expected: []prometheus.Metric{ - commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), - commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), - commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "memory", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), @@ -387,17 +387,19 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing node.StateByJobRunId[job.LatestRun().Id()] = schedulerobjects.JobRunState_RUNNING tests := map[string]struct { - poolConfig []configuration.PoolConfig - runningJobs []*jobdb.Job - nodes []*schedulerobjects.Node - expected []prometheus.Metric + poolConfig []configuration.PoolConfig + runningJobs []*jobdb.Job + nodes []*schedulerobjects.Node + executorSettings []*schedulerobjects.ExecutorSettings + expected []prometheus.Metric }{ "No away pools": { poolConfig: []configuration.PoolConfig{ {Name: testfixtures.TestPool}, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -413,8 +415,9 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing AwayPools: []string{testfixtures.TestPool}, }, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -422,6 +425,24 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing commonmetrics.NewClusterTotalCapacity(31, "cluster-1", testfixtures.TestPool2, "cpu", "type-1"), }, }, + "Cordoned cluster": { + poolConfig: []configuration.PoolConfig{ + {Name: testfixtures.TestPool}, + }, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{ + { + ExecutorId: "cluster-1", 
+ Cordoned: true, + CordonReason: "bad executor", + }, + }, + expected: []prometheus.Metric{ + commonmetrics.NewClusterAvailableCapacity(0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -446,7 +467,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil) - executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.executorSettings, nil) collector := NewMetricsCollector( jobDb, diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 1f23597541b..3ace44245c4 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -12,7 +12,6 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/scheduler/adapters" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -132,8 +131,6 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { jobFilter := func(job *jobdb.Job) bool { return true } job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) request := job.KubernetesResourceRequirements() - requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests)) - assert.Nil(t, err) jobId := job.Id() @@ -177,14 +174,14 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, boundNode.AllocatedByJobId, ), ) assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, evictedNode.AllocatedByJobId, ), ) diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go index 2b96b335f1b..e92167c3275 100644 --- a/internal/scheduler/scheduling/context/job.go +++ b/internal/scheduler/scheduling/context/job.go @@ -218,10 +218,10 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche ) resourcesByQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []*jobdb.Job) schedulerobjects.ResourceList { - rv := schedulerobjects.NewResourceListWithDefaultSize() + func(jobs []*jobdb.Job) internaltypes.ResourceList { + rv := internaltypes.ResourceList{} for _, job := range jobs { - rv.AddV1ResourceList(job.ResourceRequirements().Requests) + rv = rv.Add(job.AllResourceRequirements()) } return rv }, @@ -247,8 +247,8 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche maps.Keys(jobsByQueue), armadamaps.MapValues( resourcesByQueue, - func(rl schedulerobjects.ResourceList) string { - return rl.CompactString() + func(rl internaltypes.ResourceList) string { + return rl.String() }, ), jobCountPerQueue, diff --git 
a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index eb0a3bb05ad..f8fdbac4ce8 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) { for nodeId, node := range result.AffectedNodesById { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId) } } } @@ -1920,7 +1920,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Accounting across scheduling rounds. roundByJobId := make(map[string]int) indexByJobId := make(map[string]int) - allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string]) + allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList) nodeIdByJobId := make(map[string]string) var jobIdsByGangId map[string]map[string]bool var gangIdByJobId map[string]string @@ -1941,7 +1941,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { ) } - demandByQueue := map[string]schedulerobjects.ResourceList{} + demandByQueue := map[string]internaltypes.ResourceList{} // Run the scheduler. cordonedNodes := map[int]bool{} @@ -1978,12 +1978,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { queuedJobs = append(queuedJobs, job.WithQueued(true)) roundByJobId[job.Id()] = i indexByJobId[job.Id()] = j - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements()) } } err = jobDbTxn.Upsert(queuedJobs) @@ -2005,12 +2000,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { delete(gangIdByJobId, job.Id()) delete(jobIdsByGangId[gangId], job.Id()) } - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements()) } } } @@ -2049,11 +2039,11 @@ func TestPreemptingQueueScheduler(t *testing.T) { for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources) + queueDemand := demandByQueue[queue] err := sctx.AddQueueSchedulingContext( queue, weight, - internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory), + allocatedByQueueAndPriorityClass[queue], queueDemand, queueDemand, limiterByQueue[queue], @@ -2092,28 +2082,22 @@ func TestPreemptingQueueScheduler(t *testing.T) { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.SubV1ResourceList( - job.PriorityClassName(), - 
job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Subtract(job.AllResourceRequirements()) } for _, jctx := range result.ScheduledJobs { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.AddV1ResourceList( - job.PriorityClassName(), - job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Add(job.AllResourceRequirements()) } for queue, qctx := range sctx.QueueSchedulingContexts { - m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory) + m := allocatedByQueueAndPriorityClass[queue] assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass)) } @@ -2202,7 +2186,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { for node := it.NextNode(); node != nil; node = it.NextNode() { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId()) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId()) } } } diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 01ad8e37773..0dad9c8061f 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule( ctx.Infof("Scheduling on pool %s with capacity %s %s", pool, fsctx.nodeDb.TotalKubernetesResources().String(), - l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(), + l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(), ) start := time.Now() @@ -237,21 +237,11 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } healthyExecutors = l.filterCordonedExecutors(ctx, healthyExecutors, executorSettings) } - nodes := []*internaltypes.Node{} - for _, executor := range healthyExecutors { - for _, node := range executor.Nodes { - if executor.Id != node.Executor { - ctx.Errorf("Executor name mismatch: %q != %q", node.Executor, executor.Id) - continue - } - itNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - ctx.Errorf("Invalid node %s: %v", node.Name, err) - continue - } - nodes = append(nodes, itNode) - } - } + + nodes := nodeFactory.FromSchedulerObjectsExecutors(healthyExecutors, func(errMes string) { + ctx.Error(errMes) + }) + homeJobs := jobSchedulingInfo.jobsByPool[pool.Name] awayJobs := []*jobdb.Job{} @@ -277,7 +267,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } totalResources := nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name)) schedulingContext, err := l.constructSchedulingContext( pool.Name, @@ -528,7 +518,7 @@ func (l *FairSchedulingAlgo) SchedulePool( pool string, ) (*SchedulerResult, *schedulercontext.SchedulingContext, error) { totalResources := fsctx.nodeDb.TotalKubernetesResources() - totalResources = 
totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool)) constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues)) diff --git a/internal/scheduler/simulator/sink/job_writer.go b/internal/scheduler/simulator/sink/job_writer.go index ccac68dcace..7f4d029d365 100644 --- a/internal/scheduler/simulator/sink/job_writer.go +++ b/internal/scheduler/simulator/sink/job_writer.go @@ -4,7 +4,6 @@ import ( "os" parquetWriter "github.com/xitongsys/parquet-go/writer" - v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" protoutil "github.com/armadaproject/armada/internal/common/proto" @@ -77,10 +76,10 @@ func (j *JobWriter) createJobRunRow(st *model.StateTransition) ([]*JobRunRow, er associatedJob := jobsList[i] if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil { // Resource requirements - cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] + cpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu") + memoryLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("memory") + ephemeralStorageLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("ephemeral-storage") + gpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("nvidia.com/gpu") eventTime := protoutil.ToStdTime(event.Created) rows = append(rows, &JobRunRow{ diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index c94937e02bb..393409632df 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -92,13 +92,22 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { panic(err) } + nodeFactory := internaltypes.NewNodeFactory( + srv.schedulingConfig.IndexedTaints, + srv.schedulingConfig.IndexedNodeLabels, + srv.resourceListFactory) + executorsByPoolAndId := map[string]map[string]*executor{} for _, ex := range executors { - nodes := ex.GetNodes() - nodesByPool := armadaslices.GroupByFunc(nodes, func(n *schedulerobjects.Node) string { + nodes := nodeFactory.FromSchedulerObjectsExecutors( + []*schedulerobjects.Executor{ex}, + func(s string) { ctx.Error(s) }) + + nodesByPool := armadaslices.GroupByFunc(nodes, func(n *internaltypes.Node) string { return n.GetPool() }) for pool, nodes := range nodesByPool { + nodeDb, err := srv.constructNodeDb(nodes) if _, present := executorsByPoolAndId[pool]; !present { @@ -115,7 +124,6 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { WithStacktrace(ctx, err). 
Warnf("Error constructing nodedb for executor: %s", ex.Id) } - } } srv.state.Store(&schedulerState{ @@ -264,11 +272,7 @@ poolStart: return schedulingResult{isSchedulable: false, reason: sb.String()} } -func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { - nodeFactory := internaltypes.NewNodeFactory(srv.schedulingConfig.IndexedTaints, - srv.schedulingConfig.IndexedNodeLabels, - srv.resourceListFactory) - +func (srv *SubmitChecker) constructNodeDb(nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { nodeDb, err := nodedb.NewNodeDb( srv.schedulingConfig.PriorityClasses, srv.schedulingConfig.IndexedResources, @@ -284,11 +288,7 @@ func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*node txn := nodeDb.Txn(true) defer txn.Abort() for _, node := range nodes { - dbNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - return nil, err - } - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode); err != nil { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { return nil, err } } diff --git a/internal/scheduler/submitcheck_test.go b/internal/scheduler/submitcheck_test.go index 4a302b41a93..526ce6f2266 100644 --- a/internal/scheduler/submitcheck_test.go +++ b/internal/scheduler/submitcheck_test.go @@ -257,8 +257,12 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) { } func Executor(nodes ...*schedulerobjects.Node) *schedulerobjects.Executor { + executorId := uuid.NewString() + for _, node := range nodes { + node.Executor = executorId + } return &schedulerobjects.Executor{ - Id: uuid.NewString(), + Id: executorId, Pool: "cpu", Nodes: nodes, } diff --git a/internal/scheduleringester/schedulerdb.go b/internal/scheduleringester/schedulerdb.go index a2d36e9f2be..01929560929 100644 --- a/internal/scheduleringester/schedulerdb.go +++ b/internal/scheduleringester/schedulerdb.go @@ -373,9 +373,7 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper return nil case DeleteExecutorSettings: for _, settingsUpsert := range o { - err := queries.DeleteExecutorSettings(ctx, schedulerdb.DeleteExecutorSettingsParams{ - ExecutorID: settingsUpsert.ExecutorID, - }) + err := queries.DeleteExecutorSettings(ctx, settingsUpsert.ExecutorID) if err != nil { return errors.Wrapf(err, "error deleting executor settings for %s", settingsUpsert.ExecutorID) } @@ -383,7 +381,10 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper return nil case CancelExecutor: for executor, cancelRequest := range o { - jobs, err := queries.SelectAllJobsByExecutorAndQueues(ctx, executor, cancelRequest.Queues) + jobs, err := queries.SelectJobsByExecutorAndQueues(ctx, schedulerdb.SelectJobsByExecutorAndQueuesParams{ + Executor: executor, + Queues: cancelRequest.Queues, + }) if err != nil { return errors.Wrapf(err, "error cancelling jobs on executor %s by queue and priority class", executor) } @@ -407,7 +408,10 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper } case PreemptExecutor: for executor, preemptRequest := range o { - jobs, err := queries.SelectAllJobsByExecutorAndQueues(ctx, executor, preemptRequest.Queues) + jobs, err := queries.SelectJobsByExecutorAndQueues(ctx, schedulerdb.SelectJobsByExecutorAndQueuesParams{ + Executor: executor, + Queues: preemptRequest.Queues, + }) if err != nil { return errors.Wrapf(err, "error preempting jobs on executor %s by queue and priority class", executor) } @@ -431,7 +435,7 @@ 
func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper } case CancelQueue: for _, cancelRequest := range o { - jobs, err := queries.SelectAllJobsByQueueAndJobState(ctx, cancelRequest.Name, cancelRequest.JobStates) + jobs, err := s.selectAllJobsByQueueAndJobState(ctx, queries, cancelRequest.Name, cancelRequest.JobStates) if err != nil { return errors.Wrapf(err, "error cancelling jobs by queue, job state and priority class") } @@ -457,7 +461,7 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper for _, preemptRequest := range o { // Only allocated jobs can be preempted jobStates := []controlplaneevents.ActiveJobState{controlplaneevents.ActiveJobState_LEASED, controlplaneevents.ActiveJobState_PENDING, controlplaneevents.ActiveJobState_RUNNING} - jobs, err := queries.SelectAllJobsByQueueAndJobState(ctx, preemptRequest.Name, jobStates) + jobs, err := s.selectAllJobsByQueueAndJobState(ctx, queries, preemptRequest.Name, jobStates) if err != nil { return errors.Wrapf(err, "error preempting jobs by queue, job state and priority class") } @@ -485,6 +489,32 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper return nil } +func (s *SchedulerDb) selectAllJobsByQueueAndJobState(ctx *armadacontext.Context, queries *schedulerdb.Queries, queue string, jobStates []controlplaneevents.ActiveJobState) ([]schedulerdb.Job, error) { + items := []schedulerdb.Job{} + for _, state := range jobStates { + var jobs []schedulerdb.Job + var err error + switch state { + case controlplaneevents.ActiveJobState_QUEUED: + jobs, err = queries.SelectQueuedJobsByQueue(ctx, []string{queue}) + case controlplaneevents.ActiveJobState_LEASED: + jobs, err = queries.SelectLeasedJobsByQueue(ctx, []string{queue}) + case controlplaneevents.ActiveJobState_PENDING: + jobs, err = queries.SelectPendingJobsByQueue(ctx, []string{queue}) + case controlplaneevents.ActiveJobState_RUNNING: + jobs, err = queries.SelectRunningJobsByQueue(ctx, []string{queue}) + default: + return nil, fmt.Errorf("unknown active job state %+v", state) + } + + if err != nil { + return nil, fmt.Errorf("unable to select jobs by queue and job state: %s", err) + } + items = append(items, jobs...) 
+	}
+	return items, nil
+}
+
 // createMarkJobCancelRequestedById returns []*schedulerdb.MarkJobsCancelRequestedByIdParams for the specified jobs such
 // that no two MarkJobsCancelRequestedByIdParams are for the same queue and jobset
 func createMarkJobsCancelRequestedByIdParams(jobs []schedulerdb.Job) []*schedulerdb.MarkJobsCancelRequestedByIdParams {
diff --git a/internal/scheduleringester/schedulerdb_test.go b/internal/scheduleringester/schedulerdb_test.go
index 671a0b92b40..95b9d70a3b4 100644
--- a/internal/scheduleringester/schedulerdb_test.go
+++ b/internal/scheduleringester/schedulerdb_test.go
@@ -939,15 +939,15 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string]
 			if err != nil {
 				return errors.WithStack(err)
 			}
-			filtered := armadaslices.Filter(allSettings, func(e schedulerdb.ExecutorSettings) bool {
-				_, ok := expected[e.ExecutorId]
+			filtered := armadaslices.Filter(allSettings, func(e schedulerdb.ExecutorSetting) bool {
+				_, ok := expected[e.ExecutorID]
 				return ok
 			})
 			actual := UpsertExecutorSettings{}
 			for _, a := range filtered {
-				actual[a.ExecutorId] = &ExecutorSettingsUpsert{
-					ExecutorID:   a.ExecutorId,
+				actual[a.ExecutorID] = &ExecutorSettingsUpsert{
+					ExecutorID:   a.ExecutorID,
 					Cordoned:     a.Cordoned,
 					CordonReason: a.CordonReason,
 				}
@@ -958,8 +958,8 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string]
 			if err != nil {
 				return errors.WithStack(err)
 			}
-			filtered := armadaslices.Filter(allSettings, func(e schedulerdb.ExecutorSettings) bool {
-				_, ok := expected[e.ExecutorId]
+			filtered := armadaslices.Filter(allSettings, func(e schedulerdb.ExecutorSetting) bool {
+				_, ok := expected[e.ExecutorID]
 				return ok
 			})
 			assert.Equal(t, 0, len(filtered))
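For reference, the cordon-aware capacity accounting introduced in internal/scheduler/metrics.go (and exercised by the "Cordoned cluster" test case above) works as follows: executor settings are folded into cordonedStatusByCluster before the per-executor loop, executors without settings default to an uncordoned status entry, and nodes on cordoned executors still contribute to total capacity while their available resources are zeroed. The snippet below is a minimal standalone Go sketch of that flow; executorSetting, cordonedStatus and the plain int64 CPU counts are simplified stand-ins rather than the real schedulerobjects/internaltypes types, and the numbers are illustrative only.

// Standalone sketch (not the actual MetricsCollector code) of cordon-aware
// capacity accounting: settings first, defaults for unknown executors, and
// zeroed available capacity (but unchanged total capacity) when cordoned.
package main

import "fmt"

type executorSetting struct {
	ExecutorId   string
	Cordoned     bool
	CordonReason string
	SetByUser    string
}

type cordonedStatus struct {
	status    float64
	reason    string
	setByUser string
}

func main() {
	settings := []executorSetting{
		{ExecutorId: "cluster-1", Cordoned: true, CordonReason: "bad executor"},
	}
	executors := []string{"cluster-1", "cluster-2"}
	nodeCPUMilli := map[string]int64{"cluster-1": 32000, "cluster-2": 64000}

	// Settings are applied first: there may be settings for executors that no
	// longer exist in the executor repository.
	statusByCluster := map[string]*cordonedStatus{}
	for _, s := range settings {
		st := 0.0
		if s.Cordoned {
			st = 1.0
		}
		statusByCluster[s.ExecutorId] = &cordonedStatus{status: st, reason: s.CordonReason, setByUser: s.SetByUser}
	}

	availableCPUMilli := map[string]int64{}
	totalCPUMilli := map[string]int64{}
	for _, ex := range executors {
		// Executors without settings still get a (zero-valued) cordon status entry.
		if _, ok := statusByCluster[ex]; !ok {
			statusByCluster[ex] = &cordonedStatus{}
		}
		cpu := nodeCPUMilli[ex]
		totalCPUMilli[ex] += cpu
		if statusByCluster[ex].status == 1.0 {
			cpu = 0 // cordoned: the available-capacity metric is still published, just zeroed
		}
		availableCPUMilli[ex] += cpu
	}

	fmt.Println(availableCPUMilli["cluster-1"], totalCPUMilli["cluster-1"]) // 0 32000
	fmt.Println(availableCPUMilli["cluster-2"], totalCPUMilli["cluster-2"]) // 64000 64000
}

Relatedly, internaltypes.Resource now carries both RawValue (the scaled int64 used for comparisons and Prometheus gauges, hence the r.Value to r.RawValue switch at the metric call sites) and Value (a k8sResource.Quantity built via asQuantity/NewScaledQuantity), so callers no longer need to reconstruct quantities from the raw value and scale themselves.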