Multithreading with queues.

This commit is contained in:
chriseth 2022-09-20 16:51:37 +02:00
parent 05a3aaf21b
commit c1f7de858c
3 changed files with 33 additions and 26 deletions

View file

@ -287,16 +287,14 @@ fn smallest_edge_from(
used_edges: &HashMap<Node, HashMap<Node, U256>>, used_edges: &HashMap<Node, HashMap<Node, U256>>,
n: &Node, n: &Node,
) -> Option<(Node, U256)> { ) -> Option<(Node, U256)> {
used_edges used_edges.get(n).and_then(|out| {
.get(n) out.iter()
.and_then(|out| { .min_by_key(|(_, c)| {
out.iter() assert!(**c != U256::from(0));
.min_by_key(|(_, c)| { *c
assert!(**c != U256::from(0)); })
*c .map(|(t, c)| (t.clone(), *c))
}) })
.map(|(t, c)| (t.clone(), *c))
})
} }
fn smallest_edge_to( fn smallest_edge_to(

View file

@ -11,7 +11,7 @@ fn main() {
} else { } else {
env::args().nth(1).unwrap().as_str().parse::<u16>().unwrap() env::args().nth(1).unwrap().as_str().parse::<u16>().unwrap()
}; };
server::start_server(port); server::start_server(port, 10, 4);
// let args: Vec<String> = env::args().collect(); // let args: Vec<String> = env::args().collect();
// if args.len() != 4 { // if args.len() != 4 {

View file

@ -6,7 +6,7 @@ use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::ops::Deref; use std::ops::Deref;
use std::sync::{Arc, RwLock}; use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::thread; use std::thread;
use std::{ use std::{
io::Read, io::Read,
@ -21,26 +21,35 @@ struct JsonRpcRequest {
type EdgeMap = HashMap<Address, Vec<Edge>>; type EdgeMap = HashMap<Address, Vec<Edge>>;
pub fn start_server(port: u16) { pub fn start_server(port: u16, queue_size: usize, threads: u64) {
let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new()))); let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new())));
let (sender, receiver) = mpsc::sync_channel(queue_size);
let protected_receiver = Arc::new(Mutex::new(receiver));
for _ in 0..threads {
let rec = protected_receiver.clone();
let e = edges.clone();
thread::spawn(move || {
loop {
let socket = rec.lock().unwrap().recv().unwrap();
match handle_connection(e.deref(), socket) {
Ok(()) => {}
Err(e) => {
// TODO respond to the jsonrpc
println!("Error handling connection: {e}");
}
}
}
});
}
let listener = let listener =
TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
loop { loop {
let c = edges.clone();
match listener.accept() { match listener.accept() {
// TODO limit number of threads Ok((socket, _)) => match sender.try_send(socket) {
Ok((socket, _)) => { Ok(()) => {}
thread::spawn(move || { Err(e) => println!("Queue full: {e}"),
match handle_connection(c.deref(), socket) { },
Ok(()) => {}
Err(e) => {
// TODO respond to the jsonrpc
println!("Error handling connection: {e}");
}
}
});
}
Err(e) => println!("Error accepting connection: {e}"), Err(e) => println!("Error accepting connection: {e}"),
} }
} }