Skip to content

Commit

Permalink
End-to-end multi-replica case.
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitbhrdwj committed Dec 5, 2023
1 parent 174b25c commit 1e62b1c
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 38 deletions.
4 changes: 2 additions & 2 deletions kernel/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) type Eid = usize;
pub(crate) const MAX_PROCESSES: usize = 12;

/// How many registered "named" frames a process can have.
pub(crate) const MAX_FRAMES_PER_PROCESS: usize = 1023;
pub(crate) const MAX_FRAMES_PER_PROCESS: usize = 2047;

/// How many writable sections a process can have (part of the ELF file).
pub(crate) const MAX_WRITEABLE_SECTIONS_PER_PROCESS: usize = 4;
Expand Down Expand Up @@ -175,7 +175,7 @@ impl FrameManagement for ProcessFrames {
// If `self.frames` is too big, the O(n) lookup in this fn might become
// a problem. better to implement some reverse-map for PAddr -> FrameId
// then.
static_assertions::const_assert!(MAX_FRAMES_PER_PROCESS < 1024);
static_assertions::const_assert!(MAX_FRAMES_PER_PROCESS < 2048);

for (frame, ref mut refcnt) in self.frames.iter_mut() {
if let Some(frame) = frame {
Expand Down
43 changes: 40 additions & 3 deletions kernel/tests/s11_rackscale_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,49 @@ fn s11_rackscale_dynrep_userspace() {
fn controller_match_fn(
proc: &mut PtySession,
output: &mut String,
_cores_per_client: usize,
_num_clients: usize,
_file_name: &str,
cores_per_client: usize,
num_clients: usize,
file_name: &str,
_is_baseline: bool,
_arg: Option<()>,
) -> Result<()> {
let expected_lines = num_clients * cores_per_client * 5;
for _i in 0..expected_lines {
let (prev, matched) =
proc.exp_regex(r#"init::dynrep: (.*),(\d+),(\d+),(\d+),(\d+)"#)?;
*output += prev.as_str();
*output += matched.as_str();

// Append parsed results to a CSV file
let write_headers = !Path::new(file_name).exists();
let mut csv_file = OpenOptions::new()
.append(true)
.create(true)
.open(file_name)
.expect("Can't open file");
if write_headers {
let row = "git_rev,nclients,cores_per_client,benchmark,machine_id,thread_id,time,operations\n";
let r = csv_file.write(row.as_bytes());
assert!(r.is_ok());
}

let parts: Vec<&str> = matched.split("init::dynrep: ").collect();
assert!(parts.len() >= 2);

let r = csv_file.write(format!("{},", env!("GIT_HASH")).as_bytes());
assert!(r.is_ok());

let r = csv_file.write(format!("{},", num_clients).as_bytes());
assert!(r.is_ok());

let r = csv_file.write(format!("{},", cores_per_client).as_bytes());
assert!(r.is_ok());

let r = csv_file.write(parts[1].as_bytes());
assert!(r.is_ok());
let r = csv_file.write("\n".as_bytes());
assert!(r.is_ok());
}
*output += proc.exp_string("dynrep_test OK")?.as_str();
Ok(())
}
Expand Down
37 changes: 21 additions & 16 deletions usr/init/src/dynrep/allocator.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,65 @@
use core::slice::from_raw_parts_mut;
use x86::bits64::paging::LARGE_PAGE_SIZE;
use lazy_static::lazy_static;
use core::sync::atomic::AtomicU8;
use core::sync::atomic::Ordering;

use log::info;
use core::{
alloc::{AllocError, Allocator, Layout},
ptr::NonNull,
};
use log::info;

pub const BASE: u64 = 0x0510_0000_0000;
pub const MAX_FRAMES: u64 = 600;
pub const MAX_FRAMES: u64 = 200;

static NODE_ID: AtomicU8 = AtomicU8::new(1);

#[derive(Clone, Copy)]
pub struct MyAllocator;

impl Default for MyAllocator {
fn default() -> Self {
impl MyAllocator {
fn allocate_pages(node_id: u8) {
let mut allocated = 0;
let node_offset = (node_id - 1) as u64 * LARGE_PAGE_SIZE as u64 * MAX_FRAMES;
while allocated < MAX_FRAMES {
// Allocate a large page of physical memory
// Note that even if you allocate a base page, behind the scenes a large page is allocated
// because DCM (and thus DiNOS) only allocates at large page granularity
// 1 is the client machine id we want to allocate from
let (frame_id, paddr) = vibrio::syscalls::PhysicalMemory::allocate_large_page(1)
let (frame_id, paddr) = vibrio::syscalls::PhysicalMemory::allocate_large_page(node_id as usize)
.expect("Failed to get physical memory large page");
info!("large frame id={:?}, paddr={:?}", frame_id, paddr);

// Map allocated physical memory into user space so we can actually access it.
unsafe {
vibrio::syscalls::VSpace::map_frame(
frame_id,
BASE + (allocated * LARGE_PAGE_SIZE as u64),
)
.expect("Failed to map base page");
vibrio::syscalls::VSpace::map_frame(frame_id, BASE + node_offset + (allocated * LARGE_PAGE_SIZE as u64)).expect("Failed to map base page");
}
allocated += 1;
}
info!("# Allocated {} frames", allocated);
MyAllocator {}
info!("# Allocated {} frames on {}", allocated, node_id);
}
}

unsafe impl Allocator for MyAllocator {
fn allocate(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
let node_id = NODE_ID.fetch_add(1, Ordering::SeqCst);
let node_offset = (node_id - 1) as u64 * LARGE_PAGE_SIZE as u64 * MAX_FRAMES;
MyAllocator::allocate_pages(node_id);
info!("# Allocating {:?}", layout);
if layout.size() > LARGE_PAGE_SIZE * MAX_FRAMES as usize {
return Err(AllocError);
}

let slice = unsafe { from_raw_parts_mut(BASE as *mut u8, layout.size()) };
// DO we need to zero the memory for mapping to work?
let slice = unsafe {from_raw_parts_mut((BASE + node_offset) as *mut u8, layout.size()) };
Ok(NonNull::from(slice))
}

    /// Unmaps the user-space virtual range previously handed out by `allocate`.
    ///
    /// NOTE(review): only `layout.size()` bytes starting at `ptr` are unmapped,
    /// while the allocation path maps `MAX_FRAMES` large pages per node —
    /// confirm the remaining mappings are reclaimed elsewhere (or are meant to
    /// live for the process lifetime).
    unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
        info!("# Deallocating {:?}", layout);
        // Release the mapping via the kernel; panics if the unmap syscall fails.
        vibrio::syscalls::VSpace::unmap(ptr.as_ptr() as u64, layout.size() as u64)
            .expect("Failed to unmap base page");
        // Earlier frame-by-frame unmap loop, kept for reference:
        /*for i in 0..MAX_FRAMES {
            vibrio::syscalls::VSpace::unmap((BASE + (i * LARGE_PAGE_SIZE as u64)) as u64, LARGE_PAGE_SIZE as u64)
                .expect("Failed to unmap base page");
        }*/
    }
}
65 changes: 48 additions & 17 deletions usr/init/src/dynrep/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@ use log::info;

use hashbrown::{hash_map::DefaultHashBuilder, HashMap};

use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;

use lineup::tls2::{Environment, SchedulerControlBlock};
use nr2::nr::{rwlock::RwLock, Dispatch, NodeReplicated};
use nr2::nr::{Dispatch, NodeReplicated};
use x86::bits64::paging::VAddr;
use x86::random::rdrand64;
use rawtime::Instant;

mod allocator;
use allocator::MyAllocator;

pub const NUM_ENTRIES: u64 = 50_000_000;
pub const NUM_ENTRIES: u64 = 500_000;

static POOR_MANS_BARRIER: AtomicUsize = AtomicUsize::new(0);

#[derive(Clone)]
struct HashTable {
Expand All @@ -24,7 +31,7 @@ struct HashTable {

impl Default for HashTable {
fn default() -> Self {
let allocator = MyAllocator::default();
let allocator = MyAllocator{};
let map = HashMap::<u64, u64, DefaultHashBuilder, MyAllocator>::with_capacity_in(
NUM_ENTRIES as usize,
allocator,
Expand Down Expand Up @@ -66,15 +73,44 @@ impl Dispatch for HashTable {
}
}

unsafe extern "C" fn bencher_trampoline(_arg1: *mut u8) -> *mut u8 {
thread_routine();
ptr::null_mut()
/// Read-only benchmark loop for one core.
///
/// Registers this thread against the replica of NUMA node `mid - 1`, then for
/// each one-second measurement window issues batched `OpRd::Get` operations on
/// uniformly random keys and logs one result line per window in the form
/// `dynhash,<mid>,<core_id>,<iteration>,<ops>` (consumed by the test harness).
///
/// Fixes vs. previous revision: unused loop variable `i` replaced with `_`
/// (silences the compiler warning); parameter list formatting normalized.
fn run_bench(mid: usize, core_id: usize, replica: Arc<NodeReplicated<HashTable>>) {
    // Replica registration token; node ids are 1-based, replica ids 0-based.
    let ttkn = replica.register(mid - 1).unwrap();
    let mut random_key: u64 = 0;
    let batch_size = 64;
    let duration = 5;

    // NOTE(review): `<=` runs `duration + 1` (= 6) windows, while the
    // controller-side matcher only parses 5 lines per core — confirm the
    // extra window is an intentional warm-up/extra sample.
    let mut iterations = 0;
    while iterations <= duration {
        let mut ops = 0;
        let start = Instant::now();
        // Hammer the replica with read batches for roughly one second.
        while start.elapsed().as_secs() < 1 {
            for _ in 0..batch_size {
                // rdrand64 writes a hardware-random value into `random_key`.
                unsafe { rdrand64(&mut random_key) };
                random_key %= NUM_ENTRIES;
                let _ = replica.execute(OpRd::Get(random_key), ttkn).unwrap();
                ops += 1;
            }
        }
        info!("dynhash,{},{},{},{}", mid, core_id, iterations, ops);
        iterations += 1;
    }
}

fn thread_routine() {
/// Thread entry point: reconstructs the shared replica handle, synchronizes
/// with all other benchmark threads on the poor-man's barrier, then runs the
/// read benchmark on this core.
///
/// # Safety
/// `args` must be a pointer obtained from `Arc::into_raw` on an
/// `Arc<NodeReplicated<HashTable>>`; this function consumes exactly one
/// strong reference count.
unsafe extern "C" fn bencher_trampoline(args: *mut u8) -> *mut u8 {
    let current_gtid = vibrio::syscalls::System::core_id().expect("Can't get core id");
    let mid = kpi::system::mid_from_gtid(current_gtid);
    info!("I am thread {:?} and I am on node {:?}", current_gtid, mid);
    // SAFETY: the spawner passed `Arc::into_raw(replicas.clone())` per thread,
    // so taking ownership of one reference here keeps the refcount balanced.
    let replica: Arc<NodeReplicated<HashTable>> =
        Arc::from_raw(args as *const NodeReplicated<HashTable>);

    // Barrier: each thread decrements the counter, then spins until every
    // participating core has arrived.
    POOR_MANS_BARRIER.fetch_sub(1, Ordering::Release);
    while POOR_MANS_BARRIER.load(Ordering::Acquire) != 0 {
        core::hint::spin_loop();
    }

    // Pass the Arc by value instead of the previous clone-then-drop
    // (redundant refcount bump flagged by clippy::redundant_clone).
    run_bench(mid, current_gtid, replica);
    ptr::null_mut()
}

pub fn userspace_dynrep_test() {
Expand All @@ -96,17 +132,11 @@ pub fn userspace_dynrep_test() {

// Create data structure, with as many replicas as there are clients (assuming 1 numa node per client)
// TODO: change # of replicas to nnodes
let replicas = NonZeroUsize::new(1).unwrap();
let nrht = NodeReplicated::<HashTable>::new(replicas, |_| 0).unwrap();

// TODO: populate data structure
let ttkn = nrht.register(0).unwrap();
nrht.execute(OpRd::Get(0), ttkn).unwrap();
let num_replicas = NonZeroUsize::new(nnodes).unwrap();
let replicas = Arc::new(NodeReplicated::<HashTable>::new(num_replicas, |_| 0).unwrap());

let s = &vibrio::upcalls::PROCESS_SCHEDULER;

let mut gtids = Vec::with_capacity(ncores);

// We already have current core
gtids.push(current_gtid);

Expand Down Expand Up @@ -137,13 +167,14 @@ pub fn userspace_dynrep_test() {
32 * 4096,
move |_| {
let mut thandles = Vec::with_capacity(ncores);
POOR_MANS_BARRIER.store(ncores, Ordering::SeqCst);

for core_index in 0..ncores {
thandles.push(
Environment::thread()
.spawn_on_core(
Some(bencher_trampoline),
ncores as *mut u8,
Arc::into_raw(replicas.clone()) as *const _ as *mut u8,
gtids[core_index],
)
.expect("Can't spawn bench thread?"),
Expand Down

0 comments on commit 1e62b1c

Please sign in to comment.