Skip to content

Commit

Permalink
Merge pull request #177 from kornelski/pool
Browse files Browse the repository at this point in the history
Simplify server pool
  • Loading branch information
lipanski authored Sep 15, 2023
2 parents 99735e7 + 6b4073e commit be00de4
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 68 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
components: clippy, rustfmt
override: true
Expand All @@ -29,7 +29,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
components: clippy, rustfmt
override: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
override: true
- name: Check
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["mock", "mocks", "http", "webmock", "webmocks"]
categories = ["development-tools::testing", "web-programming"]
exclude = ["/.appveyor.yml", "/.travis.yml", "/benchmarks.txt", "/docs/", "/slides.pdf"]
edition = "2021"
rust-version = "1.68"

[badges]
travis-ci = { repository = "lipanski/mockito", branch = "master" }
Expand All @@ -21,7 +22,6 @@ assert-json-diff = "2.0"
colored = { version = "2.0", optional = true }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
log = "0.4"
rand = "0.8"
regex = "1.7"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<p align="center">
<a href="https://docs.rs/mockito"><img src="https://docs.rs/mockito/badge.svg"></a>
<a href="https://crates.io/crates/mockito"><img src="https://img.shields.io/crates/v/mockito.svg"></a>
<img src="https://img.shields.io/badge/rust%20version-%3E%3D1.65.0-orange">
<img src="https://img.shields.io/badge/rust%20version-%3E%3D1.68.0-orange">
<a href="https://crates.io/crates/mockito"><img src="https://img.shields.io/crates/d/mockito"></a>
<a href="https://github.com/lipanski/mockito/actions/workflows/tests.yml/?branch=master"><img src="https://github.com/lipanski/mockito/actions/workflows/tests.yml/badge.svg?branch=master"></a>
</p>
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn test_simple_route_mock_async() {

## Minimum supported Rust toolchain

The current minimum support Rust toolchain is **1.65.0**
The current minimum support Rust toolchain is **1.68.0**

## Contribution Guidelines

Expand All @@ -144,7 +144,7 @@ cargo test
...or run tests using a different toolchain:

```sh
rustup run --install 1.65.0 cargo test
rustup run --install 1.68.0 cargo test
```

...or run tests while disabling the default features (e.g. the colors):
Expand Down Expand Up @@ -184,7 +184,7 @@ rustup component add clippy
The linter is always run on the minimum supported Rust version:

```sh
rustup run --install 1.65.0 cargo clippy-mockito
rustup run --install 1.68.0 cargo clippy-mockito
```

### Release
Expand Down
68 changes: 18 additions & 50 deletions src/server_pool.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use crate::Server;
use crate::{Error, ErrorKind};
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut, Drop};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tokio::sync::{Semaphore, SemaphorePermit};

const DEFAULT_POOL_SIZE: usize = 50;

lazy_static! {
pub(crate) static ref SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
}
// macOS has small default ulimits. Sync it with test_server_pool()
const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };
pub(crate) static SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);

