Skip to content

Commit

Permalink
Merge pull request #3868 from rbrw/pdb-5696-allow-connection-reuse-in…
Browse files Browse the repository at this point in the history
…-query-monitor

(PDB-5696) Allow connection reuse in query monitor
  • Loading branch information
austb authored Sep 7, 2023
2 parents ad2f557 + 9b986d9 commit a825e19
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 25 deletions.
87 changes: 63 additions & 24 deletions src/puppetlabs/puppetdb/query/monitor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
some cases customizably) time out.
Every monitored query will have a SelectionKey associated with it,
and that key should be canceled by the monitor code once the query
has been abandoned or has timed out.
The key is cancelled during forget, but won't be removed from the
selector's cancelled set until the next call to select. During that
time, another query on the same socket/connection could try to
re-register the cancelled key. This will throw an exception, which
we suppress and retry until the select loop finally removes the
cancelled key, and we can re-register the socket.
Every monitored query may also have a postgres pid associated with
it, and whenever it does, that pid should be terminated (in
corrdination with the :terminated promise) once the query has been
coordination with the :terminated promise) once the query has been
abandoned or has timed out.
The terminated promise and :forget value coordinate between the
Expand All @@ -56,7 +60,20 @@
The client socket monitoring depends on access to the jetty query
response which (at least at the moment) provides indirect access to
the java socket channel which can be read to determine whether the
client is still connected."
client is still connected.
The current implementation is completely incompatible with http
\"pipelining\", but it looks like that is no longer a realistic
concern:
https://daniel.haxx.se/blog/2019/04/06/curl-says-bye-bye-to-pipelining/
If that turns out to be an incorrect assumption, then we'll have to
reevaluate the implementation and/or feasibility of the monitoring.
That's because so far, the only way we've found to detect a client
disconnection is to attempt to read a byte. At the moment, that's
acceptable because the client shouldn't be sending any data during
the response (which of course wouldn't be true with pipelining,
where it could be sending additional requests)."

(:require
[clojure.tools.logging :as log]
Expand All @@ -72,9 +89,13 @@
(java.nio ByteBuffer)
(java.nio.channels CancelledKeyException
ClosedChannelException
ReadableByteChannel
SelectableChannel
SelectionKey
Selector)))
Selector
SocketChannel)))

(set! *warn-on-reflection* true)

(def ns-per-ms 1000000)

Expand Down Expand Up @@ -167,12 +188,15 @@
next-deadline
(do
(stop-query info "expired")
(.cancel select-key)
(.cancel ^SelectionKey select-key)
(recur))))))

