diff --git a/src/flow/adjacencies.rs b/src/flow/adjacencies.rs index 7736517..0f0979a 100644 --- a/src/flow/adjacencies.rs +++ b/src/flow/adjacencies.rs @@ -2,7 +2,6 @@ use crate::flow::Node; use crate::types::{Address, Edge, U256}; use std::cmp::Reverse; use std::collections::HashMap; -use std::collections::HashSet; pub struct Adjacencies<'a> { edges: &'a HashMap>, diff --git a/src/flow/flow.rs b/src/flow/flow.rs index d49e522..616d5df 100644 --- a/src/flow/flow.rs +++ b/src/flow/flow.rs @@ -5,7 +5,11 @@ use std::cmp::min; use std::collections::HashMap; use std::collections::VecDeque; -pub fn compute_flow(source: &Address, sink: &Address, edges: &HashMap>) { +pub fn compute_flow( + source: &Address, + sink: &Address, + edges: &HashMap>, +) -> String { let mut adjacencies = Adjacencies::new(edges); let mut used_edges: HashMap> = HashMap::new(); @@ -39,6 +43,7 @@ pub fn compute_flow(source: &Address, sink: &Address, edges: &HashMap().unwrap() + }; + Server::start(port); // let args: Vec = env::args().collect(); // if args.len() != 4 { diff --git a/src/server.rs b/src/server.rs index 4ccbe4b..86995e3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,24 +1,20 @@ -use json; +use crate::flow; +use crate::io::read_edges_binary; +use crate::types::{Address, Edge}; use json::JsonValue; +use std::collections::HashMap; use std::error::Error; -use std::io::{BufRead, BufReader, ErrorKind}; +use std::io::{BufRead, BufReader, Write}; +use std::sync::Arc; +use std::thread; use std::{ - io::{Read, Write}, - net::{SocketAddr, TcpListener, TcpStream}, + io::Read, + net::{TcpListener, TcpStream}, }; -pub fn start(port: u16) { - let listener = - TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); - loop { - match listener.accept() { - Ok((socket, address)) => match handle_connection(socket, address) { - Ok(_) => {} - Err(e) => println!("Error communicating with client: {e}"), - }, - Err(e) => println!("Error accepting connection: {e}"), - } - } +pub struct Server { + edges: Arc>>, + //threads: Vec>, } struct JsonRpcRequest { @@ -27,19 +23,66 @@ struct JsonRpcRequest { params: JsonValue, } -fn handle_connection(mut socket: TcpStream, address: SocketAddr) -> Result<(), Box> { - let request = read_request(socket)?; - match request.method.as_str() { - "load_edges_binary" => {} - "compute_transfer" => {} - "cancel" => {} - "update_edges" => {} - _ => {} +impl Server { + pub fn start(port: u16) { + let mut server = Server { + edges: Arc::new(HashMap::new()), + //threads: Vec::new(), + }; + + let listener = + TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); + loop { + match listener.accept() { + Ok((socket, _)) => match server.handle_connection(socket) { + Ok(_) => {} + Err(e) => println!("Error communicating with client: {e}"), + }, + Err(e) => println!("Error accepting connection: {e}"), + } + } + } + + fn handle_connection(&mut self, mut socket: TcpStream) -> Result<(), Box> { + let request = read_request(&mut socket)?; + match request.method.as_str() { + "load_edges_binary" => { + // TODO do this in its own thread? + let edges = read_edges_binary(&request.params["file"].to_string())?; + self.edges = Arc::new(edges); + socket.write_all(jsonrpc_result(request.id, self.edges.len()).as_bytes())?; + } + "compute_transfer" => { + // TODO limit number of threads + let edges = self.edges.clone(); + let _thread = thread::spawn(move || { + println!("Computing flow"); + let flow = flow::compute_flow( + &Address::from(request.params["from"].to_string().as_str()), + &Address::from(request.params["to"].to_string().as_str()), + //&U256::from(request.params["value"].to_string().as_str()), + edges.as_ref(), + ); + println!("Computed flow"); + // TODO error handling + socket + .write_all( + jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes(), + ) + .unwrap(); + }); + //self.threads.push(thread); + } + "cancel" => {} + "update_edges" => {} + // TODO error handling + _ => {} + } + Ok(()) } - Ok(()) } -fn read_request(mut socket: TcpStream) -> Result> { +fn read_request(socket: &mut TcpStream) -> Result> { // let mut buf_reader = BufReader::new(&mut socket); // let http_request: Vec<_> = buf_reader // .by_ref() @@ -70,7 +113,7 @@ fn read_request(mut socket: TcpStream) -> Result> } } -fn read_payload(socket: TcpStream) -> Result, Box> { +fn read_payload(socket: &mut TcpStream) -> Result, Box> { let mut reader = BufReader::new(socket); let mut length = 0; for result in reader.by_ref().lines() { @@ -81,7 +124,7 @@ fn read_payload(socket: TcpStream) -> Result, Box> { let header = "content-length: "; if l.to_lowercase().starts_with(header) { - length = usize::from_str_radix(&l[header.len()..], 10)?; + length = (&l[header.len()..]).parse::()?; } } let mut payload = vec![0u8; length]; @@ -89,3 +132,17 @@ fn read_payload(socket: TcpStream) -> Result, Box> { reader.read_exact(payload.as_mut_slice())?; Ok(payload) } + +fn jsonrpc_result(id: JsonValue, result: impl Into) -> String { + let payload = json::object! { + jsonrpc: "2.0", + id: id, + result: result.into(), + } + .dump(); + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", + payload.len(), + payload + ) +} diff --git a/src/types/u256.rs b/src/types/u256.rs index 662ea86..27f07ad 100644 --- a/src/types/u256.rs +++ b/src/types/u256.rs @@ -101,6 +101,7 @@ impl Display for U256 { } } +#[cfg(test)] mod test { use super::U256; #[test]