From d9103d4326f383a59b063d91036071c6a09c9c7f Mon Sep 17 00:00:00 2001 From: chriseth Date: Mon, 12 Sep 2022 11:38:47 +0200 Subject: [PATCH] Create threads earlier. --- src/flow/flow.rs | 3 ++ src/main.rs | 4 +- src/server.rs | 111 +++++++++++++++++++++++------------------------ 3 files changed, 58 insertions(+), 60 deletions(-) diff --git a/src/flow/flow.rs b/src/flow/flow.rs index caa4025..ecf955f 100644 --- a/src/flow/flow.rs +++ b/src/flow/flow.rs @@ -42,6 +42,9 @@ pub fn compute_flow( } } } + + // TODO prune + println!("Max flow: {flow}"); let transfers = extract_transfers(source, sink, &flow, used_edges); println!("Num transfers: {}", transfers.len()); diff --git a/src/main.rs b/src/main.rs index 6e72f5a..39673f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,15 +5,13 @@ mod io; mod server; mod types; -use server::Server; - fn main() { let port = if env::args().len() == 1 { 8080 } else { env::args().nth(1).unwrap().as_str().parse::().unwrap() }; - Server::start(port); + server::start_server(port); // let args: Vec = env::args().collect(); // if args.len() != 4 { diff --git a/src/server.rs b/src/server.rs index c11b02f..71f6bbc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,83 +5,80 @@ use json::JsonValue; use std::collections::HashMap; use std::error::Error; use std::io::{BufRead, BufReader, Write}; -use std::sync::Arc; +use std::ops::Deref; +use std::sync::{Arc, RwLock}; use std::thread; use std::{ io::Read, net::{TcpListener, TcpStream}, }; -pub struct Server { - edges: Arc>>, - //threads: Vec>, -} - struct JsonRpcRequest { id: JsonValue, method: String, params: JsonValue, } -impl Server { - pub fn start(port: u16) { - let mut server = Server { - edges: Arc::new(HashMap::new()), - //threads: Vec::new(), - }; +type EdgeMap = HashMap>; - let listener = - TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); - loop { - match listener.accept() { - Ok((socket, _)) => match server.handle_connection(socket) { - Ok(_) => {} - Err(e) => println!("Error communicating with client: {e}"), - }, - Err(e) => println!("Error accepting connection: {e}"), - } - } - } +pub fn start_server(port: u16) { + let edges: Arc>> = Arc::new(RwLock::new(Arc::new(HashMap::new()))); - fn handle_connection(&mut self, mut socket: TcpStream) -> Result<(), Box> { - let request = read_request(&mut socket)?; - match request.method.as_str() { - "load_edges_binary" => { - // TODO do this in its own thread? - let edges = read_edges_binary(&request.params["file"].to_string())?; - self.edges = Arc::new(edges); - socket.write_all(jsonrpc_result(request.id, self.edges.len()).as_bytes())?; - } - "compute_transfer" => { - // TODO limit number of threads - let edges = self.edges.clone(); - let _thread = thread::spawn(move || { - println!("Computing flow"); - let flow = flow::compute_flow( - &Address::from(request.params["from"].to_string().as_str()), - &Address::from(request.params["to"].to_string().as_str()), - //&U256::from(request.params["value"].to_string().as_str()), - edges.as_ref(), - ); - println!("Computed flow"); - // TODO error handling - socket - .write_all( - jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes(), - ) - .unwrap(); + let listener = + TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); + loop { + let c = edges.clone(); + match listener.accept() { + // TODO limit number of threads + Ok((socket, _)) => { + thread::spawn(move || { + match handle_connection(c.deref(), socket) { + Ok(()) => {} + Err(e) => { + // TODO respond to the jsonrpc + println!("Error handling connection: {e}"); + } + } }); - //self.threads.push(thread); } - "cancel" => {} - "update_edges" => {} - // TODO error handling - _ => {} + Err(e) => println!("Error accepting connection: {e}"), } - Ok(()) } } +fn handle_connection( + edges: &RwLock>>>, + mut socket: TcpStream, +) -> Result<(), Box> { + let request = read_request(&mut socket)?; + match request.method.as_str() { + "load_edges_binary" => { + let updated_edges = read_edges_binary(&request.params["file"].to_string())?; + let len = updated_edges.len(); + *edges.write().unwrap() = Arc::new(updated_edges); + socket.write_all(jsonrpc_result(request.id, len).as_bytes())?; + } + "compute_transfer" => { + println!("Computing flow"); + let e = edges.read().unwrap().clone(); + let flow = flow::compute_flow( + &Address::from(request.params["from"].to_string().as_str()), + &Address::from(request.params["to"].to_string().as_str()), + //&U256::from(request.params["value"].to_string().as_str()), + e.as_ref(), + ); + println!("Computed flow"); + // TODO error handling + socket.write_all(jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes())?; + } + "cancel" => {} + "update_edges" => {} + // TODO error handling + _ => {} + }; + Ok(()) +} + fn read_request(socket: &mut TcpStream) -> Result> { let payload = read_payload(socket)?; let mut request = json::parse(&String::from_utf8(payload)?)?;