Skip to content

Commit

Permalink
Merge branch 'master' into sendToGitHub/armadactl-consistent-flags
Browse files Browse the repository at this point in the history
  • Loading branch information
MustafaI authored Oct 11, 2024
2 parents 55e87ec + fae186a commit 21e4c5f
Show file tree
Hide file tree
Showing 68 changed files with 2,455 additions and 1,439 deletions.
6 changes: 6 additions & 0 deletions client/python/armada_client/internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""DO NOT USE - INTERNAL FACING ONLY - DO NOT USE
Internal modules are *NOT* meant for public consumption. External users of the
armada_client should not use or call any code contained in these modules as
they are unsupported and could change or break at any time.
"""
87 changes: 87 additions & 0 deletions client/python/armada_client/internal/binoculars_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import Optional

import grpc

from armada_client.armada import (
binoculars_pb2,
binoculars_pb2_grpc,
)

from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1


def new_binoculars_client(url: str, disable_ssl: bool = False):
"""Constructs and returns a new BinocularsClient object.
:param url: A url specifying the gRPC binoculars endpoint in the format
"host:port".
:return: A new BinocularsClient object.
"""
parts = url.split(":")
if len(parts) != 2:
raise ValueError(f"Could not parse url provided: {url}")

host, port = parts[0], parts[1]
if disable_ssl:
channel = grpc.insecure_channel(f"{host}:{port}")
else:
channel_credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(
f"{host}:{port}",
channel_credentials,
)

client = BinocularsClient(channel)
return (channel, client)


class BinocularsClient:
"""
Client for accessing Armada's Binoculars service over gRPC.
:param channel: gRPC channel used for authentication. See
https://grpc.github.io/grpc/python/grpc.html
for more information.
:return: an Binoculars client instance
"""

def __init__(self, channel):
self.binoculars_stub = binoculars_pb2_grpc.BinocularsStub(channel)

def logs(
self,
job_id: str,
since_time: str,
pod_namespace: Optional[str] = "default",
pod_number: Optional[int] = 0,
log_options: Optional[core_v1.PodLogOptions] = core_v1.PodLogOptions(),
):
"""Retrieve logs for a specific Armada job.
:param job_id: The ID of the job for which to retreieve logs.
:param pod_namespace: The namespace of the pod/job.
:param since_time: If the empty string, retrieves all available logs.
Otherwise, retrieves logs emitted since given timestamp.
:param pod_number: The zero-indexed pod number from which to retrieve
logs. Defaults to zero.
:param log_options: An optional Kubernetes PodLogOptions object.
:return: A LogResponse object.
"""
log_request = binoculars_pb2.LogRequest(
job_id=job_id,
pod_number=pod_number,
pod_namespace=pod_namespace,
since_time=since_time,
log_options=log_options,
)
return self.binoculars_stub.Logs(log_request)

def cordon(self, node_name: str):
"""Send a cordon request for a specific node.
:param node_name: The name of the node.
:return: Empty grpc object.
"""
cordon_request = binoculars_pb2.CordonRequest(node_name=node_name)
return self.binoculars_stub.Cordon(cordon_request)
42 changes: 42 additions & 0 deletions client/python/armada_client/log_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from dataclasses import dataclass
from typing import Optional

from armada_client.internal.binoculars_client import new_binoculars_client


@dataclass
class LogLine:
"""Represents a single line from a log."""

line: str
timestamp: str


class JobLogClient:
"""
Client for retrieving logs for a given job.
:param url: The url to use for retreiving logs.
:param job_id: The ID of the job.
:return: A JobLogClient instance.
"""

def __init__(self, url: str, job_id: str, disable_ssl: bool = False):
self.job_id = job_id
self.url = url
self._channel, self._concrete_client = new_binoculars_client(
self.url, disable_ssl
)

def logs(self, since_time: Optional[str] = ""):
"""Retrieve logs for the job associated with this client.
:param since_time: Logs will be retrieved starting at the time
specified in this str. Must conform to RFC3339 date time format.
:return: A list of LogLine objects.
"""
return [
LogLine(line.line, line.timestamp)
for line in self._concrete_client.logs(self.job_id, since_time).log
]
9 changes: 8 additions & 1 deletion client/python/docs/source/python_armada_client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,11 @@ armada\_client.permissions module
---------------------------------

.. automodule:: armada_client.permissions
:members:
:members:


armada\_client.log_client module
---------------------------------------

.. automodule:: armada_client.log_client
:members:
26 changes: 26 additions & 0 deletions client/python/examples/binoculars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Example script utiltizing JobLogClient."""

import os

from armada_client.log_client import JobLogClient


DISABLE_SSL = os.environ.get("DISABLE_SSL", True)
HOST = os.environ.get("BINOCULARS_SERVER", "localhost")
PORT = os.environ.get("BINOCULARS_PORT", "50053")
JOB_ID = os.environ.get("JOB_ID")


def main():
"""Demonstrate basic use of JobLogClient."""
url = f"{HOST}:{PORT}"
client = JobLogClient(url, JOB_ID, DISABLE_SSL)

log_lines = client.logs()

for line in log_lines:
print(line.line)


if __name__ == "__main__":
main()
20 changes: 18 additions & 2 deletions client/python/tests/unit/server_mock.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from google.protobuf import empty_pb2

