Skip to content

Commit

Permalink
Merge branch 'master' of github.com:armadaproject/armada into f/chrri…
Browse files Browse the repository at this point in the history
…sma/use-queues-service
  • Loading branch information
d80tb7 committed Sep 28, 2024
2 parents ade5f23 + 5910a4b commit 6500de2
Show file tree
Hide file tree
Showing 59 changed files with 1,148 additions and 4,135 deletions.
7 changes: 2 additions & 5 deletions internal/common/eventutil/eventutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,7 @@ func TestCompactSequences_Groups(t *testing.T) {
}

func TestSequenceEventListSizeBytes(t *testing.T) {
jobId, err := armadaevents.ProtoUuidFromUlidString(util.ULID().String())
if !assert.NoError(t, err) {
return
}
jobId := util.NewULID()

sequence := &armadaevents.EventSequence{
Queue: "",
Expand All @@ -272,7 +269,7 @@ func TestSequenceEventListSizeBytes(t *testing.T) {
{
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: jobId,
JobIdStr: jobId,
},
},
},
Expand Down
29 changes: 13 additions & 16 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/common/armadacontext"
Expand All @@ -18,15 +17,13 @@ import (
)

const (
jobIdString = "01f3j0g1md4qx7z5qb148qnh4r"
runIdString = "123e4567-e89b-12d3-a456-426614174000"
jobId = "01f3j0g1md4qx7z5qb148qnh4r"
runId = "123e4567-e89b-12d3-a456-426614174000"
batchSize = 3
batchDuration = 5 * time.Second
)

var (
jobIdProto, _ = armadaevents.ProtoUuidFromUlidString(jobIdString)
runIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString))
baseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z")
baseTimeProto = protoutil.ToTimestamp(baseTime)
testMetrics = metrics.NewMetrics("test")
Expand All @@ -41,8 +38,8 @@ var succeeded = &armadaevents.EventSequence{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runId,
JobIdStr: jobId,
},
},
},
Expand All @@ -58,8 +55,8 @@ var pendingAndRunning = &armadaevents.EventSequence{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunLeased{
JobRunLeased: &armadaevents.JobRunLeased{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runId,
JobIdStr: jobId,
ExecutorId: "k8sId1",
},
},
Expand All @@ -68,8 +65,8 @@ var pendingAndRunning = &armadaevents.EventSequence{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunRunning{
JobRunRunning: &armadaevents.JobRunRunning{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runId,
JobIdStr: jobId,
},
},
},
Expand All @@ -85,8 +82,8 @@ var failed = &armadaevents.EventSequence{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runId,
JobIdStr: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true,
Expand All @@ -100,7 +97,7 @@ var failed = &armadaevents.EventSequence{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobErrors{
JobErrors: &armadaevents.JobErrors{
JobId: jobIdProto,
JobIdStr: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true,
Expand Down Expand Up @@ -372,8 +369,8 @@ func generateEventSequence(numberOfEvents int) *armadaevents.EventSequence {
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: runIdProto,
JobId: jobIdProto,
RunIdStr: runId,
JobIdStr: jobId,
},
},
})
Expand Down
15 changes: 7 additions & 8 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ const (
)

var (
PartitionMarkerGroupIdProto = armadaevents.ProtoUuidFromUuid(uuid.MustParse(PartitionMarkerGroupIdString))
PartitionMarkerGroupIdUuid = armadaevents.UuidFromProtoUuid(PartitionMarkerGroupIdProto)
PriorityClassName = "test-priority"
Groups = []string{"group1", "group2"}
NodeSelector = map[string]string{"foo": "bar"}
Affinity = &v1.Affinity{
PartitionMarkerGroupIdUuid = uuid.MustParse(PartitionMarkerGroupIdString)
PriorityClassName = "test-priority"
Groups = []string{"group1", "group2"}
NodeSelector = map[string]string{"foo": "bar"}
Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
Expand Down Expand Up @@ -317,8 +316,8 @@ var PartitionMarker = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_PartitionMarker{
PartitionMarker: &armadaevents.PartitionMarker{
GroupId: PartitionMarkerGroupIdProto,
Partition: PartitionMarkerPartitionId,
GroupIdStr: PartitionMarkerGroupIdString,
Partition: PartitionMarkerPartitionId,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/common/proto/protoutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
msg = &armadaevents.CancelJob{JobId: armadaevents.ProtoUuidFromUuid(uuid.New())}
msg = &armadaevents.CancelJob{JobIdStr: uuid.NewString()}
compressor = compress.NewThreadSafeZlibCompressor(1024)
decompressor = compress.NewThreadSafeZlibDecompressor()
marshalledMsg, _ = proto.Marshal(msg)
Expand Down
15 changes: 2 additions & 13 deletions internal/executor/job/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/armadaproject/armada/internal/executor/configuration"
util2 "github.com/armadaproject/armada/internal/executor/util"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/executorapi"
)

Expand All @@ -20,19 +19,9 @@ func CreateSubmitJobFromExecutorApiJobRunLease(
return nil, err
}

jobId, err := armadaevents.UlidStringFromProtoUuid(jobRunLease.Job.JobId)
if err != nil {
return nil, err
}

runId, err := armadaevents.UuidStringFromProtoUuid(jobRunLease.JobRunId)
if err != nil {
return nil, err
}

runMeta := &RunMeta{
JobId: jobId,
RunId: runId,
JobId: jobRunLease.Job.JobIdStr,
RunId: jobRunLease.JobRunIdStr,
JobSet: jobRunLease.Jobset,
Queue: jobRunLease.Queue,
}
Expand Down
80 changes: 26 additions & 54 deletions internal/executor/reporter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ func CreateEventForCurrentState(pod *v1.Pod, clusterId string) (*armadaevents.Ev
Created: now,
Event: &armadaevents.EventSequence_Event_JobRunAssigned{
JobRunAssigned: &armadaevents.JobRunAssigned{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
ObjectMeta: &armadaevents.ObjectMeta{
Expand All @@ -57,10 +55,8 @@ func CreateEventForCurrentState(pod *v1.Pod, clusterId string) (*armadaevents.Ev
Created: now,
Event: &armadaevents.EventSequence_Event_JobRunRunning{
JobRunRunning: &armadaevents.JobRunRunning{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
ObjectMeta: &armadaevents.ObjectMeta{
Expand Down Expand Up @@ -94,10 +90,8 @@ func CreateEventForCurrentState(pod *v1.Pod, clusterId string) (*armadaevents.Ev
Created: now,
Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
JobRunSucceeded: &armadaevents.JobRunSucceeded{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
ResourceInfos: []*armadaevents.KubernetesResourceInfo{
{
ObjectMeta: &armadaevents.ObjectMeta{
Expand Down Expand Up @@ -170,10 +164,8 @@ func CreateJobIngressInfoEvent(pod *v1.Pod, clusterId string, associatedServices
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_StandaloneIngressInfo{
StandaloneIngressInfo: &armadaevents.StandaloneIngressInfo{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
ObjectMeta: &armadaevents.ObjectMeta{
KubernetesId: string(pod.ObjectMeta.UID),
Namespace: pod.Namespace,
Expand Down Expand Up @@ -201,10 +193,8 @@ func CreateSimpleJobPreemptedEvent(pod *v1.Pod) (*armadaevents.EventSequence, er
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_JobRunPreempted{
JobRunPreempted: &armadaevents.JobRunPreempted{
PreemptedJobId: preemptedJobId,
PreemptedJobIdStr: armadaevents.MustUlidStringFromProtoUuid(preemptedJobId),
PreemptedRunId: preemptedRunId,
PreemptedRunIdStr: armadaevents.MustUuidStringFromProtoUuid(preemptedRunId),
PreemptedJobIdStr: preemptedJobId,
PreemptedRunIdStr: preemptedRunId,
},
},
})
Expand All @@ -228,10 +218,8 @@ func CreateJobFailedEvent(pod *v1.Pod, reason string, cause armadaevents.Kuberne
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true,
Expand Down Expand Up @@ -259,29 +247,17 @@ func CreateJobFailedEvent(pod *v1.Pod, reason string, cause armadaevents.Kuberne
return sequence, nil
}

func CreateMinimalJobFailedEvent(jobIdStr string, runIdStr string, jobSet string, queue string, clusterId string, message string) (*armadaevents.EventSequence, error) {
func CreateMinimalJobFailedEvent(jobId string, runId string, jobSet string, queue string, clusterId string, message string) (*armadaevents.EventSequence, error) {
sequence := &armadaevents.EventSequence{}
sequence.Queue = queue
sequence.JobSetName = jobSet

jobId, err := armadaevents.ProtoUuidFromUlidString(jobIdStr)
if err != nil {
return nil, fmt.Errorf("failed to convert jobId %s to uuid - %s", jobIdStr, err)
}

runId, err := armadaevents.ProtoUuidFromUuidString(runIdStr)
if err != nil {
return nil, fmt.Errorf("failed to convert runId %s to uuid - %s", runIdStr, err)
}

sequence.Events = append(sequence.Events, &armadaevents.EventSequence_Event{
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
RunId: runId,
RunIdStr: runIdStr,
JobId: jobId,
JobIdStr: jobIdStr,
RunIdStr: runId,
JobIdStr: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true,
Expand Down Expand Up @@ -315,10 +291,8 @@ func CreateReturnLeaseEvent(pod *v1.Pod, reason string, debugMessage string, clu
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true, // EventMessage_LeaseReturned indicates a pod could not be scheduled.
Expand Down Expand Up @@ -355,10 +329,8 @@ func CreateJobUtilisationEvent(pod *v1.Pod, utilisationData *domain.UtilisationD
Created: types.TimestampNow(),
Event: &armadaevents.EventSequence_Event_ResourceUtilisation{
ResourceUtilisation: &armadaevents.ResourceUtilisation{
RunId: runId,
RunIdStr: armadaevents.MustUuidStringFromProtoUuid(runId),
JobId: jobId,
JobIdStr: armadaevents.MustUlidStringFromProtoUuid(jobId),
RunIdStr: runId,
JobIdStr: jobId,
ResourceInfo: &armadaevents.KubernetesResourceInfo{
ObjectMeta: &armadaevents.ObjectMeta{
KubernetesId: string(pod.ObjectMeta.UID),
Expand Down Expand Up @@ -388,15 +360,15 @@ func createEmptySequence(pod *v1.Pod) *armadaevents.EventSequence {
return sequence
}

func extractIds(pod *v1.Pod) (*armadaevents.Uuid, *armadaevents.Uuid, error) {
jobId, err := armadaevents.ProtoUuidFromUlidString(pod.Labels[domain.JobId])
if err != nil {
return nil, nil, fmt.Errorf("failed to convert jobId %s to uuid - %s", pod.Labels[domain.JobId], err)
func extractIds(pod *v1.Pod) (string, string, error) {
jobId, ok := pod.Labels[domain.JobId]
if !ok {
return "", "", fmt.Errorf("job Id not found on pod %s", pod.Name)
}

runId, err := armadaevents.ProtoUuidFromUuidString(pod.Labels[domain.JobRunId])
if err != nil {
return nil, nil, fmt.Errorf("failed to convert runId %s to uuid - %s", pod.Labels[domain.JobRunId], err)
runId, ok := pod.Labels[domain.JobRunId]
if !ok {
return "", "", fmt.Errorf("run Id not found on pod %s", pod.Name)
}

return jobId, runId, nil
Expand Down
Loading

0 comments on commit 6500de2

Please sign in to comment.