Multithreading
This commit is contained in:
parent
531722cc3e
commit
fbe6183e57
6 changed files with 99 additions and 33 deletions
|
@ -2,7 +2,6 @@ use crate::flow::Node;
|
||||||
use crate::types::{Address, Edge, U256};
|
use crate::types::{Address, Edge, U256};
|
||||||
use std::cmp::Reverse;
|
use std::cmp::Reverse;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
pub struct Adjacencies<'a> {
|
pub struct Adjacencies<'a> {
|
||||||
edges: &'a HashMap<Address, Vec<Edge>>,
|
edges: &'a HashMap<Address, Vec<Edge>>,
|
||||||
|
|
|
@ -5,7 +5,11 @@ use std::cmp::min;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
pub fn compute_flow(source: &Address, sink: &Address, edges: &HashMap<Address, Vec<Edge>>) {
|
pub fn compute_flow(
|
||||||
|
source: &Address,
|
||||||
|
sink: &Address,
|
||||||
|
edges: &HashMap<Address, Vec<Edge>>,
|
||||||
|
) -> String {
|
||||||
let mut adjacencies = Adjacencies::new(edges);
|
let mut adjacencies = Adjacencies::new(edges);
|
||||||
let mut used_edges: HashMap<Node, HashMap<Node, U256>> = HashMap::new();
|
let mut used_edges: HashMap<Node, HashMap<Node, U256>> = HashMap::new();
|
||||||
|
|
||||||
|
@ -39,6 +43,7 @@ pub fn compute_flow(source: &Address, sink: &Address, edges: &HashMap<Address, V
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println!("Max flow: {flow}");
|
println!("Max flow: {flow}");
|
||||||
|
flow.to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn augmenting_path(
|
fn augmenting_path(
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
|
|
|
@ -5,10 +5,15 @@ mod io;
|
||||||
mod server;
|
mod server;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use types::Address;
|
use server::Server;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
server::start(8080);
|
let port = if env::args().len() == 1 {
|
||||||
|
8080
|
||||||
|
} else {
|
||||||
|
env::args().nth(1).unwrap().as_str().parse::<u16>().unwrap()
|
||||||
|
};
|
||||||
|
Server::start(port);
|
||||||
|
|
||||||
// let args: Vec<String> = env::args().collect();
|
// let args: Vec<String> = env::args().collect();
|
||||||
// if args.len() != 4 {
|
// if args.len() != 4 {
|
||||||
|
|
113
src/server.rs
113
src/server.rs
|
@ -1,24 +1,20 @@
|
||||||
use json;
|
use crate::flow;
|
||||||
|
use crate::io::read_edges_binary;
|
||||||
|
use crate::types::{Address, Edge};
|
||||||
use json::JsonValue;
|
use json::JsonValue;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::io::{BufRead, BufReader, ErrorKind};
|
use std::io::{BufRead, BufReader, Write};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
use std::{
|
use std::{
|
||||||
io::{Read, Write},
|
io::Read,
|
||||||
net::{SocketAddr, TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn start(port: u16) {
|
pub struct Server {
|
||||||
let listener =
|
edges: Arc<HashMap<Address, Vec<Edge>>>,
|
||||||
TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
|
//threads: Vec<thread::JoinHandle<()>>,
|
||||||
loop {
|
|
||||||
match listener.accept() {
|
|
||||||
Ok((socket, address)) => match handle_connection(socket, address) {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(e) => println!("Error communicating with client: {e}"),
|
|
||||||
},
|
|
||||||
Err(e) => println!("Error accepting connection: {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct JsonRpcRequest {
|
struct JsonRpcRequest {
|
||||||
|
@ -27,19 +23,66 @@ struct JsonRpcRequest {
|
||||||
params: JsonValue,
|
params: JsonValue,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_connection(mut socket: TcpStream, address: SocketAddr) -> Result<(), Box<dyn Error>> {
|
impl Server {
|
||||||
let request = read_request(socket)?;
|
pub fn start(port: u16) {
|
||||||
match request.method.as_str() {
|
let mut server = Server {
|
||||||
"load_edges_binary" => {}
|
edges: Arc::new(HashMap::new()),
|
||||||
"compute_transfer" => {}
|
//threads: Vec::new(),
|
||||||
"cancel" => {}
|
};
|
||||||
"update_edges" => {}
|
|
||||||
_ => {}
|
let listener =
|
||||||
|
TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
|
||||||
|
loop {
|
||||||
|
match listener.accept() {
|
||||||
|
Ok((socket, _)) => match server.handle_connection(socket) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => println!("Error communicating with client: {e}"),
|
||||||
|
},
|
||||||
|
Err(e) => println!("Error accepting connection: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_connection(&mut self, mut socket: TcpStream) -> Result<(), Box<dyn Error>> {
|
||||||
|
let request = read_request(&mut socket)?;
|
||||||
|
match request.method.as_str() {
|
||||||
|
"load_edges_binary" => {
|
||||||
|
// TODO do this in its own thread?
|
||||||
|
let edges = read_edges_binary(&request.params["file"].to_string())?;
|
||||||
|
self.edges = Arc::new(edges);
|
||||||
|
socket.write_all(jsonrpc_result(request.id, self.edges.len()).as_bytes())?;
|
||||||
|
}
|
||||||
|
"compute_transfer" => {
|
||||||
|
// TODO limit number of threads
|
||||||
|
let edges = self.edges.clone();
|
||||||
|
let _thread = thread::spawn(move || {
|
||||||
|
println!("Computing flow");
|
||||||
|
let flow = flow::compute_flow(
|
||||||
|
&Address::from(request.params["from"].to_string().as_str()),
|
||||||
|
&Address::from(request.params["to"].to_string().as_str()),
|
||||||
|
//&U256::from(request.params["value"].to_string().as_str()),
|
||||||
|
edges.as_ref(),
|
||||||
|
);
|
||||||
|
println!("Computed flow");
|
||||||
|
// TODO error handling
|
||||||
|
socket
|
||||||
|
.write_all(
|
||||||
|
jsonrpc_result(request.id, json::JsonValue::from(flow)).as_bytes(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
});
|
||||||
|
//self.threads.push(thread);
|
||||||
|
}
|
||||||
|
"cancel" => {}
|
||||||
|
"update_edges" => {}
|
||||||
|
// TODO error handling
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_request(mut socket: TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
|
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
|
||||||
// let mut buf_reader = BufReader::new(&mut socket);
|
// let mut buf_reader = BufReader::new(&mut socket);
|
||||||
// let http_request: Vec<_> = buf_reader
|
// let http_request: Vec<_> = buf_reader
|
||||||
// .by_ref()
|
// .by_ref()
|
||||||
|
@ -70,7 +113,7 @@ fn read_request(mut socket: TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_payload(socket: TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
|
fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
|
||||||
let mut reader = BufReader::new(socket);
|
let mut reader = BufReader::new(socket);
|
||||||
let mut length = 0;
|
let mut length = 0;
|
||||||
for result in reader.by_ref().lines() {
|
for result in reader.by_ref().lines() {
|
||||||
|
@ -81,7 +124,7 @@ fn read_payload(socket: TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
|
||||||
|
|
||||||
let header = "content-length: ";
|
let header = "content-length: ";
|
||||||
if l.to_lowercase().starts_with(header) {
|
if l.to_lowercase().starts_with(header) {
|
||||||
length = usize::from_str_radix(&l[header.len()..], 10)?;
|
length = (&l[header.len()..]).parse::<usize>()?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut payload = vec![0u8; length];
|
let mut payload = vec![0u8; length];
|
||||||
|
@ -89,3 +132,17 @@ fn read_payload(socket: TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
|
||||||
reader.read_exact(payload.as_mut_slice())?;
|
reader.read_exact(payload.as_mut_slice())?;
|
||||||
Ok(payload)
|
Ok(payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
|
||||||
|
let payload = json::object! {
|
||||||
|
jsonrpc: "2.0",
|
||||||
|
id: id,
|
||||||
|
result: result.into(),
|
||||||
|
}
|
||||||
|
.dump();
|
||||||
|
format!(
|
||||||
|
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
|
||||||
|
payload.len(),
|
||||||
|
payload
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
|
@ -101,6 +101,7 @@ impl Display for U256 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::U256;
|
use super::U256;
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in a new issue