Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Aug 14, 2023
1 parent 2975342 commit b9c1e6a
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 762 deletions.
1 change: 0 additions & 1 deletion lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ defmodule Xandra do
],

# Internal options, used by Xandra.Cluster.
registry: [doc: false, type: :atom],
cluster_pid: [doc: false, type: :pid]
]

Expand Down
14 changes: 11 additions & 3 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ defmodule Xandra.Cluster do
"""

alias Xandra.{Batch, ConnectionError, Prepared, RetryStrategy}
alias Xandra.Cluster.{ControlConnection, Pool}
alias Xandra.Cluster.{ControlConnection, Host, Pool}

@typedoc """
A Xandra cluster.
Expand Down Expand Up @@ -308,8 +308,7 @@ defmodule Xandra.Cluster do
# Internal for testing, not exposed.
xandra_module: [type: :atom, default: Xandra, doc: false],
control_connection_module: [type: :atom, default: ControlConnection, doc: false],
test_discovered_hosts: [type: :any, default: [], doc: false],
registry_listeners: [type: :any, default: [], doc: false]
test_discovered_hosts: [type: :any, default: [], doc: false]
]

@start_link_opts_schema_keys Keyword.keys(@start_link_opts_schema)
Expand Down Expand Up @@ -546,6 +545,15 @@ defmodule Xandra.Cluster do
Pool.stop(cluster, reason, timeout)
end

@doc """
Returns a list of hosts that the cluster has outgoing connections to.
"""
@doc since: "0.18.0"
@spec connected_hosts(cluster) :: [Host.t()]
def connected_hosts(cluster) do
Pool.connected_hosts(cluster)
end

defp with_conn_and_retrying(cluster, options, fun) do
RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
end
Expand Down
76 changes: 52 additions & 24 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,36 @@ defmodule Xandra.Cluster.Pool do

alias_or_nil = if sync_connect_timeout, do: alias()

case :gen_statem.start_link(
__MODULE__,
{cluster_opts, pool_opts, alias_or_nil},
genstatem_opts
) do
start_arg = {cluster_opts, pool_opts, alias_or_nil}

result =
case Keyword.fetch(cluster_opts, :name) do
:error ->
:gen_statem.start_link(__MODULE__, start_arg, genstatem_opts)

{:ok, atom} when is_atom(atom) ->
:gen_statem.start_link({:local, atom}, __MODULE__, start_arg, genstatem_opts)

{:ok, {:global, _term} = tuple} ->
:gen_statem.start_link(tuple, __MODULE__, start_arg, genstatem_opts)

{:ok, {:via, via_module, _term} = tuple} when is_atom(via_module) ->
:gen_statem.start_link(tuple, __MODULE__, start_arg, genstatem_opts)

{:ok, other} ->
raise ArgumentError, """
expected :name option to be one of the following:
* nil
* atom
* {:global, term}
* {:via, module, term}
Got: #{inspect(other)}
"""
end

case result do
{:ok, pid} when sync_connect_timeout == false ->
{:ok, pid}

Expand Down Expand Up @@ -62,6 +87,11 @@ defmodule Xandra.Cluster.Pool do
:gen_statem.call(pid, :checkout)
end

@spec connected_hosts(:gen_statem.server_ref()) :: [Host.t()]
def connected_hosts(pid) do
:gen_statem.call(pid, :connected_hosts)
end

## Data

defstruct [
Expand Down Expand Up @@ -90,9 +120,6 @@ defmodule Xandra.Cluster.Pool do
:load_balancing_module,
:load_balancing_state,

# The registry where connections are registered.
:registry,

# The number of target pools.
:target_pools,

Expand Down Expand Up @@ -129,16 +156,6 @@ defmodule Xandra.Cluster.Pool do
def init({cluster_opts, pool_opts, sync_connect_alias_or_nil}) do
Process.flag(:trap_exit, true)

registry_name =
Module.concat([Xandra.ClusterRegistry, to_string(System.unique_integer([:positive]))])

{:ok, _} =
Registry.start_link(
keys: :unique,
name: registry_name,
listeners: Keyword.fetch!(cluster_opts, :registry_listeners)
)

# Start supervisor for the pools.
{:ok, pool_sup} = Supervisor.start_link([], strategy: :one_for_one)

Expand All @@ -160,7 +177,6 @@ defmodule Xandra.Cluster.Pool do
control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
target_pools: Keyword.fetch!(cluster_opts, :target_pools),
sync_connect_alias: sync_connect_alias_or_nil,
registry: registry_name,
name: Keyword.get(cluster_opts, :name),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval)
Expand Down Expand Up @@ -200,6 +216,16 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data, {:reply, from, reply}}
end

