From 579aeb2d37188c8e4999a92f155d8235621a22f1 Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Tue, 5 Sep 2023 17:44:19 -0500 Subject: [PATCH 1/3] (PDB-5696) query.monitor: eliminate all reflection --- src/puppetlabs/puppetdb/query/monitor.clj | 37 ++++++++++++++--------- src/puppetlabs/puppetdb/utils.clj | 2 +- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/puppetlabs/puppetdb/query/monitor.clj b/src/puppetlabs/puppetdb/query/monitor.clj index 61db66b8f8..96209f2887 100644 --- a/src/puppetlabs/puppetdb/query/monitor.clj +++ b/src/puppetlabs/puppetdb/query/monitor.clj @@ -72,9 +72,13 @@ (java.nio ByteBuffer) (java.nio.channels CancelledKeyException ClosedChannelException + ReadableByteChannel SelectableChannel SelectionKey - Selector))) + Selector + SocketChannel))) + +(set! *warn-on-reflection* true) (def ns-per-ms 1000000) @@ -167,12 +171,15 @@ next-deadline (do (stop-query info "expired") - (.cancel select-key) + (.cancel ^SelectionKey select-key) (recur)))))) -(defn- describe-key [k] +(defn- describe-key [^SelectionKey k] + ;; Currently only works for keys that have channels with + ;; getRemoteAddress... {:client (try - (-> k .channel .getRemoteAddress str) + (let [c ^SocketChannel (.channel k)] + (-> c .getRemoteAddress str)) (catch ClosedChannelException _ :closed)) :ops (if-let [ops (try (.readyOps k) (catch CancelledKeyException _))] (set (for [[v n] [[SelectionKey/OP_ACCEPT :accept] @@ -184,7 +191,7 @@ :cancelled)}) (defn- disconnected? - [chan buf] + [^ReadableByteChannel chan buf] ;; Various ssumptions here: ;; - transport (chan) will be non-blocking ;; - everyone, including jetty, etc. won't miss discarded bytes @@ -202,7 +209,7 @@ puppetdb, we could consider treating data from the client as an error." [queries selected stop-query buf] - (doseq [select-key selected] + (doseq [^SelectionKey select-key selected] (when (disconnected? (.channel select-key) buf) (.cancel select-key) (let [info (-> @queries :selector-keys (get select-key))] @@ -230,7 +237,7 @@ :else (max 1 (int (/ (- deadline-ns (ephemeral-now-ns)) ns-per-ms))))) -(defn- monitor-queries [{:keys [exit queries selector] :as _monitor} +(defn- monitor-queries [{:keys [exit queries ^Selector selector] :as _monitor} terminate-query on-fatal-error] ;; We depend on the fact that any new queries will wake us @@ -245,7 +252,7 @@ (when-not @exit (if-let [next-deadline (enforce-deadlines! queries (ephemeral-now-ns) terminate-query)] - (.select selector (deadline->select-timeout next-deadline)) + (.select selector ^long (deadline->select-timeout next-deadline)) (.select selector)) (stop-abandoned! queries (.selectedKeys selector) terminate-query buf) (recur))) @@ -297,7 +304,7 @@ (Thread. #(monitor-queries m terminate-query on-fatal-error) "pdb query monitor")))) -(defn start [{:keys [thread] :as monitor}] +(defn start [{:keys [^Thread thread] :as monitor}] (assert (not (.isAlive thread))) (.start thread) monitor) @@ -308,7 +315,7 @@ true otherwise. May be called more than once. When a true value is returned, all monitor activities should be finished." ([monitor] (stop monitor nil)) - ([{:keys [exit selector thread] :as _monitor} timeout-ms] + ([{:keys [exit ^Selector selector ^Thread thread] :as _monitor} timeout-ms] (if-not (.isAlive thread) true (do @@ -322,7 +329,8 @@ (not (.isAlive thread)))))) (defn stop-query-at-deadline-or-disconnect - [{:keys [selector queries thread] :as _monitor} id channel deadline-ns db] + [{:keys [^Selector selector queries ^Thread thread] :as _monitor} + id ^SelectableChannel channel deadline-ns db] (assert (.isAlive thread)) (assert (instance? SelectableChannel channel)) (let [select-key (.register channel selector SelectionKey/OP_READ) @@ -348,7 +356,7 @@ select-key)) (defn register-pg-pid - [{:keys [queries thread] :as _monitor} select-key pid] + [{:keys [queries ^Thread thread] :as _monitor} select-key pid] (assert (.isAlive thread)) (let [{:keys [selector-keys]} @queries {:keys [pg-pid] :as _info} (selector-keys select-key)] @@ -357,7 +365,7 @@ (swap! pg-pid (fn [prev] (assert (not prev)) pid))))) (defn forget-pg-pid [{:keys [queries thread] :as _monitor} select-key] - (assert (.isAlive thread)) + (assert (.isAlive ^Thread thread)) (let [{:keys [selector-keys]} @queries {:keys [pg-pid] :as _info} (selector-keys select-key)] ;; Whole entry might not exist if the client has disconnected. @@ -372,7 +380,8 @@ forgotten about it, but the final disposition of that query is undefined, i.e. it might or might not have been killed successfully." - [{:keys [queries thread] :as _monitor} select-key] + [{:keys [queries ^Selector selector ^Thread thread] :as _monitor} + ^SelectionKey select-key] (assert (.isAlive thread)) ;; After this some key methods will throw CancelledKeyException (.cancel select-key) diff --git a/src/puppetlabs/puppetdb/utils.clj b/src/puppetlabs/puppetdb/utils.clj index 72642f3332..4ba8f49b44 100644 --- a/src/puppetlabs/puppetdb/utils.clj +++ b/src/puppetlabs/puppetdb/utils.clj @@ -423,7 +423,7 @@ (try (~initiate-shutdown ex#) (catch Throwable ex2# - (.addSuppressed ex# ex2#))) + (.addSuppressed ^Throwable ex# ex2#))) (throw ex#)) ~@body)) From 173dd07804ec835f3ab042cb715d4bc1d00fe4aa Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Tue, 5 Sep 2023 17:48:09 -0500 Subject: [PATCH 2/3] (PDB-5696) query.monitor/stop: elaborate on interrupt considerations --- src/puppetlabs/puppetdb/query/monitor.clj | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/puppetlabs/puppetdb/query/monitor.clj b/src/puppetlabs/puppetdb/query/monitor.clj index 96209f2887..4dc7168359 100644 --- a/src/puppetlabs/puppetdb/query/monitor.clj +++ b/src/puppetlabs/puppetdb/query/monitor.clj @@ -321,7 +321,10 @@ (do (reset! exit true) (.wakeup selector) - ;; Q: do we want this, and if so, have thread swallow it on the way out? + ;; Q: do we want this, and if so, have thread swallow it on the + ;; way out? Before deciding, see the SelectableChannel docs. + ;; Thread interrupts cause the key for any channel the thread + ;; was blocked on to be canceled. ;; (.interrupt monitor-thread) (if timeout-ms (.join thread timeout-ms) From 9b986d961e825106241be4390bda1e9eccb95efc Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Thu, 7 Sep 2023 14:52:27 -0500 Subject: [PATCH 3/3] (PDB-5696) query.monitor: accommodate query socket reuse Previously, any query a client issued after the first on a given connection might provoke a CancelledKeyException when it tried to register the query/socket/channel with the monitor because the selection key for a given channel for a given selector is unique, and a cancelled key is only cleaned up during the next call to select for the relevant selector. Any attempt to register a channel with a selector between the cancellation of the previous key for that channel and the clean up in the next select will provoke the CancelledKeyException. To address the issue, adjust the registration code to retry whenever it encounters that exception until it succeeds. While it's retrying, the monitor select loop, which will have been woken up by the previous query's forget, will issue a new select call, removing the canceled key. Fixes: https://github.com/puppetlabs/puppetdb/issues/3866 --- src/puppetlabs/puppetdb/query/monitor.clj | 53 ++++++++++++++----- .../puppetdb/query/monitor_test.clj | 30 +++++++++++ 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/puppetlabs/puppetdb/query/monitor.clj b/src/puppetlabs/puppetdb/query/monitor.clj index 4dc7168359..d2f22c7ef1 100644 --- a/src/puppetlabs/puppetdb/query/monitor.clj +++ b/src/puppetlabs/puppetdb/query/monitor.clj @@ -39,12 +39,16 @@ some cases customizably) time out. Every monitored query will have a SelectionKey associated with it, - and that key should be canceled by the monitor code once the query - has been abandoned or has timed out. + The key is cancelled during forget, but won't be removed from the + selector's cancelled set until the next call to select. During that + time, another query on the same socket/connection could try to + re-register the cancelled key. This will throw an exception, which + we suppress and retry until the select loop finally removes the + cancelled key, and we can re-register the socket. Every monitored query may also have a postgres pid associated with it, and whenever it does, that pid should be terminated (in - corrdination with the :terminated promise) once the query has been + coordination with the :terminated promise) once the query has been abandoned or has timed out. The terminated promise and :forget value coordinate between the @@ -56,7 +60,20 @@ The client socket monitoring depends on access to the jetty query response which (at least at the moment) provides indirect access to the java socket channel which can be read to determine whether the - client is still connected." + client is still connected. + + The current implementation is completely incompatible with http + \"pipelining\", but it looks like that is no longer a realistic + concern: + https://daniel.haxx.se/blog/2019/04/06/curl-says-bye-bye-to-pipelining/ + + If that turns out to be an incorrect assumption, then we'll have to + reevaluate the implementation and/or feasibility of the monitoring. + That's because so far, the only way we've found to detect a client + disconnection is to attempt to read a byte. At the moment, that's + acceptable because the client shouldn't be sending any data during + the response (which of course wouldn't be true with pipelining, + where it could be sending additional requests)." (:require [clojure.tools.logging :as log] @@ -321,22 +338,32 @@ (do (reset! exit true) (.wakeup selector) - ;; Q: do we want this, and if so, have thread swallow it on the - ;; way out? Before deciding, see the SelectableChannel docs. - ;; Thread interrupts cause the key for any channel the thread - ;; was blocked on to be canceled. - ;; (.interrupt monitor-thread) (if timeout-ms (.join thread timeout-ms) (.join thread)) (not (.isAlive thread)))))) +(defn- register-selector + "Loops until the channel is registered with the selector while + ignoring canceled key exceptions, which should only occur when a + client is re-using the channel, and the previous query has called + forget (and cancelled the key), and the main select loop hasn't + removed it from the cancelled set yet. Returns the new + SelectionKey." + [channel selector ops] + (or (try + (.register ^SelectableChannel channel selector ops) + (catch CancelledKeyException _)) + (do + (Thread/sleep 10) + (recur channel selector ops)))) + (defn stop-query-at-deadline-or-disconnect [{:keys [^Selector selector queries ^Thread thread] :as _monitor} id ^SelectableChannel channel deadline-ns db] (assert (.isAlive thread)) (assert (instance? SelectableChannel channel)) - (let [select-key (.register channel selector SelectionKey/OP_READ) + (let [select-key (register-selector channel selector SelectionKey/OP_READ) info {:query-id id :selection-key select-key :deadline-ns deadline-ns @@ -355,7 +382,7 @@ (-> prev (update :selector-keys assoc select-key info) (update :deadlines conj [[deadline-ns select-key] info])))) - (.wakeup selector) ;; so it can recompute deadline + (.wakeup selector) ;; to recompute deadline and start watching select-key select-key)) (defn register-pg-pid @@ -386,8 +413,6 @@ [{:keys [queries ^Selector selector ^Thread thread] :as _monitor} ^SelectionKey select-key] (assert (.isAlive thread)) - ;; After this some key methods will throw CancelledKeyException - (.cancel select-key) (let [maybe-await-termination (atom nil)] (swap! queries (fn [{:keys [selector-keys] :as state}] @@ -399,6 +424,8 @@ (update :selector-keys assoc select-key info) (update :deadlines assoc [deadline-ns select-key] info))) state))) + (.cancel select-key) + (.wakeup selector) ;; clear out the cancelled keys (see stop-query-at-...) (if (= ::timeout (some-> @maybe-await-termination (deref 2000 ::timeout))) ::timeout true))) diff --git a/test/puppetlabs/puppetdb/query/monitor_test.clj b/test/puppetlabs/puppetdb/query/monitor_test.clj index c91a4842a8..d06158f8d1 100644 --- a/test/puppetlabs/puppetdb/query/monitor_test.clj +++ b/test/puppetlabs/puppetdb/query/monitor_test.clj @@ -1,6 +1,7 @@ (ns puppetlabs.puppetdb.query.monitor-test (:require [clj-http.client :as http] + [clojure.java.shell :refer [sh]] [clojure.string :as str] [clojure.test :refer :all] [murphy :refer [try!]] @@ -280,3 +281,32 @@ (update :info dissoc :query-id) (update-in [:info :pg-pid] int?)) summary))))))) + +(deftest connection-reuse + ;; Test multiple queries over the same connection. Run the test + ;; multiple times because the first problem we encountered could + ;; only occur during a narrow window in the monitor loop if a new + ;; request came in between select invocations, after the key had + ;; been cancelled. + ;; https://github.com/puppetlabs/puppetdb/issues/3866 + (with-puppetdb nil + (jdbc/with-db-transaction [] (add-certnames certnames)) + ;; Just use curl, since it's trivally easy to get it to do what we + ;; need, and we already require it for test setup (via ext/). (It + ;; looks like both clj-http and the JDK HttpClient have more + ;; indirect control over reuse.) + (let [nodes (-> (assoc *base-url* :prefix "/pdb/query/v4/nodes") + base-url->str-with-prefix) + cmd ["curl" "--no-progress-meter" "--show-error" "--fail-early" + "--fail" nodes "-o" "/dev/null" + "--next" "--fail" nodes "-o" "/dev/null" + "--next" "--fail" nodes "-o" "/dev/null"]] + (loop [i 0] + (let [{:keys [exit out err]} (apply sh cmd)] + (when (< i 10) + (if (is (= 0 exit)) + (recur (inc i)) + (do + (apply println "Failed:" cmd) + (print out) + (print err)))))))))