Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Aug 14, 2023
1 parent b9c1e6a commit 5897206
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 285 deletions.
71 changes: 0 additions & 71 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,74 +182,3 @@ jobs:
- name: Dump Docker logs on failure
uses: jwalton/gh-docker-logs@v1
if: failure()

test_clustering:
name: Test C* Clustering (Elixir ${{ matrix.elixir }}, OTP ${{ matrix.otp }}, C* ${{ matrix.cassandra_version }}, Native protocol ${{ matrix.cassandra_native_protocol }})
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
include:
- otp: "24.2"
elixir: "1.13"
cassandra_version: "4.1"
cassandra_native_protocol: "v5"
- otp: "24.2"
elixir: "1.13"
cassandra_version: "4.1"
cassandra_native_protocol: "v4"
- otp: "24.2"
elixir: "1.13"
cassandra_version: "3"
cassandra_native_protocol: "v3"
- otp: "24.2"
elixir: "1.13"
cassandra_version: "3"
cassandra_native_protocol: "v4"
# Oldest supported Elixir/OTP matrix. We support down to Erlang 21, but
# rustler_precompiled (which we use for nimble_lz4, which we use for testing)
# requires OTP 23+.
- otp: "23.3"
elixir: "1.11"
cassandra_version: "3"
cassandra_native_protocol: "v3"
env:
CASSANDRA_VERSION: ${{ matrix.cassandra_version }}
CASSANDRA_NATIVE_PROTOCOL: ${{ matrix.cassandra_native_protocol }}

steps:
- name: Clone the repository
uses: actions/checkout@v2

- name: Start Docker and wait for it to be up
run: |
docker-compose up --detach --build
./test/docker/health-check-services.sh
- name: Install OTP and Elixir
uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- name: Cache dependencies
id: cache-deps
uses: actions/cache@v2
with:
path: |
deps
_build
key: ${{ runner.os }}-otp${{ matrix.otp }}-elixir${{ matrix.elixir }}-${{ hashFiles('**/mix.lock') }}

- name: Install and compile dependencies
if: steps.cache-deps.outputs.cache-hit != 'true'
run: |
mix deps.get --only test
mix deps.compile
- name: Run clustering end-to-end tests
run: mix test.clustering

- name: Dump Docker logs on failure
uses: jwalton/gh-docker-logs@v1
if: failure()
143 changes: 123 additions & 20 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ defmodule Xandra.Cluster.ControlConnection do

use GenServer

alias Xandra.{Frame, Connection.Utils, Transport}
alias Xandra.{Frame, Connection.Utils, Simple, Transport}
alias Xandra.Cluster.{Host, StatusChange, TopologyChange}
alias Xandra.Cluster.ControlConnection.{ConnectedNode, Network}

require Logger

Expand Down Expand Up @@ -145,16 +144,14 @@ defmodule Xandra.Cluster.ControlConnection do

