From c1f7de858c6e53212b7bbf905728c8c691e6d10b Mon Sep 17 00:00:00 2001 From: chriseth Date: Tue, 20 Sep 2022 16:51:37 +0200 Subject: [PATCH] Multithreading with queues. --- src/flow/flow.rs | 18 ++++++++---------- src/main.rs | 2 +- src/server.rs | 39 ++++++++++++++++++++++++--------------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/src/flow/flow.rs b/src/flow/flow.rs index bd3838d..92e363b 100644 --- a/src/flow/flow.rs +++ b/src/flow/flow.rs @@ -287,16 +287,14 @@ fn smallest_edge_from( used_edges: &HashMap>, n: &Node, ) -> Option<(Node, U256)> { - used_edges - .get(n) - .and_then(|out| { - out.iter() - .min_by_key(|(_, c)| { - assert!(**c != U256::from(0)); - *c - }) - .map(|(t, c)| (t.clone(), *c)) - }) + used_edges.get(n).and_then(|out| { + out.iter() + .min_by_key(|(_, c)| { + assert!(**c != U256::from(0)); + *c + }) + .map(|(t, c)| (t.clone(), *c)) + }) } fn smallest_edge_to( diff --git a/src/main.rs b/src/main.rs index 39673f8..239e6bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ fn main() { } else { env::args().nth(1).unwrap().as_str().parse::().unwrap() }; - server::start_server(port); + server::start_server(port, 10, 4); // let args: Vec = env::args().collect(); // if args.len() != 4 { diff --git a/src/server.rs b/src/server.rs index 7218206..5080d18 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use std::error::Error; use std::io::{BufRead, BufReader, Write}; use std::ops::Deref; -use std::sync::{Arc, RwLock}; +use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::thread; use std::{ io::Read, @@ -21,26 +21,35 @@ struct JsonRpcRequest { type EdgeMap = HashMap>; -pub fn start_server(port: u16) { +pub fn start_server(port: u16, queue_size: usize, threads: u64) { let edges: Arc>> = Arc::new(RwLock::new(Arc::new(HashMap::new()))); + let (sender, receiver) = mpsc::sync_channel(queue_size); + let protected_receiver = Arc::new(Mutex::new(receiver)); + for _ in 0..threads { + let rec = protected_receiver.clone(); + let e = edges.clone(); + thread::spawn(move || { + loop { + let socket = rec.lock().unwrap().recv().unwrap(); + match handle_connection(e.deref(), socket) { + Ok(()) => {} + Err(e) => { + // TODO respond to the jsonrpc + println!("Error handling connection: {e}"); + } + } + } + }); + } let listener = TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); loop { - let c = edges.clone(); match listener.accept() { - // TODO limit number of threads - Ok((socket, _)) => { - thread::spawn(move || { - match handle_connection(c.deref(), socket) { - Ok(()) => {} - Err(e) => { - // TODO respond to the jsonrpc - println!("Error handling connection: {e}"); - } - } - }); - } + Ok((socket, _)) => match sender.try_send(socket) { + Ok(()) => {} + Err(e) => println!("Queue full: {e}"), + }, Err(e) => println!("Error accepting connection: {e}"), } }