Create threads earlier.

This commit is contained in:
chriseth 2022-09-12 11:38:47 +02:00
parent 0191fb463a
commit d9103d4326
3 changed files with 58 additions and 60 deletions

View file

@ -42,6 +42,9 @@ pub fn compute_flow(
} }
} }
} }
// TODO prune
println!("Max flow: {flow}"); println!("Max flow: {flow}");
let transfers = extract_transfers(source, sink, &flow, used_edges); let transfers = extract_transfers(source, sink, &flow, used_edges);
println!("Num transfers: {}", transfers.len()); println!("Num transfers: {}", transfers.len());

View file

@ -5,15 +5,13 @@ mod io;
mod server; mod server;
mod types; mod types;
use server::Server;
fn main() { fn main() {
let port = if env::args().len() == 1 { let port = if env::args().len() == 1 {
8080 8080
} else { } else {
env::args().nth(1).unwrap().as_str().parse::<u16>().unwrap() env::args().nth(1).unwrap().as_str().parse::<u16>().unwrap()
}; };
Server::start(port); server::start_server(port);
// let args: Vec<String> = env::args().collect(); // let args: Vec<String> = env::args().collect();
// if args.len() != 4 { // if args.len() != 4 {

View file

@ -5,83 +5,80 @@ use json::JsonValue;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::sync::Arc; use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::{ use std::{
io::Read, io::Read,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
}; };
pub struct Server {
edges: Arc<HashMap<Address, Vec<Edge>>>,
//threads: Vec<thread::JoinHandle<()>>,
}
struct JsonRpcRequest { struct JsonRpcRequest {
id: JsonValue, id: JsonValue,
method: String, method: String,
params: JsonValue, params: JsonValue,
} }
impl Server { type EdgeMap = HashMap<Address, Vec<Edge>>;
pub fn start(port: u16) {
let mut server = Server {
edges: Arc::new(HashMap::new()),
//threads: Vec::new(),
};
let listener = pub fn start_server(port: u16) {
TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server."); let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new())));
loop {
match listener.accept() {
Ok((socket, _)) => match server.handle_connection(socket) {
Ok(_) => {}
Err(e) => println!("Error communicating with client: {e}"),
},
Err(e) => println!("Error accepting connection: {e}"),
}
}
}
fn handle_connection(&mut self, mut socket: TcpStream) -> Result<(), Box<dyn Error>> { let listener =
let request = read_request(&mut socket)?; TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
match request.method.as_str() { loop {
"load_edges_binary" => { let c = edges.clone();
// TODO do this in its own thread? match listener.accept() {
let edges = read_edges_binary(&request.params["file"].to_string())?; // TODO limit number of threads
self.edges = Arc::new(edges); Ok((socket, _)) => {
socket.write_all(jsonrpc_result(request.id, self.edges.len()).as_bytes())?; thread::spawn(move || {
} match handle_connection(c.deref(), socket) {
"compute_transfer" => { Ok(()) => {}
// TODO limit number of threads Err(e) => {
let edges = self.edges.clone(); // TODO respond to the jsonrpc
let _thread = thread::spawn(move || { println!("Error handling connection: {e}");
println!("Computing flow"); }
let flow = flow::compute_flow( }
&Address::from(request.params["from"].to_string().as_str()),
&Address::from(request.params["to"].to_string().as_str()),
//&U256::from(request.params["value"].to_string().as_str()),
edges.as_ref(),
);
println!("Computed flow");
// TODO error handling
socket
.write_all(
jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes(),
)
.unwrap();
}); });
//self.threads.push(thread);
} }
"cancel" => {} Err(e) => println!("Error accepting connection: {e}"),
"update_edges" => {}
// TODO error handling
_ => {}
} }
Ok(())
} }
} }
fn handle_connection(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
let request = read_request(&mut socket)?;
match request.method.as_str() {
"load_edges_binary" => {
let updated_edges = read_edges_binary(&request.params["file"].to_string())?;
let len = updated_edges.len();
*edges.write().unwrap() = Arc::new(updated_edges);
socket.write_all(jsonrpc_result(request.id, len).as_bytes())?;
}
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
let flow = flow::compute_flow(
&Address::from(request.params["from"].to_string().as_str()),
&Address::from(request.params["to"].to_string().as_str()),
//&U256::from(request.params["value"].to_string().as_str()),
e.as_ref(),
);
println!("Computed flow");
// TODO error handling
socket.write_all(jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes())?;
}
"cancel" => {}
"update_edges" => {}
// TODO error handling
_ => {}
};
Ok(())
}
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> { fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
let payload = read_payload(socket)?; let payload = read_payload(socket)?;
let mut request = json::parse(&String::from_utf8(payload)?)?; let mut request = json::parse(&String::from_utf8(payload)?)?;