def handle_info(:refresh_topology, %__MODULE__{} = state) do
with :ok <- Transport.setopts(state.transport, active: false),
:ok <- Network.assert_no_transport_message(state.transport.socket),
:ok <- assert_no_transport_message(state.transport.socket),
{:ok, peers} <-
Network.fetch_cluster_topology(
state.transport.module,
fetch_cluster_topology(
state.transport,
state.autodiscovered_nodes_port,
%ConnectedNode{
ip: state.ip,
port: state.port,
protocol_module: state.protocol_module
}
state.protocol_module,
state.ip,
state.port
),
:ok <- Transport.setopts(state.transport, active: :once) do
state = refresh_topology(state, peers)
Expand Down Expand Up @@ -220,15 +217,12 @@ defmodule Xandra.Cluster.ControlConnection do
{:ok, {local_address, local_port}} <- Transport.address_and_port(transport),
state = %__MODULE__{state | ip: local_address, port: local_port},
{:ok, peers} <-
Network.fetch_cluster_topology(
transport.module,
fetch_cluster_topology(
transport,
state.autodiscovered_nodes_port,
%ConnectedNode{
ip: state.ip,
port: state.port,
protocol_module: state.protocol_module,
socket: state.transport.socket
}
state.protocol_module,
state.ip,
state.port
),
:ok <- register_to_events(state),
:ok <- Transport.setopts(state.transport, active: :once) do
Expand Down Expand Up @@ -281,8 +275,7 @@ defmodule Xandra.Cluster.ControlConnection do
protocol_format = Xandra.Protocol.frame_protocol_format(protocol_module)

with :ok <- Transport.send(state.transport, payload),
{:ok, %Frame{} = frame} <-
Network.recv_frame(state.transport.module, state.transport.socket, protocol_format) do
{:ok, %Frame{} = frame} <- recv_frame(state.transport, protocol_format) do
:ok = state.protocol_module.decode_response(frame)
else
{:error, reason} ->
Expand Down Expand Up @@ -351,6 +344,116 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

# https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useQuerySystemTableCluster.html
@select_peers_query """
SELECT * FROM system.peers
"""

@select_local_query """
SELECT data_center, host_id, rack, release_version, schema_version, tokens FROM system.local
"""

@doc """
Discover the peers in the same data center as the node we're connected to.
"""
@spec fetch_cluster_topology(
Transport.t(),
:inet.port_number(),
module(),
:inet.ip_address(),
:inet.port_number()
) ::
{:ok, [Host.t()]} | {:error, :closed | :inet.posix()}
def fetch_cluster_topology(
%Transport{} = transport,
autodiscovered_nodes_port,
protocol_module,
ip,
port
)
when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do
with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query),
{:ok, peers} <- query(transport, protocol_module, @select_peers_query) do
local_peer = %Host{
queried_peer_to_host(local_node_info)
| address: ip,
port: port
}

# We filter out the peers with null host_id because they seem to be nodes that are down or
# decommissioned but not removed from the cluster. See
# https://github.com/lexhide/xandra/pull/196 and
# https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes.
peers =
for peer_attrs <- peers,
peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port},
not is_nil(peer.host_id),
do: peer

{:ok, [local_peer | peers]}
end
end

defp query(%Transport{} = transport, protocol_module, statement)
when is_atom(protocol_module) and is_binary(statement) do
query = %Simple{statement: statement, values: [], default_consistency: :one}

payload =
Frame.new(:query, _options = [])
|> protocol_module.encode_request(query)
|> Frame.encode(protocol_module)

protocol_format = Xandra.Protocol.frame_protocol_format(protocol_module)

with :ok <- Transport.send(transport, payload),
{:ok, %Frame{} = frame} <- recv_frame(transport, protocol_format) do
{%Xandra.Page{} = page, _warnings} = protocol_module.decode_response(frame, query)
{:ok, Enum.to_list(page)}
end
end

defp assert_no_transport_message(socket) do
receive do
{kind, ^socket, _data} when kind in [:tcp, :ssl] -> {:error, :data}
{kind, ^socket, reason} when kind in [:tcp_error, :ssl_error] -> {:error, reason}
{kind, ^socket} when kind in [:tcp_closed, :ssl_closed] -> {:error, :closed}
after
0 -> :ok
end
end

defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs)
end

defp queried_peer_to_host(%{} = peer_attrs) do
columns = [
"address",
"data_center",
"host_id",
"rack",
"release_version",
"schema_version",
"tokens"
]

peer_attrs =
peer_attrs
|> Map.take(columns)
|> Enum.map(fn {key, val} -> {String.to_existing_atom(key), val} end)

struct!(Host, peer_attrs)
end

defp recv_frame(%Transport{} = transport, protocol_format) when is_atom(protocol_format) do
case Utils.recv_frame(transport.module, transport.socket, protocol_format, _compressor = nil) do
{:ok, frame, ""} -> {:ok, frame}
{:error, reason} -> {:error, reason}
end
end

defp execute_telemetry(%__MODULE__{} = state, event_postfix, measurements, extra_meta) do
base_meta = %{
cluster_name: state.cluster_name,
Expand Down
37 changes: 0 additions & 37 deletions lib/xandra/cluster/control_connection/connected_node.ex

This file was deleted.

Loading

0 comments on commit 5897206

Please sign in to comment.