def handle_event({:call, from}, :connected_hosts, @state, %__MODULE__{} = data) do
connected_hosts =
for %{status: :connected, pool_pid: pool_pid, host: host} <- Map.values(data.peers),
is_pid(pool_pid) do
host
end

{:keep_state_and_data, {:reply, from, connected_hosts}}
end

def handle_event(:into, {:host_up, %Host{} = host}, @state, %__MODULE__{} = data) do
data = update_in(data.load_balancing_state, &data.load_balancing_module.host_up(&1, host))

Expand Down Expand Up @@ -288,6 +314,12 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

def handle_event(:info, {:EXIT, _pid, reason}, @state, %__MODULE__{} = _data)
when reason in [:normal, :shutdown] or
(is_tuple(reason) and tuple_size(reason) == 2 and elem(reason, 0) == :shutdown) do
:keep_state_and_data
end

def handle_event({:timeout, :reconnect_control_connection}, nil, @state, %__MODULE__{} = data) do
{:keep_state, data, {:next_event, :internal, :start_control_connection}}
end
Expand Down Expand Up @@ -327,11 +359,7 @@ defmodule Xandra.Cluster.Pool do
# peer, and it'll only start it once.
defp start_pool(%__MODULE__{} = data, %Host{} = host) do
conn_options =
Keyword.merge(data.pool_options,
nodes: [Host.format_address(host)],
registry: data.registry,
cluster_pid: self()
)
Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self())

peername = Host.to_peername(host)

Expand Down
22 changes: 0 additions & 22 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ defmodule Xandra.Connection do
:address,
:port,
:connection_name,
:registry,
:cluster_pid,
:pool_index,
:peername
Expand Down Expand Up @@ -58,7 +57,6 @@ defmodule Xandra.Connection do
address: address,
port: port,
connection_name: connection_name,
registry: Keyword.get(options, :registry),
cluster_pid: cluster_pid,
pool_index: Keyword.fetch!(options, :pool_index),
peername: peername
Expand All @@ -84,8 +82,6 @@ defmodule Xandra.Connection do
supported_options: supported_options
})

maybe_register_or_put_value(state, :up)

if cluster_pid do
send(cluster_pid, {:xandra, :connected, peername, self()})
end
Expand Down Expand Up @@ -389,7 +385,6 @@ defmodule Xandra.Connection do
send(state.cluster_pid, {:xandra, :disconnected, {state.address, state.port}, self()})
end

maybe_register_or_put_value(state, :down)
:ok = transport.close(socket)
end

Expand Down Expand Up @@ -474,23 +469,6 @@ defmodule Xandra.Connection do
end
end

defp maybe_register_or_put_value(%__MODULE__{registry: nil}, _value) do
:ok
end

defp maybe_register_or_put_value(%__MODULE__{peername: {address, port}} = state, value)
when is_tuple(address) do
key = {{address, port}, state.pool_index}

case Registry.register(state.registry, key, value) do
{:ok, _owner} ->
:ok

{:error, {:already_registered, _}} ->
{_new, _old} = Registry.update_value(state.registry, key, fn _ -> value end)
end
end

defp get_right_compressor(%__MODULE__{} = state, provided) do
case Xandra.Protocol.frame_protocol_format(state.protocol_module) do
:v5_or_more -> assert_valid_compressor(state.compressor, provided) || state.compressor
Expand Down
20 changes: 10 additions & 10 deletions lib/xandra/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,7 @@ defmodule Xandra.Telemetry do
logger_meta =
case Map.fetch(metadata, :host) do
{:ok, %Host{address: address, port: port}} ->
address =
case address do
ip when is_tuple(ip) -> ip |> :inet.ntoa() |> to_string()
str when is_list(str) -> to_string(str)
end

[xandra_address: address, xandra_port: port]
[xandra_address: address_to_string(address), xandra_port: port]

:error ->
[]
Expand Down Expand Up @@ -314,7 +308,7 @@ defmodule Xandra.Telemetry do

def handle_event([:xandra | event], measurements, metadata, :no_config) do
%{address: address, port: port} = metadata
logger_meta = [xandra_address: address, xandra_port: port]
logger_meta = [xandra_address: address_to_string(address), xandra_port: port]

