diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a67bacfd..61e3da27 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -122,6 +122,8 @@ jobs: CASSANDRA_VERSION: ${{ matrix.server_versions.cassandra }} SCYLLA_VERSION: ${{ matrix.server_versions.scylla }} CASSANDRA_NATIVE_PROTOCOL: ${{ matrix.cassandra_native_protocol }} + LOG_LEVEL: debug + XANDRA_DEBUG: true steps: - name: Clone the repository @@ -132,6 +134,20 @@ jobs: docker-compose up --detach --build ./test/docker/health-check-services.sh + - name: Install Python (for ccm) + uses: actions/setup-python@v4 + with: + python-version: "3.9.16" + + - name: Install ccm + run: pip3 install ccm + + - name: Install OpenJDK + uses: actions/setup-java@v2 + with: + distribution: "zulu" + java-version: "11" + - name: Install OTP and Elixir uses: erlef/setup-beam@v1 with: @@ -157,11 +173,11 @@ jobs: # TODO: eventually figure out why we can't run encryption tests on CI. - name: Run tests for Cassandra and Scylla if: ${{ !matrix.test_only_cassandra }} - run: mix test.all --trace --exclude encryption + run: mix test.all --trace --exclude encryption --exclude ccm - name: Run tests for Cassandra only if: ${{ matrix.test_only_cassandra }} - run: mix test --trace --exclude encryption + run: mix test --trace --exclude encryption --exclude ccm - name: Dump Docker logs on failure uses: jwalton/gh-docker-logs@v1 diff --git a/docker-compose.yml b/docker-compose.yml index e89b462b..e5967b42 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,8 @@ services: args: CASSANDRA_VERSION: "${CASSANDRA_VERSION:-4.1}" ports: - - "9042:9042" # TCP connections - - "9142:9142" # TLS/SSL connections + - "9052:9042" # TCP connections + - "9152:9142" # TLS/SSL connections environment: - HEAP_NEWSIZE=1M - MAX_HEAP_SIZE=200M @@ -31,7 +31,7 @@ services: AUTHENTICATION: "true" CASSANDRA_VERSION: "${CASSANDRA_VERSION:-4.1}" ports: - - "9043:9042" + - "9053:9042" environment: - HEAP_NEWSIZE=1M - MAX_HEAP_SIZE=200M @@ -52,7 +52,7 @@ services: args: SCYLLA_VERSION: ${SCYLLA_VERSION:-2.3.1} ports: - - "9052:9042" + - "9062:9042" command: - "--smp" - "1" @@ -74,7 +74,7 @@ services: SCYLLA_VERSION: ${SCYLLA_VERSION:-2.3.1} AUTHENTICATION: "true" ports: - - "9053:9042" + - "9063:9042" command: - "--smp" - "1" diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index a74b21c6..dcc4b376 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -334,7 +334,8 @@ defmodule Xandra.Cluster do # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], control_connection_module: [type: :atom, default: ControlConnection, doc: false], - test_discovered_hosts: [type: :any, default: [], doc: false] + test_discovered_hosts: [type: :any, default: [], doc: false], + registry_listeners: [type: :any, default: [], doc: false] ] @start_link_opts_schema_keys Keyword.keys(@start_link_opts_schema) @@ -633,7 +634,12 @@ defmodule Xandra.Cluster do registry_name = Module.concat([Xandra.ClusterRegistry, to_string(System.unique_integer([:positive]))]) - {:ok, _} = Registry.start_link(keys: :unique, name: registry_name) + {:ok, _} = + Registry.start_link( + keys: :unique, + name: registry_name, + listeners: Keyword.fetch!(cluster_opts, :registry_listeners) + ) # Start supervisor for the pools. {:ok, pool_sup} = Supervisor.start_link([], strategy: :one_for_one) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 06292b77..2c7beda6 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -300,7 +300,7 @@ defmodule Xandra.Cluster.ControlConnection do spec = [{{{:"$1", :_}, :_, :"$2"}, [{:==, :"$1", {{{address}, port}}}], [:"$2"]}] statuses = Registry.select(data.registry, spec) - if host_info.status == :up and Enum.all?(statuses, &(&1 == :down)) do + if host_info.status == :up and statuses != [] and Enum.all?(statuses, &(&1 == :down)) do execute_telemetry(data, [:change_event], %{}, %{ event_type: :host_down, host: host_info.host, diff --git a/mix.exs b/mix.exs index e984dd35..1ac13968 100644 --- a/mix.exs +++ b/mix.exs @@ -76,10 +76,10 @@ defmodule Xandra.Mixfile do [ "test.scylladb": [ fn _args -> - System.put_env("CASSANDRA_PORT", "9052") - System.put_env("CASSANDRA_WITH_AUTH_PORT", "9053") + System.put_env("CASSANDRA_PORT", "9062") + System.put_env("CASSANDRA_WITH_AUTH_PORT", "9063") end, - "test --exclude cassandra_specific --exclude encryption" + "test --exclude cassandra_specific --exclude encryption --exclude ccm" ], "test.all": fn args -> Mix.Task.run(:test, args) @@ -99,8 +99,7 @@ defmodule Xandra.Mixfile do # Dev and test dependencies {:dialyxir, "~> 1.3", only: [:dev, :test], runtime: false}, {:ex_doc, "~> 0.28", only: :dev}, - # TODO: replace with Hex version once it gets released - {:excoveralls, github: "whatyouhide/excoveralls", branch: "httpc", only: :test}, + {:excoveralls, "~> 0.17", only: :test}, {:stream_data, "~> 0.5.0", only: [:dev, :test]}, {:nimble_lz4, "~> 0.1.3", only: [:dev, :test]} ] diff --git a/mix.lock b/mix.lock index 445b0e25..90897c31 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.32", "fa739a0ecfa34493de19426681b23f6814573faee95dfd4b4aafe15a7b5b32c6", [:mix], [], "hexpm", "b8b0dd77d60373e77a3d7e8afa598f325e49e8663a51bcc2b88ef41838cca755"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"}, - "excoveralls": {:git, "https://github.com/whatyouhide/excoveralls.git", "6275ab6c8db4441fee1548c3803f83971f830626", [branch: "httpc"]}, + "excoveralls": {:hex, :excoveralls, "0.17.0", "279f124dba347903bb654bc40745c493ae265d45040001b4899ea1edf88078c7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "08b638d114387a888f9cb8d65f2a0021ec04c3e447b793efa7c1e734aba93004"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, diff --git a/test/integration/authentication_test.exs b/test/integration/authentication_test.exs index 8c7d8abe..f1d69150 100644 --- a/test/integration/authentication_test.exs +++ b/test/integration/authentication_test.exs @@ -1,6 +1,6 @@ defmodule AuthenticationTest do auth_options = [username: "cassandra", password: "cassandra"] - port = System.get_env("CASSANDRA_WITH_AUTH_PORT", "9043") + port = System.get_env("CASSANDRA_WITH_AUTH_PORT", "9053") use XandraTest.IntegrationCase, start_options: [ diff --git a/test/integration/clustering_integration_test.exs b/test/integration/clustering_integration_test.exs index ce98b417..b8be5875 100644 --- a/test/integration/clustering_integration_test.exs +++ b/test/integration/clustering_integration_test.exs @@ -12,8 +12,7 @@ defmodule ClusteringTest do start_options = Keyword.merge(start_options, load_balancing: :random, - name: TestCluster, - nodes: ["127.0.0.1"] + name: TestCluster ) cluster = TestHelper.start_link_supervised!({Xandra.Cluster, start_options}) diff --git a/test/integration/encryption_test.exs b/test/integration/encryption_test.exs index 9c9f342b..01741276 100644 --- a/test/integration/encryption_test.exs +++ b/test/integration/encryption_test.exs @@ -1,6 +1,6 @@ defmodule EncryptionTest do start_options = [ - nodes: ["127.0.0.1:9142"], + nodes: ["127.0.0.1:9152"], encryption: true, transport_options: [verify: :verify_none] ] diff --git a/test/integration/generic_test.exs b/test/integration/generic_test.exs index 6992fb1b..7aa64f0f 100644 --- a/test/integration/generic_test.exs +++ b/test/integration/generic_test.exs @@ -5,12 +5,18 @@ defmodule GenericTest do assert %Xandra.SetKeyspace{} = Xandra.run(conn, [], &Xandra.execute!(&1, "USE #{keyspace}")) end - test "DBConnection options in Xandra.start_link/1", %{conn: conn, keyspace: keyspace} do + test "DBConnection options in Xandra.start_link/1", %{ + conn: conn, + keyspace: keyspace, + start_options: start_options + } do Xandra.execute!(conn, "CREATE TABLE users (code int, name text, PRIMARY KEY (code, name))") Xandra.execute!(conn, "INSERT INTO users (code, name) VALUES (1, 'Meg')") assert {:ok, test_conn} = - start_supervised({Xandra, after_connect: &Xandra.execute(&1, "USE #{keyspace}")}) + start_supervised( + {Xandra, [after_connect: &Xandra.execute(&1, "USE #{keyspace}")] ++ start_options} + ) assert test_conn |> Xandra.execute!("SELECT * FROM users") |> Enum.to_list() == [ %{"code" => 1, "name" => "Meg"} @@ -21,8 +27,13 @@ defmodule GenericTest do # This test doesn't test much for now, it's sort of a smoke test. Once we'll have # generic telemetry events for queries, we can change this test to assert on the telemetry # event. - test ":default_consistency option when starting", %{keyspace: keyspace} do - assert {:ok, test_conn} = start_supervised({Xandra, default_consistency: :three}) + test ":default_consistency option when starting", %{ + keyspace: keyspace, + start_options: start_options + } do + assert {:ok, test_conn} = + start_supervised({Xandra, [default_consistency: :three] ++ start_options}) + Xandra.execute!(test_conn, "USE #{keyspace}") end end diff --git a/test/integration/protocol_negotiation_test.exs b/test/integration/protocol_negotiation_test.exs index 8a339539..e686ee93 100644 --- a/test/integration/protocol_negotiation_test.exs +++ b/test/integration/protocol_negotiation_test.exs @@ -10,7 +10,7 @@ defmodule ProtocolNegotiationTest do # downgrade to a lower version of the protocol # test "beta protocol v5" do - conn = start_supervised!({Xandra, show_sensitive_data_on_connection_error: true}) + conn = start_supervised!({Xandra, XandraTest.IntegrationCase.default_start_options()}) assert %Xandra.Page{} = Xandra.execute!(conn, "SELECT * FROM system.local") end end diff --git a/test/integration/telemetry_test.exs b/test/integration/telemetry_test.exs index 7a72735c..feb88f30 100644 --- a/test/integration/telemetry_test.exs +++ b/test/integration/telemetry_test.exs @@ -3,22 +3,27 @@ defmodule TelemetryTest do alias Xandra.Prepared + @port String.to_integer(System.get_env("CASSANDRA_PORT", "9052")) + setup_all %{setup_conn: conn, keyspace: keyspace} do Xandra.execute!(conn, "CREATE TABLE #{keyspace}.names (name text PRIMARY KEY)") :ok end describe "connection" do - test "sends event on connection/disconnection" do + test "sends event on connection/disconnection", %{start_options: start_options} do ref = :telemetry_test.attach_event_handlers(self(), [[:xandra, :connected]]) - start_supervised!({Xandra, [name: :telemetry_test_connection, pool_size: 1]}) + + start_supervised!( + {Xandra, [name: :telemetry_test_connection, pool_size: 1] ++ start_options} + ) assert_receive {[:xandra, :connected], ^ref, measurements, metadata} assert measurements == %{} assert metadata.connection_name == :telemetry_test_connection assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port end end @@ -37,7 +42,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port # Successive call to prepare uses cache. assert {:ok, ^prepared} = Xandra.prepare(conn, statement) @@ -47,7 +52,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert {:ok, ^prepared} = Xandra.prepare(conn, statement, force: true) @@ -56,7 +61,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port end test "prepare query", %{conn: conn} do @@ -75,7 +80,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert metadata.extra_metadata == %{foo: :bar} assert is_integer(system_time) @@ -84,7 +89,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert metadata.extra_metadata == %{foo: :bar} assert metadata.reprepared == false assert is_integer(duration) @@ -97,7 +102,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert metadata.extra_metadata == %{foo: :bar} assert metadata.reprepared == true assert is_integer(duration) @@ -121,7 +126,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert metadata.extra_metadata == %{foo: :bar} assert is_integer(system_time) @@ -130,7 +135,7 @@ defmodule TelemetryTest do assert metadata.query.statement == statement assert metadata.connection_name == nil assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == @port assert metadata.extra_metadata == %{foo: :bar} assert is_integer(duration) end diff --git a/test/integration/warning_test.exs b/test/integration/warning_test.exs index 64269326..fcbc36b8 100644 --- a/test/integration/warning_test.exs +++ b/test/integration/warning_test.exs @@ -51,7 +51,7 @@ defmodule WarningTest do assert %{warnings: [warning]} = measurements assert warning =~ "Unlogged batch covering 11 partitions" assert metadata.address == ~c"127.0.0.1" - assert metadata.port == 9042 + assert metadata.port == 9052 assert metadata.current_keyspace == keyspace assert inspect(metadata.query) == inspect(batch) end diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index e54d84a0..e37d8a0d 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -36,7 +36,7 @@ defmodule XandraTest.IntegrationCase do @spec default_start_options() :: keyword() def default_start_options do - port = System.get_env("CASSANDRA_PORT", "9042") + port = System.get_env("CASSANDRA_PORT", "9052") options = [ show_sensitive_data_on_connection_error: true, diff --git a/test/xandra/cluster/ccm_test.exs b/test/xandra/cluster/ccm_test.exs index 30f2df71..f73ecb02 100644 --- a/test/xandra/cluster/ccm_test.exs +++ b/test/xandra/cluster/ccm_test.exs @@ -1,7 +1,7 @@ defmodule Xandra.Cluster.CCMTest do use ExUnit.Case - import Xandra.TestHelper, only: [cmd!: 2] + import Xandra.TestHelper, only: [cmd!: 2, wait_for_passing: 2] @moduletag :integration @moduletag :ccm @@ -21,14 +21,40 @@ defmodule Xandra.Cluster.CCMTest do end ccm("start") + ccm("status") + + on_exit(fn -> + ccm("stop") + end) + + Process.register(self(), :this_test_process) cluster = start_supervised!( - {Xandra.Cluster, nodes: ["127.0.0.1"], target_pools: 2, sync_connect: 1000} + {Xandra.Cluster, + nodes: ["127.0.0.1"], + target_pools: 2, + sync_connect: 5000, + registry_listeners: [:this_test_process]} ) + wait_for_passing(5000, fn -> + assert map_size(:sys.get_state(cluster).pools) == 2 + end) + cluster_state = :sys.get_state(cluster) - assert %{{{127, 0, 0, 1}, 9042} => _, {{127, 0, 0, 2}, 9042} => _} = cluster_state.pools + + assert map_size(cluster_state.pools) == 2 + + pool_addresses = + MapSet.new(cluster_state.pools, fn {{address, port}, _} -> + assert port == 9042 + address + end) + + assert [{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}] + |> MapSet.new() + |> MapSet.intersection(pool_addresses) == pool_addresses assert {{:connected, _connected_node}, ctrl_conn_state} = :sys.get_state(cluster_state.control_connection) @@ -39,13 +65,13 @@ defmodule Xandra.Cluster.CCMTest do {{127, 0, 0, 3}, 9042} => %{host: _host3, status: :up} } = ctrl_conn_state.peers - assert [ - {{{{127, 0, 0, 1}, 9042}, 1}, _pid1, :up}, - {{{{127, 0, 0, 2}, 9042}, 1}, _pid2, :up} - ] = - cluster_state.registry - |> Registry.select([{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2", :"$3"}}]}]) - |> Enum.sort() + assert_receive {:register, _registry, {{registry_addr1, 9042}, 1}, _pid1, :up} + assert_receive {:register, _registry, {{registry_addr2, 9042}, 1}, _pid2, :up} + + assert MapSet.subset?( + MapSet.new([registry_addr1, registry_addr2]), + MapSet.new([{127, 0, 0, 1}, {127, 0, 0, 2}, {127, 0, 0, 3}]) + ) end defp ccm(args) do @@ -53,15 +79,17 @@ defmodule Xandra.Cluster.CCMTest do end defp validate_ifaddresses do - {:ok, addresses} = :inet.getifaddrs() - assert {~c"lo0", info} = List.keyfind!(addresses, ~c"lo0", 0) + if :os.type() == {:unix, :darwin} do + {:ok, addresses} = :inet.getifaddrs() + assert {~c"lo0", info} = List.keyfind!(addresses, ~c"lo0", 0) - localhosts = for {:addr, {127, 0, 0, _} = addr} <- info, do: addr + localhosts = for {:addr, {127, 0, 0, _} = addr} <- info, do: addr - assert Enum.sort(localhosts) == [ - {127, 0, 0, 1}, - {127, 0, 0, 2}, - {127, 0, 0, 3} - ] + assert Enum.sort(localhosts) == [ + {127, 0, 0, 1}, + {127, 0, 0, 2}, + {127, 0, 0, 3} + ] + end end end diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index 3c8488f1..6730ba4c 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -15,6 +15,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do } @protocol_version XandraTest.IntegrationCase.protocol_version() + @port String.to_integer(System.get_env("CASSANDRA_PORT", "9052")) setup context do parent = self() @@ -28,11 +29,11 @@ defmodule Xandra.Cluster.ControlConnectionTest do start_options = [ cluster: mirror, refresh_topology_interval: 60_000, - autodiscovered_nodes_port: 9042, + autodiscovered_nodes_port: @port, connection_options: [protocol_version: @protocol_version], registry: registry, load_balancing: {LoadBalancingPolicy.DCAwareRoundRobin, []}, - contact_points: ["127.0.0.1"] + contact_points: ["127.0.0.1:#{@port}"] ] %{mirror_ref: mirror_ref, mirror: mirror, registry: registry, start_options: start_options} @@ -53,7 +54,10 @@ defmodule Xandra.Cluster.ControlConnectionTest do [:xandra, :cluster, :control_connection, :connected] ]) - start_control_connection!(start_options, contact_points: ["127.0.0.1:9039", "127.0.0.1"]) + start_control_connection!(start_options, + contact_points: ["127.0.0.1:9039", "127.0.0.1:#{@port}"] + ) + assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}} assert %Host{address: {127, 0, 0, 1}} = local_peer @@ -164,7 +168,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _} # No-op: sending a UP event for a node that is already up. - send_change_event(ctrl_conn, %StatusChange{effect: "UP", address: {127, 0, 0, 1}, port: 9042}) + send_change_event(ctrl_conn, %StatusChange{effect: "UP", address: {127, 0, 0, 1}, port: @port}) + refute_receive {:host_up, _host}, 100 assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, measurements, meta} @@ -177,12 +182,12 @@ defmodule Xandra.Cluster.ControlConnectionTest do send_change_event(ctrl_conn, %StatusChange{ effect: "DOWN", address: {127, 0, 0, 1}, - port: 9042 + port: @port }) assert_receive {^mirror_ref, {:host_down, %Host{} = host}} assert host.address == {127, 0, 0, 1} - assert host.port == 9042 + assert host.port == @port assert host.data_center == "datacenter1" assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, measurements, meta} @@ -195,7 +200,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do send_change_event(ctrl_conn, %StatusChange{ effect: "DOWN", address: {127, 0, 0, 1}, - port: 9042 + port: @port }) refute_receive {:host_down, _host}, 100 @@ -207,11 +212,11 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert %Host{address: {127, 0, 0, 1}} = meta.host # Getting StatusChange UP for the node brings it back up and notifies the cluster. - send_change_event(ctrl_conn, %StatusChange{effect: "UP", address: {127, 0, 0, 1}, port: 9042}) + send_change_event(ctrl_conn, %StatusChange{effect: "UP", address: {127, 0, 0, 1}, port: @port}) assert_receive {^mirror_ref, {:host_up, %Host{} = host}} assert host.address == {127, 0, 0, 1} - assert host.port == 9042 + assert host.port == @port assert host.data_center == "datacenter1" assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, measurements, meta} @@ -238,13 +243,13 @@ defmodule Xandra.Cluster.ControlConnectionTest do send_change_event(ctrl_conn, %TopologyChange{ effect: "NEW_NODE", address: {127, 0, 0, 101}, - port: 9042 + port: @port }) send_change_event(ctrl_conn, %TopologyChange{ effect: "REMOVED_NODE", address: {127, 0, 0, 102}, - port: 9042 + port: @port }) # Wait for the messages to be processed. @@ -263,7 +268,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do send_change_event(ctrl_conn, %TopologyChange{ effect: "MOVED_NODE", address: {127, 0, 0, 2}, - port: 9042 + port: @port }) Process.sleep(100) @@ -282,8 +287,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, _, _} new_peers = [ - %Host{address: {192, 168, 1, 1}, port: 9042, data_center: "datacenter1"}, - %Host{address: {192, 168, 1, 2}, port: 9042, data_center: "datacenter2"} + %Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"}, + %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"} ] :gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers}) @@ -321,8 +326,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do }} new_peers = [ - %Host{address: {192, 168, 1, 2}, port: 9042, data_center: "datacenter2"}, - %Host{address: {192, 168, 1, 3}, port: 9042, data_center: "datacenter3"} + %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"}, + %Host{address: {192, 168, 1, 3}, port: @port, data_center: "datacenter3"} ] :gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers}) @@ -349,8 +354,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do # Send the same list of peers and verify that we don't get any events. new_peers = [ - %Host{address: {192, 168, 1, 2}, port: 9042, data_center: "datacenter2"}, - %Host{address: {192, 168, 1, 3}, port: 9042, data_center: "datacenter3"} + %Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"}, + %Host{address: {192, 168, 1, 3}, port: @port, data_center: "datacenter3"} ] :gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers}) @@ -371,7 +376,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do for index <- 1..2 do {:ok, task_pid} = Task.start_link(fn -> - key = {{{127, 0, 0, 1}, 9042}, index} + key = {{{127, 0, 0, 1}, @port}, index} {:ok, _} = Registry.register(registry, key, :up) send(parent, {:ready, self()}) @@ -408,7 +413,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do }} end - test "performs healthcheck and sends node down message if not registered", + test "performs healthcheck", %{mirror_ref: mirror_ref, registry: registry, start_options: start_options} do telemetry_ref = :telemetry_test.attach_event_handlers(self(), [[:xandra, :cluster, :change_event]]) @@ -422,7 +427,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do {:ok, task_pid} = Task.start_link(fn -> - key = {{{127, 0, 0, 1}, 9042}, 1} + key = {{{127, 0, 0, 1}, @port}, 1} {:ok, _} = Registry.register(registry, key, :up) send(parent, {:ready, self()}) Process.sleep(:infinity) @@ -431,8 +436,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do assert_receive {:ready, ^task_pid} new_peers = [ - %Host{address: {127, 0, 0, 1}, port: 9042, data_center: "datacenter1"}, - %Host{address: {192, 168, 1, 1}, port: 9042, data_center: "datacenter1"} + %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"}, + %Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"} ] :gen_statem.cast(ctrl_conn, {:refresh_topology, new_peers}) @@ -449,24 +454,10 @@ defmodule Xandra.Cluster.ControlConnectionTest do send( ctrl_conn, - {:started_pool, %Host{address: {127, 0, 0, 1}, port: 9042, data_center: "datacenter1"}} - ) - - send( - ctrl_conn, - {:started_pool, %Host{address: {192, 168, 1, 1}, port: 9042, data_center: "datacenter1"}} + {:started_pool, %Host{address: {127, 0, 0, 1}, port: @port, data_center: "datacenter1"}} ) refute_receive {^mirror_ref, {:host_down, %Host{address: {127, 0, 0, 1}}}}, 600 - assert_receive {^mirror_ref, {:host_down, %Host{address: {192, 168, 1, 1}}}} - - assert_receive {[:xandra, :cluster, :change_event], ^telemetry_ref, %{}, - %{ - event_type: :host_down, - changed: true, - source: :xandra, - host: %Host{address: {192, 168, 1, 1}} - }} end defp start_control_connection!(start_options, overrides \\ []) do diff --git a/test/xandra_test.exs b/test/xandra_test.exs index 57a9a4ff..22ccceb3 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -1,6 +1,8 @@ defmodule XandraTest do use ExUnit.Case, async: true + import XandraTest.IntegrationCase, only: [default_start_options: 0] + doctest Xandra describe "options validation in Xandra.start_link/1" do @@ -46,12 +48,12 @@ defmodule XandraTest do end test "supports DBConnection.status/1 without raising" do - conn = start_supervised!(Xandra) + conn = start_supervised!({Xandra, default_start_options()}) assert DBConnection.status(conn) == :idle end test "raises for unsupported DBConnection callbacks" do - conn = start_supervised!(Xandra) + conn = start_supervised!({Xandra, default_start_options()}) assert_raise ArgumentError, "Cassandra doesn't support transactions", fn -> assert DBConnection.transaction(conn, fn _ -> :ok end) @@ -60,12 +62,16 @@ defmodule XandraTest do @tag :capture_log test "rescues DBConnection errors" do - conn = - start_supervised!( - {Xandra, - nodes: ["nonexistent-domain"], queue_target: 10, queue_interval: 10, pool_size: 0} + options = + Keyword.merge(default_start_options(), + nodes: ["nonexistent-domain"], + queue_target: 10, + queue_interval: 10, + pool_size: 0 ) + conn = start_supervised!({Xandra, options}) + assert {:error, %DBConnection.ConnectionError{}} = Xandra.execute(conn, "USE some_keyspace") end end