(defn- describe-key [k]
(defn- describe-key [^SelectionKey k]
;; Currently only works for keys that have channels with
;; getRemoteAddress...
{:client (try
(-> k .channel .getRemoteAddress str)
(let [c ^SocketChannel (.channel k)]
(-> c .getRemoteAddress str))
(catch ClosedChannelException _ :closed))
:ops (if-let [ops (try (.readyOps k) (catch CancelledKeyException _))]
(set (for [[v n] [[SelectionKey/OP_ACCEPT :accept]
Expand All @@ -184,7 +208,7 @@
:cancelled)})

(defn- disconnected?
[chan buf]
[^ReadableByteChannel chan buf]
;; Various assumptions here:
;; - transport (chan) will be non-blocking
;; - everyone, including jetty, etc. won't miss discarded bytes
Expand All @@ -202,7 +226,7 @@
puppetdb, we could consider treating data from the client as an
error."
[queries selected stop-query buf]
(doseq [select-key selected]
(doseq [^SelectionKey select-key selected]
(when (disconnected? (.channel select-key) buf)
(.cancel select-key)
(let [info (-> @queries :selector-keys (get select-key))]
Expand Down Expand Up @@ -230,7 +254,7 @@
:else (max 1 (int (/ (- deadline-ns (ephemeral-now-ns))
ns-per-ms)))))

(defn- monitor-queries [{:keys [exit queries selector] :as _monitor}
(defn- monitor-queries [{:keys [exit queries ^Selector selector] :as _monitor}
terminate-query
on-fatal-error]
;; We depend on the fact that any new queries will wake us
Expand All @@ -245,7 +269,7 @@
(when-not @exit
(if-let [next-deadline (enforce-deadlines! queries (ephemeral-now-ns)
terminate-query)]
(.select selector (deadline->select-timeout next-deadline))
(.select selector ^long (deadline->select-timeout next-deadline))
(.select selector))
(stop-abandoned! queries (.selectedKeys selector) terminate-query buf)
(recur)))
Expand Down Expand Up @@ -297,7 +321,7 @@
(Thread. #(monitor-queries m terminate-query on-fatal-error)
"pdb query monitor"))))

(defn start [{:keys [thread] :as monitor}]
(defn start [{:keys [^Thread thread] :as monitor}]
(assert (not (.isAlive thread)))
(.start thread)
monitor)
Expand All @@ -308,24 +332,38 @@
true otherwise. May be called more than once. When a true value is
returned, all monitor activities should be finished."
([monitor] (stop monitor nil))
([{:keys [exit selector thread] :as _monitor} timeout-ms]
([{:keys [exit ^Selector selector ^Thread thread] :as _monitor} timeout-ms]
(if-not (.isAlive thread)
true
(do
(reset! exit true)
(.wakeup selector)
;; Q: do we want this, and if so, have thread swallow it on the way out?
;; (.interrupt monitor-thread)
(if timeout-ms
(.join thread timeout-ms)
(.join thread))
(not (.isAlive thread))))))

(defn- register-selector
  "Registers channel with selector for the given ops and returns the
  resulting SelectionKey.  A CancelledKeyException from the
  registration attempt is swallowed and the attempt is repeated after
  a 10ms pause, indefinitely, until it succeeds.  That exception is
  only expected when a client is re-using the channel, the previous
  query has already called forget (cancelling its key), and the main
  select loop has not yet purged that key from the selector's
  cancelled set."
  [channel selector ops]
  (loop []
    (if-let [new-key (try
                       (.register ^SelectableChannel channel selector ops)
                       ;; Stale cancelled key still attached to the
                       ;; channel; fall through and retry.
                       (catch CancelledKeyException _ nil))]
      new-key
      (do
        (Thread/sleep 10)
        (recur)))))

(defn stop-query-at-deadline-or-disconnect
[{:keys [selector queries thread] :as _monitor} id channel deadline-ns db]
[{:keys [^Selector selector queries ^Thread thread] :as _monitor}
id ^SelectableChannel channel deadline-ns db]
(assert (.isAlive thread))
(assert (instance? SelectableChannel channel))
(let [select-key (.register channel selector SelectionKey/OP_READ)
(let [select-key (register-selector channel selector SelectionKey/OP_READ)
info {:query-id id
:selection-key select-key
:deadline-ns deadline-ns
Expand All @@ -344,11 +382,11 @@
(-> prev
(update :selector-keys assoc select-key info)
(update :deadlines conj [[deadline-ns select-key] info]))))
(.wakeup selector) ;; so it can recompute deadline
(.wakeup selector) ;; to recompute deadline and start watching select-key
select-key))

(defn register-pg-pid
[{:keys [queries thread] :as _monitor} select-key pid]
[{:keys [queries ^Thread thread] :as _monitor} select-key pid]
(assert (.isAlive thread))
(let [{:keys [selector-keys]} @queries
{:keys [pg-pid] :as _info} (selector-keys select-key)]
Expand All @@ -357,7 +395,7 @@
(swap! pg-pid (fn [prev] (assert (not prev)) pid)))))

(defn forget-pg-pid [{:keys [queries thread] :as _monitor} select-key]
(assert (.isAlive thread))
(assert (.isAlive ^Thread thread))
(let [{:keys [selector-keys]} @queries
{:keys [pg-pid] :as _info} (selector-keys select-key)]
;; Whole entry might not exist if the client has disconnected.
Expand All @@ -372,10 +410,9 @@
forgotten about it, but the final disposition of that query is
undefined, i.e. it might or might not have been killed
successfully."
[{:keys [queries thread] :as _monitor} select-key]
[{:keys [queries ^Selector selector ^Thread thread] :as _monitor}
^SelectionKey select-key]
(assert (.isAlive thread))
;; After this some key methods will throw CancelledKeyException
(.cancel select-key)
(let [maybe-await-termination (atom nil)]
(swap! queries
(fn [{:keys [selector-keys] :as state}]
Expand All @@ -387,6 +424,8 @@
(update :selector-keys assoc select-key info)
(update :deadlines assoc [deadline-ns select-key] info)))
state)))
(.cancel select-key)
(.wakeup selector) ;; clear out the cancelled keys (see stop-query-at-...)
(if (= ::timeout (some-> @maybe-await-termination (deref 2000 ::timeout)))
::timeout
true)))
2 changes: 1 addition & 1 deletion src/puppetlabs/puppetdb/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@
(try
(~initiate-shutdown ex#)
(catch Throwable ex2#
(.addSuppressed ex# ex2#)))
(.addSuppressed ^Throwable ex# ex2#)))
(throw ex#))
~@body))

Expand Down
30 changes: 30 additions & 0 deletions test/puppetlabs/puppetdb/query/monitor_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns puppetlabs.puppetdb.query.monitor-test
(:require
[clj-http.client :as http]
[clojure.java.shell :refer [sh]]
[clojure.string :as str]
[clojure.test :refer :all]
[murphy :refer [try!]]
Expand Down Expand Up @@ -280,3 +281,32 @@
(update :info dissoc :query-id)
(update-in [:info :pg-pid] int?))
summary)))))))

(deftest connection-reuse
  ;; Test multiple queries over the same connection.  Run the test
  ;; multiple times because the first problem we encountered could
  ;; only occur during a narrow window in the monitor loop if a new
  ;; request came in between select invocations, after the key had
  ;; been cancelled.
  ;; https://github.com/puppetlabs/puppetdb/issues/3866
  (with-puppetdb nil
    (jdbc/with-db-transaction [] (add-certnames certnames))
    ;; Just use curl, since it's trivially easy to get it to do what we
    ;; need, and we already require it for test setup (via ext/).  (It
    ;; looks like both clj-http and the JDK HttpClient have more
    ;; indirect control over reuse.)
    (let [nodes (-> (assoc *base-url* :prefix "/pdb/query/v4/nodes")
                    base-url->str-with-prefix)
          ;; One curl process, three requests (separated by --next) to
          ;; the same URL, so the requests can share a connection.
          cmd ["curl" "--no-progress-meter" "--show-error" "--fail-early"
               "--fail" nodes "-o" "/dev/null"
               "--next" "--fail" nodes "-o" "/dev/null"
               "--next" "--fail" nodes "-o" "/dev/null"]]
      (loop [i 0]
        ;; Check the bound before invoking curl so that every run's
        ;; exit status is asserted.
        (when (< i 10)
          (let [{:keys [exit out err]} (apply sh cmd)]
            (if (is (= 0 exit))
              (recur (inc i))
              ;; Stop at the first failure and show curl's output to
              ;; help diagnose it.
              (do
                (apply println "Failed:" cmd)
                (print out)
                (print err)))))))))

0 comments on commit a825e19

Please sign in to comment.