///
/// A handle around a pooled `Server` object which dereferences to `Server`.
Expand Down Expand Up @@ -46,75 +43,46 @@ impl DerefMut for ServerGuard {
impl Drop for ServerGuard {
fn drop(&mut self) {
if let Some(server) = self.server.take() {
// the permit is still held when recycling,
// so the next acquire will already see the recycled server
SERVER_POOL.recycle(server);
}
}
}

pub(crate) struct ServerPool {
max_size: usize,
created: Arc<Mutex<usize>>,
semaphore: Semaphore,
state: Arc<Mutex<VecDeque<Server>>>,
free_list: Mutex<VecDeque<Server>>,
}

impl ServerPool {
fn new(max_size: usize) -> ServerPool {
let created = Arc::new(Mutex::new(0));
let semaphore = Semaphore::new(max_size);
let state = Arc::new(Mutex::new(VecDeque::new()));
const fn new(max_size: usize) -> ServerPool {
ServerPool {
max_size,
created,
semaphore,
state,
semaphore: Semaphore::const_new(max_size),
free_list: Mutex::new(VecDeque::new()),
}
}

pub(crate) async fn get_async(&'static self) -> Result<ServerGuard, Error> {
// number of active permits limits the number of servers created
let permit = self
.semaphore
.acquire()
.await
.map_err(|err| Error::new_with_context(ErrorKind::Deadlock, err))?;

let should_create = {
let created_mutex = self.created.clone();
let mut created = created_mutex.lock().unwrap();
if *created < self.max_size {
*created += 1;
true
} else {
false
}
};

let server = {
if should_create {
Some(Server::try_new_with_port_async(0).await?)
} else {
None
}
// be careful not to lock locks in match - it extends scope of temporaries
let recycled = self.free_list.lock().unwrap().pop_front();
let server = match recycled {
Some(server) => server,
None => Server::try_new_with_port_async(0).await?,
};

let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();

if let Some(server) = server {
state.push_back(server);
}

if let Some(server) = state.pop_front() {
Ok(ServerGuard::new(server, permit))
} else {
Err(Error::new(ErrorKind::ServerBusy))
}
Ok(ServerGuard::new(server, permit))
}

fn recycle(&self, mut server: Server) {
server.reset();
let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();
state.push_back(server);
self.free_list.lock().unwrap().push_back(server);
}
}
33 changes: 23 additions & 10 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1915,35 +1915,45 @@ fn test_running_multiple_servers() {
assert_eq!("s3", body3);
}

static SERIAL_POOL_TESTS: Mutex<()> = Mutex::new(());
const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };

#[test]
#[allow(clippy::vec_init_then_push)]
fn test_server_pool() {
// two tests can't monopolize the pool at the same time
let _lock = SERIAL_POOL_TESTS.lock().unwrap();

// If the pool is not working, this will hit the file descriptor limit (Too many open files)
for _ in 0..20 {
// The pool size is 50, anything beyond that will block
for _ in 0..50 {
let mut servers = vec![];
let mut servers = vec![];
// Anything beyond pool size will block.
for _ in 0..DEFAULT_POOL_SIZE {
servers.push(Server::new());

let s = servers.first_mut().unwrap();
let s = servers.last_mut().unwrap();
let m = s.mock("GET", "/pool").create();
let (_, _, _) = request_with_body(&s.host_with_port(), "GET /pool", "", "");
m.assert();
}
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[allow(clippy::vec_init_then_push)]
async fn test_server_pool_async() {
// two tests can't monopolize the pool at the same time
tokio::task::yield_now().await;
let _lock = tokio::task::block_in_place(|| SERIAL_POOL_TESTS.lock().unwrap());

// If the pool is not working, this will hit the file descriptor limit (Too many open files)
for _ in 0..20 {
// The pool size is 50, anything beyond that will block
for _ in 0..50 {
let mut servers = vec![];
let mut servers = vec![];
// Anything beyond pool size will block
for _ in 0..DEFAULT_POOL_SIZE {
servers.push(Server::new_async().await);

let s = servers.first_mut().unwrap();
let s = servers.last_mut().unwrap();
let m = s.mock("GET", "/pool").create_async().await;
let (_, _, _) = request_with_body(&s.host_with_port(), "GET /pool", "", "");
m.assert_async().await;
Expand Down Expand Up @@ -2052,8 +2062,11 @@ async fn test_match_body_asnyc() {
assert_eq!(200, response.status());
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_join_all_async() {
tokio::task::yield_now().await;
let _lock = tokio::task::block_in_place(|| SERIAL_POOL_TESTS.lock().unwrap());

let futures = (0..10).map(|_| async {
let mut s = Server::new_async().await;
let m = s.mock("POST", "/").create_async().await;
Expand Down

0 comments on commit be00de4

Please sign in to comment.