from armada_client.armada import (
submit_pb2_grpc,
submit_pb2,
binoculars_pb2,
binoculars_pb2_grpc,
event_pb2,
event_pb2_grpc,
health_pb2,
job_pb2_grpc,
job_pb2,
submit_pb2,
submit_pb2_grpc,
)
from armada_client.armada.job_pb2 import JobRunState
from armada_client.armada.submit_pb2 import JobState
Expand Down Expand Up @@ -149,3 +151,17 @@ def GetJobRunDetails(self, request, context):
for run in request.run_ids
}
)


class BinocularsService(binoculars_pb2_grpc.BinocularsServicer):
def Logs(self, request, context):
return binoculars_pb2.LogResponse(
log=[
binoculars_pb2.LogLine(timestamp="now", line="some log contents!"),
binoculars_pb2.LogLine(timestamp="now", line="some more log contents!"),
binoculars_pb2.LogLine(timestamp="now", line="even more log contents!"),
],
)

def Cordon(self, request, context):
return empty_pb2.Empty()
53 changes: 53 additions & 0 deletions client/python/tests/unit/test_log_clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from concurrent import futures

import grpc
import pytest

from google.protobuf import empty_pb2

from server_mock import BinocularsService

from armada_client.armada import binoculars_pb2_grpc
from armada_client.internal.binoculars_client import BinocularsClient
from armada_client.log_client import JobLogClient, LogLine


@pytest.fixture(scope="session", autouse=True)
def binoculars_server_mock():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
binoculars_pb2_grpc.add_BinocularsServicer_to_server(BinocularsService(), server)
server.add_insecure_port("[::]:4000")
server.start()

yield
server.stop(False)


channel = grpc.insecure_channel(target="127.0.0.1:4000")
tester = BinocularsClient(
grpc.insecure_channel(
target="127.0.0.1:4000",
options={
"grpc.keepalive_time_ms": 30000,
}.items(),
)
)


def test_logs():
resp = tester.logs("fake-job-id", "fake-namespace", "")
assert len(resp.log) == 3


def test_cordon():
result = tester.cordon("fake-node-name")
assert result == empty_pb2.Empty()


def test_job_log_client():
client = JobLogClient("127.0.0.1:4000", "fake-job-id", True)
log_lines = client.logs()
assert len(log_lines) == 3
for line in log_lines:
assert isinstance(line, LogLine)
assert len(line.line) > 0
2 changes: 2 additions & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ grpc:
enabled: false
# You may want to configure indexedNodeLabels and indexedTaints to speed up scheduling.
scheduling:
pools:
- name: default
supportedResourceTypes:
- name: memory
resolution: "1"
Expand Down
56 changes: 56 additions & 0 deletions docs/python_armada_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,59 @@ Convert this Subject to a grpc Subject.
* **Return type**

armada.submit_pb2.Subject


## armada_client.log_client module


### _class_ armada_client.log_client.JobLogClient(url, job_id, disable_ssl=False)
Client for retrieving logs for a given job.


* **Parameters**


* **url** (*str*) – The url to use for retreiving logs.


* **job_id** (*str*) – The ID of the job.


* **disable_ssl** (*bool*) –



* **Returns**

A JobLogClient instance.



#### logs(since_time='')
Retrieve logs for the job associated with this client.


* **Parameters**

**since_time** (*str** | **None*) – Logs will be retrieved starting at the time
specified in this str. Must conform to RFC3339 date time format.



* **Returns**

A list of LogLine objects.



### _class_ armada_client.log_client.LogLine(line, timestamp)
Represents a single line from a log.


* **Parameters**


* **line** (*str*) –


* **timestamp** (*str*) –
5 changes: 5 additions & 0 deletions internal/executor/util/pod_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/server/configuration"
)

var managedPodSelector labels.Selector
Expand Down Expand Up @@ -109,6 +110,10 @@ func ExtractQueue(pod *v1.Pod) string {
return pod.Labels[domain.Queue]
}

func ExtractPool(pod *v1.Pod) string {
return pod.Annotations[configuration.PoolAnnotation]
}

func ExtractJobSet(pod *v1.Pod) string {
return pod.Annotations[domain.JobSetId]
}
Expand Down
9 changes: 9 additions & 0 deletions internal/executor/util/pod_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/server/configuration"
)

func TestIsInTerminalState_ShouldReturnTrueWhenPodInSucceededPhase(t *testing.T) {
Expand Down Expand Up @@ -369,6 +370,14 @@ func TestExtractQueue(t *testing.T) {
assert.Equal(t, ExtractQueue(podWithoutQueue), "")
}

func TestExtractPool(t *testing.T) {
podWithPool := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{configuration.PoolAnnotation: "pool-1"}}}
podWithoutPool := &v1.Pod{}

assert.Equal(t, ExtractPool(podWithPool), "pool-1")
assert.Equal(t, ExtractPool(podWithoutPool), "")
}

func TestExtractJobSet(t *testing.T) {
podWithJobSet := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{domain.JobSetId: "job-set-1"}}}
podWithoutJobSet := &v1.Pod{}
Expand Down
Loading

0 comments on commit 21e4c5f

Please sign in to comment.