Skip to content

Commit

Permalink
Fix several bugs found in new server scanner code (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
vtnerd authored Oct 25, 2024
1 parent 7d6dc00 commit 97617f9
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 27 deletions.
60 changes: 53 additions & 7 deletions src/rpc/scanner/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "server.h"

#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <sodium/utils.h>
Expand Down Expand Up @@ -163,12 +164,16 @@ namespace lws { namespace rpc { namespace scanner

void operator()(const boost::system::error_code& error = {})
{
if (!self_ || error)
if (error)
{
if (error == boost::asio::error::operation_aborted)
return; // exiting
MONERO_THROW(error, "server acceptor failed");
}

if (!self_ || self_->stop_)
return;

assert(self_->strand_.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this)
{
Expand All @@ -192,7 +197,7 @@ namespace lws { namespace rpc { namespace scanner

void operator()(const boost::system::error_code& error = {}) const
{
if (!self_ || error == boost::asio::error::operation_aborted)
if (!self_ || self_->stop_ || error == boost::asio::error::operation_aborted)
return;

assert(self_->strand_.running_in_this_thread());
Expand Down Expand Up @@ -223,7 +228,7 @@ namespace lws { namespace rpc { namespace scanner
return;
}

auto reader = self_->disk_.start_read(std::move(self_->read_txn_));
auto reader = self_->disk_.start_read();
if (!reader)
{
if (reader.matches(std::errc::no_lock_available))
Expand All @@ -240,6 +245,8 @@ namespace lws { namespace rpc { namespace scanner
if (current_users.count() < self_->active_.size())
{
// a shrinking user base, re-shuffle
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand All @@ -254,6 +261,8 @@ namespace lws { namespace rpc { namespace scanner
new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value<db::account>())));
if (replace_threshold < new_accounts.size())
{
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand All @@ -268,6 +277,8 @@ namespace lws { namespace rpc { namespace scanner

if (!active_copy.empty())
{
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand Down Expand Up @@ -306,7 +317,7 @@ namespace lws { namespace rpc { namespace scanner

self_->next_thread_ %= total_threads;
}
self_->read_txn_ = reader->finish_read();
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
}
};
Expand Down Expand Up @@ -401,6 +412,28 @@ namespace lws { namespace rpc { namespace scanner
active_ = std::move(active);
}

void server::do_stop()
{
assert(strand_.running_in_this_thread());
if (stop_)
return;

MDEBUG("Stopping rpc::scanner::server async operations");
boost::system::error_code error{};
check_timer_.cancel(error);
acceptor_.cancel(error);
acceptor_.close(error);

for (auto& remote : remote_)
{
const auto conn = remote.lock();
if (conn)
boost::asio::dispatch(conn->strand_, [conn] () { conn->cleanup(); });
}

stop_ = true;
}

boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address)
{
std::string host;
Expand Down Expand Up @@ -432,12 +465,12 @@ namespace lws { namespace rpc { namespace scanner
active_(std::move(active)),
disk_(std::move(disk)),
zclient_(std::move(zclient)),
read_txn_{},
accounts_cur_{},
next_thread_(0),
pass_hashed_(),
pass_salt_(),
webhook_verify_(webhook_verify)
webhook_verify_(webhook_verify),
stop_(false)
{
std::sort(active_.begin(), active_.end());
for (const auto& local : local_)
Expand Down Expand Up @@ -488,6 +521,9 @@ namespace lws { namespace rpc { namespace scanner
{
self->acceptor_.close();
self->acceptor_.open(endpoint.protocol());
#if !defined(_WIN32)
self->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
#endif
self->acceptor_.bind(endpoint);
self->acceptor_.listen();

Expand Down Expand Up @@ -522,7 +558,17 @@ namespace lws { namespace rpc { namespace scanner
{
const lws::scanner_options opts{self->webhook_verify_, false, false};
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
GET_IO_SERVICE(self->check_timer_).stop();
{
self->do_stop();
self->strand_.context().stop();
}
});
}

void server::stop(const std::shared_ptr<server>& self)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
boost::asio::dispatch(self->strand_, [self] () { self->do_stop(); });
}
}}} // lws // rpc // scanner
10 changes: 8 additions & 2 deletions src/rpc/scanner/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ namespace lws { namespace rpc { namespace scanner
std::vector<db::account_id> active_;
db::storage disk_;
rpc::client zclient_;
lmdb::suspended_txn read_txn_;
db::cursor::accounts accounts_cur_;
std::size_t next_thread_;
std::array<unsigned char, 32> pass_hashed_;
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
const ssl_verification_t webhook_verify_;
bool stop_;

//! Async acceptor routine
class acceptor;
Expand All @@ -79,6 +79,9 @@ namespace lws { namespace rpc { namespace scanner
//! Reset `local_` and `remote_` scanners. Must be called in `strand_`.
void do_replace_users();

//! Stop all async operations
void do_stop();

public:
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);

Expand All @@ -105,6 +108,9 @@ namespace lws { namespace rpc { namespace scanner
static void replace_users(const std::shared_ptr<server>& self);

//! Update `users` information on local DB
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);

//! Stop a running instance of all operations
static void stop(const std::shared_ptr<server>& self);
};
}}} // lws // rpc // scanner
3 changes: 2 additions & 1 deletion src/rpc/scanner/write_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#pragma once

#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <memory>
Expand Down Expand Up @@ -167,7 +168,7 @@ namespace lws { namespace rpc { namespace scanner

if (msg.empty())
{
self->cleanup();
boost::asio::dispatch(self->strand_, [self] () { self->cleanup(); });
return;
}

Expand Down
44 changes: 27 additions & 17 deletions src/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "scanner.h"

#include <algorithm>
#include <boost/asio/use_future.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <boost/range/combine.hpp>
#include <boost/thread/condition_variable.hpp>
Expand Down Expand Up @@ -1044,23 +1045,27 @@ namespace lws
users.clear();
users.shrink_to_fit();

{
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);

rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
}
rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));

// Blocks until sigint, local scanner issue, or exception
// Blocks until sigint, local scanner issue, storage issue, or exception
self.io_.run();
self.io_.restart();

// Make sure server stops because we could re-start after blockchain sync
rpc::scanner::server::stop(server);
self.io_.poll();
self.io_.restart();
}

template<typename R, typename Q>
Expand Down Expand Up @@ -1396,14 +1401,19 @@ namespace lws

boost::asio::steady_timer poll{sync_.io_};
poll.expires_from_now(rpc::scanner::account_poll_interval);
poll.async_wait([] (boost::system::error_code) {});
const auto ready = poll.async_wait(boost::asio::use_future);

sync_.io_.run_one();
/* The exchange rates timer could run while waiting, so ensure that
the correct timer was run. */
while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout)
{
sync_.io_.run_one();
sync_.io_.restart();
}
}
else
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);

sync_.io_.reset();
if (has_shutdown())
return;

Expand Down

0 comments on commit 97617f9

Please sign in to comment.