case event do
[:connected] ->
Expand Down Expand Up @@ -368,15 +362,21 @@ defmodule Xandra.Telemetry do
Logger.debug(
"Could not use protocol #{inspect(metadata.failed_version)}, " <>
"downgrading to #{inspect(metadata.new_version)}",
xandra_address: metadata.address,
xandra_address: address_to_string(metadata.address),
xandra_port: metadata.port
)
end

def handle_debug_event([:xandra, :connected], _measurements, metadata, :no_config) do
logger_meta = [xandra_address: metadata.address, xandra_port: metadata.port]
logger_meta = [
xandra_address: address_to_string(metadata.address),
xandra_port: metadata.port
]

Logger.debug("Connected using protocol #{inspect(metadata.protocol_module)}", logger_meta)
Logger.debug("Supported options: #{inspect(metadata.supported_options)}", logger_meta)
end

defp address_to_string(ip) when is_tuple(ip), do: ip |> :inet.ntoa() |> to_string()
defp address_to_string(other), do: to_string(other)
end
59 changes: 19 additions & 40 deletions test/xandra/cluster/ccm_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ defmodule Xandra.Cluster.CCMTest do

import Xandra.TestHelper, only: [cmd!: 2, wait_for_passing: 2]

alias Xandra.Cluster.Host

@moduletag :integration
@moduletag :ccm

@cluster_name "xandra_test_cluster"
@cassandra_version "4.1.3"
@node_count 3

setup_all do
on_exit(fn ->
ccm("stop")
end)
end

test "integration" do
validate_ifaddresses()
validate_ifaddresses_on_macos()

if ccm("list") =~ "#{@cluster_name}" do
ccm("switch #{@cluster_name}")
Expand All @@ -23,53 +31,24 @@ defmodule Xandra.Cluster.CCMTest do
ccm("start")
ccm("status")

on_exit(fn ->
ccm("stop")
end)

Process.register(self(), :this_test_process)

cluster =
start_supervised!(
{Xandra.Cluster,
nodes: ["127.0.0.1"],
target_pools: 2,
sync_connect: 5000,
registry_listeners: [:this_test_process]}
{Xandra.Cluster, nodes: ["127.0.0.1"], target_pools: 2, sync_connect: 5000}
)

wait_for_passing(5000, fn ->
assert map_size(:sys.get_state(cluster).pools) == 2
end)

cluster_state = :sys.get_state(cluster)
assert {:ok, _} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local", [])

assert map_size(cluster_state.pools) == 2

pool_addresses =
MapSet.new(cluster_state.pools, fn {{address, port}, _} ->
assert port == 9042
address
connected_hosts =
wait_for_passing(5000, fn ->
connected_hosts = Xandra.Cluster.connected_hosts(cluster)
assert length(connected_hosts) == 2
connected_hosts
end)

assert [{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}]
|> MapSet.new()
|> MapSet.intersection(pool_addresses) == pool_addresses

assert {{:connected, _connected_node}, ctrl_conn_state} =
:sys.get_state(cluster_state.control_connection)

assert %{
{{127, 0, 0, 1}, 9042} => %{host: _host1, status: :up},
{{127, 0, 0, 2}, 9042} => %{host: _host2, status: :up},
{{127, 0, 0, 3}, 9042} => %{host: _host3, status: :up}
} = ctrl_conn_state.peers

assert_receive {:register, _registry, {{registry_addr1, 9042}, 1}, _pid1, :up}
assert_receive {:register, _registry, {{registry_addr2, 9042}, 1}, _pid2, :up}
connected_hosts_set = MapSet.new(connected_hosts, fn %Host{address: address} -> address end)

assert MapSet.subset?(
MapSet.new([registry_addr1, registry_addr2]),
connected_hosts_set,
MapSet.new([{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}])
)
end
Expand All @@ -78,7 +57,7 @@ defmodule Xandra.Cluster.CCMTest do
cmd!("ccm", String.split(args))
end

defp validate_ifaddresses do
defp validate_ifaddresses_on_macos do
if :os.type() == {:unix, :darwin} do
{:ok, addresses} = :inet.getifaddrs()
assert {~c"lo0", info} = List.keyfind!(addresses, ~c"lo0", 0)
Expand Down
1 change: 0 additions & 1 deletion test/xandra/cluster/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Xandra.Cluster.PoolTest do
control_connection_module: ControlConnection,
nodes: [{~c"127.0.0.1", @port}],
sync_connect: false,
registry_listeners: [],
load_balancing: :random,
autodiscovered_nodes_port: @port,
xandra_module: Xandra,
Expand Down
Loading

0 comments on commit b9c1e6a

Please sign in to comment.