diff --git a/lib/xandra.ex b/lib/xandra.ex
index 32c3a45d..0d75692e 100644
--- a/lib/xandra.ex
+++ b/lib/xandra.ex
@@ -391,7 +391,6 @@ defmodule Xandra do
     ],

     # Internal options, used by Xandra.Cluster.
-    registry: [doc: false, type: :atom],
     cluster_pid: [doc: false, type: :pid]
   ]

diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex
index cc063129..1dc99994 100644
--- a/lib/xandra/cluster.ex
+++ b/lib/xandra/cluster.ex
@@ -186,7 +186,7 @@ defmodule Xandra.Cluster do
   """

   alias Xandra.{Batch, ConnectionError, Prepared, RetryStrategy}
-  alias Xandra.Cluster.{ControlConnection, Pool}
+  alias Xandra.Cluster.{ControlConnection, Host, Pool}

   @typedoc """
   A Xandra cluster.
@@ -308,8 +308,7 @@ defmodule Xandra.Cluster do
     # Internal for testing, not exposed.
     xandra_module: [type: :atom, default: Xandra, doc: false],
     control_connection_module: [type: :atom, default: ControlConnection, doc: false],
-    test_discovered_hosts: [type: :any, default: [], doc: false],
-    registry_listeners: [type: :any, default: [], doc: false]
+    test_discovered_hosts: [type: :any, default: [], doc: false]
   ]

   @start_link_opts_schema_keys Keyword.keys(@start_link_opts_schema)
@@ -546,6 +545,15 @@ defmodule Xandra.Cluster do
     Pool.stop(cluster, reason, timeout)
   end

+  @doc """
+  Returns a list of hosts that the cluster has outgoing connections to.
+  """
+  @doc since: "0.18.0"
+  @spec connected_hosts(cluster) :: [Host.t()]
+  def connected_hosts(cluster) do
+    Pool.connected_hosts(cluster)
+  end
+
   defp with_conn_and_retrying(cluster, options, fun) do
     RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
   end
diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex
index e92d9281..424f7fb7 100644
--- a/lib/xandra/cluster/pool.ex
+++ b/lib/xandra/cluster/pool.ex
@@ -26,11 +26,36 @@ defmodule Xandra.Cluster.Pool do
     alias_or_nil = if sync_connect_timeout, do: alias()

-    case :gen_statem.start_link(
-           __MODULE__,
-           {cluster_opts, pool_opts, alias_or_nil},
-           genstatem_opts
-         ) do
+    start_arg = {cluster_opts, pool_opts, alias_or_nil}
+
+    result =
+      case Keyword.fetch(cluster_opts, :name) do
+        :error ->
+          :gen_statem.start_link(__MODULE__, start_arg, genstatem_opts)
+
+        {:ok, atom} when is_atom(atom) ->
+          :gen_statem.start_link({:local, atom}, __MODULE__, start_arg, genstatem_opts)
+
+        {:ok, {:global, _term} = tuple} ->
+          :gen_statem.start_link(tuple, __MODULE__, start_arg, genstatem_opts)
+
+        {:ok, {:via, via_module, _term} = tuple} when is_atom(via_module) ->
+          :gen_statem.start_link(tuple, __MODULE__, start_arg, genstatem_opts)
+
+        {:ok, other} ->
+          raise ArgumentError, """
+          expected :name option to be one of the following:

+            * nil
+            * atom
+            * {:global, term}
+            * {:via, module, term}
+
+          Got: #{inspect(other)}
+          """
+      end
+
+    case result do
       {:ok, pid} when sync_connect_timeout == false ->
         {:ok, pid}

@@ -62,6 +87,11 @@ defmodule Xandra.Cluster.Pool do
     :gen_statem.call(pid, :checkout)
   end

+  @spec connected_hosts(:gen_statem.server_ref()) :: [Host.t()]
+  def connected_hosts(pid) do
+    :gen_statem.call(pid, :connected_hosts)
+  end
+
   ## Data

   defstruct [
@@ -90,9 +120,6 @@ defmodule Xandra.Cluster.Pool do
     :load_balancing_module,
     :load_balancing_state,

-    # The registry where connections are registered.
-    :registry,
-
     # The number of target pools.
     :target_pools,
@@ -129,16 +156,6 @@ defmodule Xandra.Cluster.Pool do
   def init({cluster_opts, pool_opts, sync_connect_alias_or_nil}) do
     Process.flag(:trap_exit, true)

-    registry_name =
-      Module.concat([Xandra.ClusterRegistry, to_string(System.unique_integer([:positive]))])
-
-    {:ok, _} =
-      Registry.start_link(
-        keys: :unique,
-        name: registry_name,
-        listeners: Keyword.fetch!(cluster_opts, :registry_listeners)
-      )
-
     # Start supervisor for the pools.
     {:ok, pool_sup} = Supervisor.start_link([], strategy: :one_for_one)

@@ -160,7 +177,6 @@ defmodule Xandra.Cluster.Pool do
       control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
       target_pools: Keyword.fetch!(cluster_opts, :target_pools),
       sync_connect_alias: sync_connect_alias_or_nil,
-      registry: registry_name,
       name: Keyword.get(cluster_opts, :name),
       pool_supervisor: pool_sup,
       refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval)
@@ -200,6 +216,16 @@ defmodule Xandra.Cluster.Pool do
     {:keep_state, data, {:reply, from, reply}}
   end

+  def handle_event({:call, from}, :connected_hosts, @state, %__MODULE__{} = data) do
+    connected_hosts =
+      for %{status: :connected, pool_pid: pool_pid, host: host} <- Map.values(data.peers),
+          is_pid(pool_pid) do
+        host
+      end
+
+    {:keep_state_and_data, {:reply, from, connected_hosts}}
+  end
+
   def handle_event(:info, {:host_up, %Host{} = host}, @state, %__MODULE__{} = data) do
     data = update_in(data.load_balancing_state, &data.load_balancing_module.host_up(&1, host))

@@ -288,6 +314,12 @@ defmodule Xandra.Cluster.Pool do
     {:keep_state, data}
   end

+  def handle_event(:info, {:EXIT, _pid, reason}, @state, %__MODULE__{} = _data)
+      when reason in [:normal, :shutdown] or
+             (is_tuple(reason) and tuple_size(reason) == 2 and elem(reason, 0) == :shutdown) do
+    :keep_state_and_data
+  end
+
   def handle_event({:timeout, :reconnect_control_connection}, nil, @state, %__MODULE__{} = data) do
     {:keep_state, data, {:next_event, :internal, :start_control_connection}}
   end
@@ -327,11 +359,7 @@
   # peer, and it'll only start it once.
   defp start_pool(%__MODULE__{} = data, %Host{} = host) do
     conn_options =
-      Keyword.merge(data.pool_options,
-        nodes: [Host.format_address(host)],
-        registry: data.registry,
-        cluster_pid: self()
-      )
+      Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self())

     peername = Host.to_peername(host)
diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex
index 5bc29b34..cd9cf6c6 100644
--- a/lib/xandra/connection.ex
+++ b/lib/xandra/connection.ex
@@ -22,7 +22,6 @@ defmodule Xandra.Connection do
     :address,
     :port,
     :connection_name,
-    :registry,
     :cluster_pid,
     :pool_index,
     :peername
@@ -58,7 +57,6 @@ defmodule Xandra.Connection do
       address: address,
       port: port,
       connection_name: connection_name,
-      registry: Keyword.get(options, :registry),
       cluster_pid: cluster_pid,
       pool_index: Keyword.fetch!(options, :pool_index),
       peername: peername
@@ -84,8 +82,6 @@ defmodule Xandra.Connection do
         supported_options: supported_options
       })

-      maybe_register_or_put_value(state, :up)
-
       if cluster_pid do
         send(cluster_pid, {:xandra, :connected, peername, self()})
       end
@@ -389,7 +385,6 @@ defmodule Xandra.Connection do
       send(state.cluster_pid, {:xandra, :disconnected, {state.address, state.port}, self()})
     end

-    maybe_register_or_put_value(state, :down)
     :ok = transport.close(socket)
   end

@@ -474,23 +469,6 @@ defmodule Xandra.Connection do
     end
   end

-  defp maybe_register_or_put_value(%__MODULE__{registry: nil}, _value) do
-    :ok
-  end
-
-  defp maybe_register_or_put_value(%__MODULE__{peername: {address, port}} = state, value)
-       when is_tuple(address) do
-    key = {{address, port}, state.pool_index}
-
-    case Registry.register(state.registry, key, value) do
-      {:ok, _owner} ->
-        :ok
-
-      {:error, {:already_registered, _}} ->
-        {_new, _old} = Registry.update_value(state.registry, key, fn _ -> value end)
-    end
-  end
-
   defp get_right_compressor(%__MODULE__{} = state, provided) do
     case Xandra.Protocol.frame_protocol_format(state.protocol_module) do
       :v5_or_more -> assert_valid_compressor(state.compressor, provided) || state.compressor
diff --git a/lib/xandra/telemetry.ex b/lib/xandra/telemetry.ex
index 2966c387..f28168b8 100644
--- a/lib/xandra/telemetry.ex
+++ b/lib/xandra/telemetry.ex
@@ -276,13 +276,7 @@ defmodule Xandra.Telemetry do
     logger_meta =
       case Map.fetch(metadata, :host) do
         {:ok, %Host{address: address, port: port}} ->
-          address =
-            case address do
-              ip when is_tuple(ip) -> ip |> :inet.ntoa() |> to_string()
-              str when is_list(str) -> to_string(str)
-            end
-
-          [xandra_address: address, xandra_port: port]
+          [xandra_address: address_to_string(address), xandra_port: port]

         :error ->
           []
@@ -314,7 +308,7 @@ defmodule Xandra.Telemetry do
   def handle_event([:xandra | event], measurements, metadata, :no_config) do
     %{address: address, port: port} = metadata

-    logger_meta = [xandra_address: address, xandra_port: port]
+    logger_meta = [xandra_address: address_to_string(address), xandra_port: port]

     case event do
       [:connected] ->
@@ -368,15 +362,21 @@ defmodule Xandra.Telemetry do
     Logger.debug(
       "Could not use protocol #{inspect(metadata.failed_version)}, " <>
         "downgrading to #{inspect(metadata.new_version)}",
-      xandra_address: metadata.address,
+      xandra_address: address_to_string(metadata.address),
       xandra_port: metadata.port
     )
   end

   def handle_debug_event([:xandra, :connected], _measurements, metadata, :no_config) do
-    logger_meta = [xandra_address: metadata.address, xandra_port: metadata.port]
+    logger_meta = [
+      xandra_address: address_to_string(metadata.address),
+      xandra_port: metadata.port
+    ]
Logger.debug("Connected using protocol #{inspect(metadata.protocol_module)}", logger_meta) Logger.debug("Supported options: #{inspect(metadata.supported_options)}", logger_meta) end + + defp address_to_string(ip) when is_tuple(ip), do: ip |> :inet.ntoa() |> to_string() + defp address_to_string(other), do: to_string(other) end diff --git a/test/xandra/cluster/ccm_test.exs b/test/xandra/cluster/ccm_test.exs index f73ecb02..e936af9b 100644 --- a/test/xandra/cluster/ccm_test.exs +++ b/test/xandra/cluster/ccm_test.exs @@ -3,6 +3,8 @@ defmodule Xandra.Cluster.CCMTest do import Xandra.TestHelper, only: [cmd!: 2, wait_for_passing: 2] + alias Xandra.Cluster.Host + @moduletag :integration @moduletag :ccm @@ -10,8 +12,14 @@ defmodule Xandra.Cluster.CCMTest do @cassandra_version "4.1.3" @node_count 3 + setup_all do + on_exit(fn -> + ccm("stop") + end) + end + test "integration" do - validate_ifaddresses() + validate_ifaddresses_on_macos() if ccm("list") =~ "#{@cluster_name}" do ccm("switch #{@cluster_name}") @@ -23,53 +31,24 @@ defmodule Xandra.Cluster.CCMTest do ccm("start") ccm("status") - on_exit(fn -> - ccm("stop") - end) - - Process.register(self(), :this_test_process) - cluster = start_supervised!( - {Xandra.Cluster, - nodes: ["127.0.0.1"], - target_pools: 2, - sync_connect: 5000, - registry_listeners: [:this_test_process]} + {Xandra.Cluster, nodes: ["127.0.0.1"], target_pools: 2, sync_connect: 5000} ) - wait_for_passing(5000, fn -> - assert map_size(:sys.get_state(cluster).pools) == 2 - end) - - cluster_state = :sys.get_state(cluster) + assert {:ok, _} = Xandra.Cluster.execute(cluster, "SELECT * FROM system.local", []) - assert map_size(cluster_state.pools) == 2 - - pool_addresses = - MapSet.new(cluster_state.pools, fn {{address, port}, _} -> - assert port == 9042 - address + connected_hosts = + wait_for_passing(5000, fn -> + connected_hosts = Xandra.Cluster.connected_hosts(cluster) + assert length(connected_hosts) == 2 + connected_hosts end) - assert [{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}] - |> MapSet.new() - |> MapSet.intersection(pool_addresses) == pool_addresses - - assert {{:connected, _connected_node}, ctrl_conn_state} = - :sys.get_state(cluster_state.control_connection) - - assert %{ - {{127, 0, 0, 1}, 9042} => %{host: _host1, status: :up}, - {{127, 0, 0, 2}, 9042} => %{host: _host2, status: :up}, - {{127, 0, 0, 3}, 9042} => %{host: _host3, status: :up} - } = ctrl_conn_state.peers - - assert_receive {:register, _registry, {{registry_addr1, 9042}, 1}, _pid1, :up} - assert_receive {:register, _registry, {{registry_addr2, 9042}, 1}, _pid2, :up} + connected_hosts_set = MapSet.new(connected_hosts, fn %Host{address: address} -> address end) assert MapSet.subset?( - MapSet.new([registry_addr1, registry_addr2]), + connected_hosts_set, MapSet.new([{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}]) ) end @@ -78,7 +57,7 @@ defmodule Xandra.Cluster.CCMTest do cmd!("ccm", String.split(args)) end - defp validate_ifaddresses do + defp validate_ifaddresses_on_macos do if :os.type() == {:unix, :darwin} do {:ok, addresses} = :inet.getifaddrs() assert {~c"lo0", info} = List.keyfind!(addresses, ~c"lo0", 0) diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 1479b7b2..cb72c96f 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -18,7 +18,6 @@ defmodule Xandra.Cluster.PoolTest do control_connection_module: ControlConnection, nodes: [{~c"127.0.0.1", @port}], sync_connect: false, - registry_listeners: [], load_balancing: :random, 
       autodiscovered_nodes_port: @port,
       xandra_module: Xandra,
diff --git a/test/xandra/cluster_test.exs b/test/xandra/cluster_test.exs
index 5b957918..0c2fad47 100644
--- a/test/xandra/cluster_test.exs
+++ b/test/xandra/cluster_test.exs
@@ -1,9 +1,8 @@
 defmodule Xandra.ClusterTest do
-  use ExUnit.Case
+  use ExUnit.Case, async: true

   alias Xandra.TestHelper
   alias Xandra.Cluster.Host
-  alias Xandra.Cluster.LoadBalancingPolicy

   defmodule PoolMock do
     use GenServer
@@ -109,260 +108,6 @@ defmodule Xandra.ClusterTest do
     end
   end

-  describe "in the starting phase" do
-    test "starts a single control connection with the right contact points", %{test_ref: test_ref} do
-      opts = [
-        xandra_module: PoolMock,
-        control_connection_module: ControlConnectionMock,
-        nodes: ["node1.example.com", "node2.example.com"]
-      ]
-
-      cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-      assert_receive {^test_ref, ControlConnectionMock, :init_called, args}
-      assert args.contact_points == [{~c"node1.example.com", 9042}, {~c"node2.example.com", 9042}]
-      assert args.cluster == cluster
-    end
-
-    test "starts one pool per node up to :target_pools", %{test_ref: test_ref} do
-      opts = [
-        xandra_module: PoolMock,
-        control_connection_module: ControlConnectionMock,
-        nodes: ["node1.example.com"],
-        target_pools: 2,
-        load_balancing: {LoadBalancingPolicy.DCAwareRoundRobin, []}
-      ]
-
-      cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-      assert_control_connection_started(test_ref)
-
-      discovered_peers(cluster, [
-        host1 = %Host{address: {199, 0, 0, 1}, port: 9042},
-        host2 = %Host{address: {199, 0, 0, 2}, port: 9042},
-        %Host{address: {199, 0, 0, 3}, port: 9042}
-      ])
-
-      # Assert that the cluster starts a pool for each discovered peer up to :target_pools.
-      assert_pool_started(test_ref, host1)
-      assert_pool_started(test_ref, host2)
-      refute_other_pools_started(test_ref)
-    end
-
-    # TODO: remove this conditional once we depend on Elixir 1.15+, which depends on OTP 24+.
-    if System.otp_release() >= "24" do
-      test "waits for any node to be up if :sync_connect is a timeout", %{test_ref: test_ref} do
-        opts = [
-          xandra_module: PoolMock,
-          control_connection_module: ControlConnectionMock,
-          nodes: ["node1.example.com"],
-          target_pools: 1,
-          load_balancing: {LoadBalancingPolicy.DCAwareRoundRobin, []},
-          sync_connect: 1000,
-          test_discovered_hosts: [host1 = %Host{address: {127, 0, 0, 1}, port: 9042}]
-        ]
-
-        TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-        assert_control_connection_started(test_ref)
-        assert_pool_started(test_ref, host1)
-      end
-
-      test "returns {:error, :sync_connect_timeout} if no nodes connect within the timeout" do
-        opts = [
-          xandra_module: PoolMock,
-          control_connection_module: ControlConnectionMock,
-          nodes: ["node1.example.com"],
-          target_pools: 1,
-          load_balancing: {LoadBalancingPolicy.DCAwareRoundRobin, []},
-          sync_connect: 0
-        ]
-
-        assert {:error, {:sync_connect_timeout, _}} = start_supervised({Xandra.Cluster, opts})
-      end
-    end
-  end
-
-  test "handles status change events", %{test_ref: test_ref} do
-    host1 = %Host{address: {199, 0, 0, 1}, port: 9042}
-    host2 = %Host{address: {199, 0, 0, 2}, port: 9042}
-    peername1 = Host.to_peername(host1)
-    peername2 = Host.to_peername(host2)
-
-    opts = [
-      xandra_module: PoolMock,
-      control_connection_module: ControlConnectionMock,
-      nodes: ["node1"],
-      target_pools: 1,
-      load_balancing: {LoadBalancingPolicy.DCAwareRoundRobin, []}
-    ]
-
-    cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-    assert_control_connection_started(test_ref)
-
-    discovered_peers(cluster, [host1, host2])
-
-    assert_pool_started(test_ref, host1)
-    refute_other_pools_started(test_ref)
-
-    assert %{pool_supervisor: pool_sup, pools: %{^peername1 => pool1}} = :sys.get_state(cluster)
-
-    assert [{^peername1, ^pool1, :worker, _}] = Supervisor.which_children(pool_sup)
-    assert Process.alive?(pool1)
-
-    pool_monitor_ref = Process.monitor(pool1)
-
-    # StatusChange DOWN:
-    send(cluster, {:host_down, host1})
-    assert_receive {:DOWN, ^pool_monitor_ref, _, _, _}
-
-    TestHelper.wait_for_passing(100, fn ->
-      assert {_, :undefined, _, _} =
-               List.keyfind(Supervisor.which_children(pool_sup), peername1, 0)
-    end)
-
-    # The cluster starts a pool to the other node.
-    assert_pool_started(test_ref, host2)
-
-    assert %{pools: %{^peername2 => pool2}} = :sys.get_state(cluster)
-
-    # StatusChange UP, which doesn't change which host goes up.
-    send(cluster, {:host_up, host1})
-
-    TestHelper.wait_for_passing(100, fn ->
-      assert :sys.get_state(cluster).pools == %{peername2 => pool2}
-    end)
-  end
-
-  test "handles topology change events", %{test_ref: test_ref} do
-    host = %Host{address: {199, 0, 0, 1}, port: 9042}
-    new_host = %Host{address: {199, 0, 0, 2}, port: 9042}
-    ignored_host = %Host{address: {199, 0, 0, 3}, port: 9042}
-
-    opts = [
-      xandra_module: PoolMock,
-      control_connection_module: ControlConnectionMock,
-      nodes: ["node1"],
-      target_pools: 2,
-      load_balancing: {LoadBalancingPolicy.Random, []}
-    ]
-
-    cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-    assert_control_connection_started(test_ref)
-
-    discovered_peers(cluster, [host])
-    assert_pool_started(test_ref, host)
-
-    # TopologyChange NEW_NODE starts a new pool to the node.
-    send(cluster, {:host_added, new_host})
-    assert_pool_started(test_ref, new_host)
-
-    # TopologyChange NEW_NODE doesn't do anything, since we reached :target_pools.
-    send(cluster, {:host_added, ignored_host})
-    refute_other_pools_started(test_ref)
-    send(cluster, {:host_removed, ignored_host})
-
-    # TopologyChange REMOVED_NODE (removing the original node) stops the pool.
-    assert {:ok, pool} = Map.fetch(:sys.get_state(cluster).pools, Host.to_peername(host))
-    pool_monitor_ref = Process.monitor(pool)
-    send(cluster, {:host_removed, host})
-    assert_receive {:DOWN, ^pool_monitor_ref, _, _, _}
-
-    TestHelper.wait_for_passing(500, fn ->
-      assert [_] = Supervisor.which_children(:sys.get_state(cluster).pool_supervisor)
-    end)
-  end
-
-  test "handles the same peers being re-reported", %{test_ref: test_ref} do
-    host1 = %Host{address: {192, 0, 0, 1}, port: 9042}
-    host2 = %Host{address: {192, 0, 0, 2}, port: 9042}
-
-    opts = [
-      xandra_module: PoolMock,
-      control_connection_module: ControlConnectionMock,
-      nodes: [Host.format_address(host1)]
-    ]
-
-    # Start the cluster.
-    cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-    # First, the control connection is started and both pools as well.
-    assert_control_connection_started(test_ref)
-    discovered_peers(cluster, [host1])
-    assert_pool_started(test_ref, host1)
-    assert Map.has_key?(:sys.get_state(cluster).pools, Host.to_peername(host1))
-
-    # Now simulate the control connection re-reporting different peers for some reason.
-    discovered_peers(cluster, [host1, host2])
-
-    assert_pool_started(test_ref, host2)
-    refute_other_pools_started(test_ref)
-
-    assert Map.has_key?(:sys.get_state(cluster).pools, Host.to_peername(host2))
-  end
-
-  describe "checkout call" do
-    test "with no pools" do
-      opts = [
-        xandra_module: PoolMock,
-        control_connection_module: ControlConnectionMock,
-        nodes: ["node1"]
-      ]
-
-      cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-      assert GenServer.call(cluster, :checkout) == {:error, :empty}
-    end
-
-    test "with the Random load-balancing policy", %{test_ref: test_ref} do
-      opts = [
-        xandra_module: PoolMock,
-        control_connection_module: ControlConnectionMock,
-        nodes: ["node1", "node2"],
-        load_balancing: {LoadBalancingPolicy.Random, []}
-      ]
-
-      cluster = TestHelper.start_link_supervised!({Xandra.Cluster, opts})
-
-      assert_control_connection_started(test_ref)
-
-      discovered_peers(cluster, [
-        host1 = %Host{address: {192, 0, 0, 1}, port: 9042},
-        host2 = %Host{address: {192, 0, 0, 2}, port: 9042}
-      ])
-
-      assert_pool_started(test_ref, host1)
-      assert_pool_started(test_ref, host2)
-
-      lbs = :sys.get_state(cluster).load_balancing_state
-      assert MapSet.new(lbs) == MapSet.new([{host1, :up}, {host2, :up}])
-
-      report_peers_connected(cluster, [host1, host2])
-
-      lbs = :sys.get_state(cluster).load_balancing_state
-      assert MapSet.new(lbs) == MapSet.new([{host1, :connected}, {host2, :connected}])
-
-      pool_pids =
-        TestHelper.wait_for_passing(500, fn ->
-          pools = :sys.get_state(cluster).pools
-          assert map_size(pools) == 2
-          for {_address, pid} <- pools, do: pid
-        end)
-
-      # If we check out enough times with load_balancing: :random, statistically it
-      # means we have to have a non-sorted list of pids.
-      random_pids =
-        for _ <- 1..50 do
-          assert {:ok, pid} = GenServer.call(cluster, :checkout)
-          assert pid in pool_pids
-          pid
-        end
-
-      assert random_pids != Enum.sort(random_pids, :asc) and
-               random_pids != Enum.sort(random_pids, :desc)
-    end
-  end
-
   describe "stop/1" do
     test "stops the cluster", %{test_ref: test_ref} do
       opts = [
@@ -379,24 +124,7 @@
     end
   end

-  defp assert_pool_started(test_ref, %Host{} = host) do
-    node = Host.format_address(host)
-    assert_receive {^test_ref, PoolMock, :init_called, %{nodes: [^node]}}
-  end
-
-  defp refute_other_pools_started(test_ref) do
-    refute_receive {^test_ref, PoolMock, :init_called, _args}, 50
-  end
-
   defp assert_control_connection_started(test_ref) do
     assert_receive {^test_ref, ControlConnectionMock, :init_called, _start_args}
   end
-
-  defp discovered_peers(cluster, hosts) do
-    Enum.each(hosts, &send(cluster, {:host_added, &1}))
-  end
-
-  defp report_peers_connected(cluster, hosts) do
-    Enum.each(hosts, &send(cluster, {:host_connected, &1}))
-  end
 end
diff --git a/test_clustering/Dockerfile b/test_clustering/Dockerfile
deleted file mode 100644
index ddd1ee20..00000000
--- a/test_clustering/Dockerfile
+++ /dev/null
@@ -1,29 +0,0 @@
-ARG ELIXIR_VERSION=1.13.4
-ARG OTP_VERSION=24.2
-
-FROM hexpm/elixir:${ELIXIR_VERSION}-erlang-${OTP_VERSION}-alpine-3.14.2
-
-WORKDIR /app
-
-ENV MIX_ENV=test
-
-# Install Docker and Docker Compose to control sibling containers, and Git for installing
-# Git dependencies if necessary.
-RUN apk update && \
-    apk add docker docker-compose git curl openssl
-
-# Copy only the files needed to fetch and compile deps.
-COPY mix.exs .
-COPY mix.lock .
-
-# Install rebar3 and Hex and then compile dependencies. This will be cached by
-# Docker unless we change dependencies.
-RUN mix do local.rebar --force, \
-           local.hex --force, \
-           deps.get --only test, \
-           deps.compile
-
-# Now copy Xandra's code.
-COPY lib lib
-COPY test test
-COPY test_clustering test_clustering
diff --git a/test_clustering/cassandra_node.dockerfile b/test_clustering/cassandra_node.dockerfile
deleted file mode 100644
index 16860b74..00000000
--- a/test_clustering/cassandra_node.dockerfile
+++ /dev/null
@@ -1,9 +0,0 @@
-ARG CASSANDRA_VERSION=4
-
-FROM cassandra:$CASSANDRA_VERSION
-
-# Enable user-defined functions.
-RUN sed -i -e "s/\(enable_user_defined_functions: \)false/\1true/" /etc/cassandra/cassandra.yaml
-
-# Disable auto bootstrapping.
-RUN echo "auto_bootstrap: false" >> /etc/cassandra/cassandra.yaml
diff --git a/test_clustering/docker-compose.cluster.yml b/test_clustering/docker-compose.cluster.yml
deleted file mode 100644
index b483a9d4..00000000
--- a/test_clustering/docker-compose.cluster.yml
+++ /dev/null
@@ -1,78 +0,0 @@
-version: '3'
-
-services:
-  seed:
-    build:
-      context: "."
-      dockerfile: cassandra_node.dockerfile
-    hostname: seed
-    environment:
-      - CASSANDRA_CLUSTER_NAME=xandra-cluster
-      - CASSANDRA_DC=xandradc
-      - HEAP_NEWSIZE=1M
-      - MAX_HEAP_SIZE=200M
-      - START_RPC=false
-      - NUM_TOKENS=3
-      - RACK=RAC1
-    ulimits:
-      memlock: -1
-      nproc: 32768
-      nofile: 100000
-
-  node1:
-    build:
-      context: "."
-      dockerfile: cassandra_node.dockerfile
-    hostname: node1
-    environment:
-      - CASSANDRA_SEEDS=seed
-      - CASSANDRA_CLUSTER_NAME=xandra-cluster
-      - CASSANDRA_DC=xandradc
-      - HEAP_NEWSIZE=1M
-      - MAX_HEAP_SIZE=200M
-    depends_on:
-      - seed
-
-  node2:
-    build:
-      context: "."
-      dockerfile: cassandra_node.dockerfile
-    hostname: node2
-    environment:
-      - CASSANDRA_SEEDS=seed
-      - CASSANDRA_CLUSTER_NAME=xandra-cluster
-      - CASSANDRA_DC=xandradc
-      - HEAP_NEWSIZE=1M
-      - MAX_HEAP_SIZE=200M
-    depends_on:
-      - seed
-
-  node3:
-    build:
-      context: "."
-      dockerfile: cassandra_node.dockerfile
-    hostname: node3
-    environment:
-      - CASSANDRA_SEEDS=seed
-      - CASSANDRA_CLUSTER_NAME=xandra-cluster
-      - CASSANDRA_DC=xandradc
-      - HEAP_NEWSIZE=1M
-      - MAX_HEAP_SIZE=200M
-    depends_on:
-      - seed
-
-  elixir:
-    # We need to build from the root of Xandra itself so that we can copy Xandra's code inside
-    # the container, but then we want to use the Dockerfile in this directory.
-    build:
-      context: ".."
-      dockerfile: ./test_clustering/Dockerfile
-    links:
-      - seed
-      - node1
-      - node2
-      - node3
-    volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
-    environment:
-      - CASSANDRA_NATIVE_PROTOCOL
diff --git a/test_clustering/docker_helpers.exs b/test_clustering/docker_helpers.exs
deleted file mode 100644
index fcbd0bd0..00000000
--- a/test_clustering/docker_helpers.exs
+++ /dev/null
@@ -1,90 +0,0 @@
-defmodule Xandra.TestClustering.DockerHelpers do
-  import ExUnit.Assertions
-
-  def docker_compose_file() do
-    Path.join(__DIR__, "docker-compose.cluster.yml")
-  end
-
-  def run!(cmd, args) do
-    {_output, exit_status} =
-      System.cmd(cmd, args, stderr_to_stdout: true, into: IO.stream(:stdio, :line))
-
-    if exit_status != 0 do
-      flunk("""
-      Command exited with non-zero status #{exit_status}:
-
-        #{cmd} #{Enum.join(args, " ")}
-      """)
-    end
-  end
-
-  def docker_compose(args) do
-    System.cmd("docker-compose", ["-f", docker_compose_file() | args], stderr_to_stdout: true)
-  end
-
-  def docker_compose!(args) do
-    case docker_compose(args) do
-      {output, 0} ->
-        output
-
-      {output, exit_status} ->
-        flunk("""
-        docker-compose exited with status #{exit_status}. The command was:
-
-          docker-compose #{Enum.join(args, " ")}
-
-        The logged output was this:
-
-        #{output}
-        """)
-    end
-  end
-
-  def wait_for_container_up(name) do
-    wait_for_container_up(name, _retry_interval = 1000, _timeout_left = 30_000)
-  end
-
-  def wait_for_container_up(name, _retry_interval, timeout_left) when timeout_left <= 0 do
-    flunk("Container is still not running or responding to health checks: #{inspect(name)}")
-  end
-
-  def wait_for_container_up(name, retry_interval, timeout_left) do
-    container_id = String.trim(docker_compose!(["ps", "-q", name]))
-
-    {output, exit_status} =
-      System.cmd("docker", [
-        "inspect",
-        "--format",
-        "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
-        container_id
-      ])
-
-    IO.inspect(output)
-
-    assert exit_status == 0, "'docker inspect' failed with exit status #{exit_status}: #{output}"
-
-    ip = String.trim(output)
-
-    {output, exit_status} =
-      docker_compose(["exec", "-T", name, "nodetool", "-h", "::FFFF:127.0.0.1", "status"])
-
-    cond do
-      exit_status == 0 ->
-        if output
-           |> String.split("\n")
-           |> Enum.any?(&(&1 =~ ip and String.starts_with?(&1, "UN"))) do
-          IO.puts("🌐 Cassandra node '#{name}' is up")
-        else
-          Process.sleep(retry_interval)
-          wait_for_container_up(name, retry_interval, timeout_left - retry_interval)
-        end
-
-      output =~ "Has this node finished starting up?" and
-          exit_status != 0 ->
-        Process.sleep(retry_interval)
-        wait_for_container_up(name, retry_interval, timeout_left - retry_interval)
-
-      true ->
-        wait_for_container_up(name, retry_interval, timeout_left - retry_interval)
-    end
-  end
-end
diff --git a/test_clustering/integration_test.exs b/test_clustering/integration_test.exs
deleted file mode 100644
index aa7e68df..00000000
--- a/test_clustering/integration_test.exs
+++ /dev/null
@@ -1,162 +0,0 @@
-Code.require_file("docker_helpers.exs", __DIR__)
-
-ExUnit.start(trace: true, timeout: 300_000)
-
-ExUnit.after_suite(fn _results ->
-  Xandra.TestClustering.DockerHelpers.docker_compose!(["down", "--volumes"])
-end)
-
-defmodule Xandra.TestClustering.IntegrationTest do
-  use ExUnit.Case
-
-  alias Xandra.TestHelper
-
-  import Xandra.TestClustering.DockerHelpers
-
-  @protocol_version (case System.get_env("CASSANDRA_NATIVE_PROTOCOL", "") do
-                       "v3" -> :v3
-                       "v4" -> :v4
-                       "v5" -> :v5
-                       "" -> nil
-                     end)
-
-  setup do
-    IO.puts("🚧 Starting Cassandra cluster with docker-compose up -d...")
-    nodes = ["seed", "node1", "node2", "node3"]
-
-    start_time = System.system_time()
-
-    docker_compose!(["up", "-d", "--build"] ++ nodes)
-    Enum.each(nodes, &wait_for_container_up/1)
-
-    on_exit(fn ->
-      IO.write("🛑 Stopping Cassandra cluster...")
-      {elapsed_microsec, _} = :timer.tc(fn -> docker_compose!(["stop"] ++ nodes) end)
-      IO.write(" Done in #{Float.round(elapsed_microsec / 1_000_000, 3)}s")
-    end)
-
-    elapsed_ms =
-      System.convert_time_unit(System.system_time() - start_time, :native, :millisecond)
-
-    IO.puts("✅ Done in #{elapsed_ms / 1000}s")
-
-    :ok
-  end
-
-  test "if a node goes down, the cluster removes its control connection and pool" do
-    conn_count_in_cluster = 4
-
-    {:ok, cluster} =
-      Xandra.Cluster.start_link(
-        autodiscovery: true,
-        nodes: ["node1", "seed"],
-        protocol_version: @protocol_version
-      )
-
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert %Xandra.Cluster{} = cluster_state = :sys.get_state(cluster)
-      assert length(cluster_state.node_refs) == conn_count_in_cluster
-    end)
-
-    # Wait for all pools to be started.
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert map_size(:sys.get_state(cluster).pools) == conn_count_in_cluster
-    end)
-
-    docker_compose!(["stop", "node2"])
-
-    # Wait for the pool for the stopped node to be stopped.
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert map_size(:sys.get_state(cluster).pools) == conn_count_in_cluster - 1
-    end)
-  end
-
-  test "if a node goes down and then rejoins, the cluster readds its control connection and pool" do
-    conn_count_in_cluster = 4
-
-    {:ok, cluster} =
-      Xandra.Cluster.start_link(
-        autodiscovery: true,
-        nodes: ["node1", "seed"],
-        protocol_version: @protocol_version
-      )
-
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert %Xandra.Cluster{} = cluster_state = :sys.get_state(cluster)
-      assert length(cluster_state.node_refs) == conn_count_in_cluster
-    end)
-
-    # Wait for all pools to be started.
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert map_size(:sys.get_state(cluster).pools) == conn_count_in_cluster
-    end)
-
-    docker_compose!(["stop", "node2"])
-
-    # Wait for the pool for the stopped node to be stopped.
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert map_size(:sys.get_state(cluster).pools) == conn_count_in_cluster - 1
-    end)
-
-    docker_compose!(["up", "-d", "node2"])
-
-    # Wait for the pool to the restarted node to be up.
-    TestHelper.wait_for_passing(60_000, fn ->
-      assert map_size(:sys.get_state(cluster).pools) == conn_count_in_cluster
-    end)
-  end
-
-  test "connect and discover peers" do
-    conn_count_in_cluster = 4
-
-    {:ok, cluster} =
-      Xandra.Cluster.start_link(
-        autodiscovery: true,
-        nodes: ["node1", "seed"],
-        protocol_version: @protocol_version
-      )
-
-    cluster_state =
-      TestHelper.wait_for_passing(60_000, fn ->
-        assert %Xandra.Cluster{} = cluster_state = :sys.get_state(cluster)
-
-        assert length(cluster_state.node_refs) == conn_count_in_cluster,
-               "expected #{conn_count_in_cluster} elements in node_refs, " <>
-                 "got: #{inspect(cluster_state.node_refs)}"
-
-        cluster_state
-      end)
-
-    control_conn_peernames =
-      TestHelper.wait_for_passing(30_000, fn ->
-        for {peername, ref} <- cluster_state.node_refs do
-          assert is_reference(ref)
-          assert {ip, port} = peername
-          assert is_tuple(ip)
-          assert port == 9042
-
-          peername
-        end
-      end)
-
-    assert map_size(cluster_state.pools) == conn_count_in_cluster
-
-    assert Enum.sort(Map.keys(cluster_state.pools)) == Enum.sort(control_conn_peernames)
-
-    pool_children =
-      TestHelper.wait_for_passing(60_000, fn ->
-        children = Supervisor.which_children(cluster_state.control_conn_supervisor)
-        assert length(children) == conn_count_in_cluster
-        children
-      end)
-
-    for {child_id, pid, _, _} <- pool_children do
-      assert {peername, _} = List.keyfind(cluster_state.node_refs, child_id, 1)
-
-      {:connected, control_conn_state} = :sys.get_state(pid)
-      assert peername == control_conn_state.peername
-    end
-
-    Xandra.Cluster.execute!(cluster, "SELECT * FROM system_schema.keyspaces")
-  end
-end
diff --git a/test_clustering/run.exs b/test_clustering/run.exs
deleted file mode 100644
index df2a1f59..00000000
--- a/test_clustering/run.exs
+++ /dev/null
@@ -1,20 +0,0 @@
-Code.require_file("docker_helpers.exs", __DIR__)
-
-IO.puts("Running clustering tests")
-
-Xandra.TestClustering.DockerHelpers.run!("docker-compose", [
-  "-f",
-  Xandra.TestClustering.DockerHelpers.docker_compose_file(),
-  "build",
-  "elixir"
-])
-
-Xandra.TestClustering.DockerHelpers.run!("docker-compose", [
-  "-f",
-  Xandra.TestClustering.DockerHelpers.docker_compose_file(),
-  "run",
-  "elixir",
-  "mix",
-  "run",
-  "test_clustering/integration_test.exs"
-])
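
A minimal usage sketch of the connected_hosts/1 API and the :name registration forms introduced in this diff. The MyApp.Cluster name, the 127.0.0.1 contact point, and the printed output are illustrative assumptions rather than part of the change itself.

# Sketch only: assumes a reachable Cassandra node at 127.0.0.1:9042.
{:ok, cluster} =
  Xandra.Cluster.start_link(
    nodes: ["127.0.0.1:9042"],
    # :name may be an atom, {:global, term}, or {:via, module, term},
    # matching the :gen_statem registration handled in Pool.start_link/1 above.
    name: MyApp.Cluster,
    sync_connect: 5000
  )

# Each element is a %Xandra.Cluster.Host{} for a node the cluster has an active pool to.
for %Xandra.Cluster.Host{address: address, port: port} <- Xandra.Cluster.connected_hosts(cluster) do
  IO.puts("connected to #{inspect(address)}:#{port}")
end

connected_hosts/1 takes over the introspection role that the removed internal Registry (and the registry_listeners test hook) used to provide, which is what the updated CCM test above relies on.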