Skip to content

Commit

Permalink
(PDB-5696) query.monitor: accommodate query socket reuse
Browse files Browse the repository at this point in the history
Previously, any query a client issued after the first on a given
connection might provoke a CancelledKeyException when it tried to
register the query/socket/channel with the monitor because the
selection key for a given channel for a given selector is unique, and
a cancelled key is only cleaned up during the next call to select for
the relevant selector.

Any attempt to register a channel with a selector between the
cancellation of the previous key for that channel and the clean up in
the next select will provoke the CancelledKeyException.

To address the issue, adjust the registration code to retry whenever
it encounters that exception, until it succeeds.  While it's retrying,
the monitor select loop, which will have been woken up by the previous
query's forget, will issue a new select call, removing the canceled
key.

Fixes: #3866
  • Loading branch information
rbrw committed Sep 5, 2023
1 parent 173dd07 commit 51d3eb9
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions src/puppetlabs/puppetdb/query/monitor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
some cases customizably) time out.
Every monitored query will have a SelectionKey associated with it,
and that key should be canceled by the monitor code once the query
has been abandoned or has timed out.
The key is cancelled during forget, but won't be removed from the
selector's cancelled set until the next call to select. During that
time, another query on the same socket/connection could try to
re-register the cancelled key. This will throw an exception, which
we suppress and retry until the select loop finally removes the
cancelled key, and we can re-register the socket.
Every monitored query may also have a postgres pid associated with
it, and whenever it does, that pid should be terminated (in
corrdination with the :terminated promise) once the query has been
coordination with the :terminated promise) once the query has been
abandoned or has timed out.
The terminated promise and :forget value coordinate between the
Expand All @@ -56,7 +60,20 @@
The client socket monitoring depends on access to the jetty query
response which (at least at the moment) provides indirect access to
the java socket channel which can be read to determine whether the
client is still connected."
client is still connected.
The current implementation is completely incompatible with http
\"pipelining\", but it looks like that is no longer a realistic
concern:
https://daniel.haxx.se/blog/2019/04/06/curl-says-bye-bye-to-pipelining/
If that turns out to be an incorrect assumption, then we'll have to
reevaluate the implementation and/or feasibility of the monitoring.
That's because so far, the only way we've found to detect a client
disconnection is to attempt to read a byte. At the moment, that's
acceptable because the client shouldn't be sending any data during
the response (which of course wouldn't be true with pipelining,
where it could be sending additional requests)."

(:require
[clojure.tools.logging :as log]
Expand Down Expand Up @@ -331,12 +348,28 @@
(.join thread))
(not (.isAlive thread))))))

(defn- register-selector [channel selector ops]
"Loops until the channel is registered with the selector while
ignoring canceled key exceptions, which should only occur when a
client is re-using the channel, and the previous query has called
forget (and cancelled the key), and the main select loop hasn't
removed it from the cancelled set yet. Returns the new
SelectionKey."
(or (try
(.register ^SelectableChannel channel selector ops)
(catch CancelledKeyException _))
(do
;; The competing forget should have already called wakeup.
;; REVIEW: 10?
(Thread/sleep 10)
(recur channel selector ops))))

(defn stop-query-at-deadline-or-disconnect
[{:keys [^Selector selector queries ^Thread thread] :as _monitor}
id ^SelectableChannel channel deadline-ns db]
(assert (.isAlive thread))
(assert (instance? SelectableChannel channel))
(let [select-key (.register channel selector SelectionKey/OP_READ)
(let [select-key (register-selector channel selector SelectionKey/OP_READ)
info {:query-id id
:selection-key select-key
:deadline-ns deadline-ns
Expand All @@ -355,7 +388,7 @@
(-> prev
(update :selector-keys assoc select-key info)
(update :deadlines conj [[deadline-ns select-key] info]))))
(.wakeup selector) ;; so it can recompute deadline
(.wakeup selector) ;; to recompute deadline and start watching select-key
select-key))

(defn register-pg-pid
Expand Down Expand Up @@ -386,8 +419,6 @@
[{:keys [queries ^Selector selector ^Thread thread] :as _monitor}
^SelectionKey select-key]
(assert (.isAlive thread))
;; After this some key methods will throw CancelledKeyException
(.cancel select-key)
(let [maybe-await-termination (atom nil)]
(swap! queries
(fn [{:keys [selector-keys] :as state}]
Expand All @@ -399,6 +430,8 @@
(update :selector-keys assoc select-key info)
(update :deadlines assoc [deadline-ns select-key] info)))
state)))
(.cancel select-key)
(.wakeup selector) ;; clear out the cancelled keys (see stop-query-at-...)
(if (= ::timeout (some-> @maybe-await-termination (deref 2000 ::timeout)))
::timeout
true)))

0 comments on commit 51d3eb9

Please sign in to comment.