use crate::graph;
use crate::io::read_edges_binary;
use crate::types::{Address, Edge, U256};
use json::JsonValue;
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::ops::Deref;
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::thread;
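
/// A decoded JSON-RPC request; `id` is echoed back in responses and `params`
/// is kept as raw JSON for the individual method handlers.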
struct JsonRpcRequest {
    id: JsonValue,
    method: String,
    params: JsonValue,
}

type EdgeMap = HashMap<Address, Vec<Edge>>;
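
/// Binds a TCP listener on `127.0.0.1:{port}` and dispatches accepted
/// connections to a pool of `threads` worker threads through a bounded
/// channel holding at most `queue_size` sockets; when the queue is full,
/// new connections are dropped with a log message.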
pub fn start_server(port: u16, queue_size: usize, threads: u64) {
    let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new())));

    // Bounded queue of accepted sockets, shared with the worker threads.
    let (sender, receiver) = mpsc::sync_channel(queue_size);
    let protected_receiver = Arc::new(Mutex::new(receiver));
    for _ in 0..threads {
        let rec = protected_receiver.clone();
        let e = edges.clone();
        thread::spawn(move || {
            loop {
                let socket = rec.lock().unwrap().recv().unwrap();
                match handle_connection(e.deref(), socket) {
                    Ok(()) => {}
                    Err(e) => {
                        // TODO respond to the jsonrpc
                        println!("Error handling connection: {e}");
                    }
                }
            }
        });
    }
    let listener =
        TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
    loop {
        match listener.accept() {
            Ok((socket, _)) => match sender.try_send(socket) {
                Ok(()) => {}
                Err(e) => println!("Queue full: {e}"),
            },
            Err(e) => println!("Error accepting connection: {e}"),
        }
    }
}
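
/// Reads a single JSON-RPC request from the socket and dispatches it by
/// method name.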
fn handle_connection(
    edges: &RwLock<Arc<EdgeMap>>,
    mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
    let request = read_request(&mut socket)?;
    match request.method.as_str() {
        "load_edges_binary" => {
            let updated_edges = read_edges_binary(&request.params["file"].to_string())?;
            let len = updated_edges.len();
            *edges.write().unwrap() = Arc::new(updated_edges);
            socket.write_all(jsonrpc_response(request.id, len).as_bytes())?;
        }
        "compute_transfer" => {
            println!("Computing flow");
            // Clone the inner Arc so the flow computation works on a
            // consistent snapshot without holding the read lock.
            let e = edges.read().unwrap().clone();
            compute_transfer(request, e.as_ref(), socket)?;
        }
        "cancel" => {}
        "update_edges" => {}
        // TODO error handling
        _ => {}
    };
    Ok(())
}
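
/// Handles a `compute_transfer` request, streaming results as an HTTP chunked
/// response: one chunk per distance bound (three passes in iterative mode,
/// otherwise a single unbounded pass), each carrying the computed flow and
/// the corresponding list of transfers.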
fn compute_transfer(
    request: JsonRpcRequest,
    edges: &EdgeMap,
    mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
    socket.write_all(chunked_header().as_bytes())?;
    // In iterative mode, stream results for growing distance bounds;
    // the final pass runs without a bound.
    let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
        vec![Some(1), Some(2), None]
    } else {
        vec![None]
    };
    for max_distance in max_distances {
        let (flow, transfers) = graph::compute_flow(
            &Address::from(request.params["from"].to_string().as_str()),
            &Address::from(request.params["to"].to_string().as_str()),
            edges,
            if request.params.has_key("value") {
                U256::from(request.params["value"].to_string().as_str())
            } else {
                U256::MAX
            },
            max_distance,
        );
        println!(
            "Computed flow with max distance {:?}: {}",
            max_distance, flow
        );
        // TODO error handling
        socket.write_all(
            chunked_response(
                &(jsonrpc_result(
                    request.id.clone(),
                    json::object! {
                        flow: flow.to_string(),
                        final: max_distance.is_none(),
                        transfers: transfers.into_iter().map(|e| json::object! {
                            from: e.from.to_string(),
                            to: e.to.to_string(),
                            token_owner: e.token.to_string(),
                            value: e.capacity.to_string()
                        }).collect::<Vec<_>>(),
                    },
                ) + "\r\n"),
            )
            .as_bytes(),
        )?;
    }
    socket.write_all(chunked_close().as_bytes())?;
    Ok(())
}
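
/// Reads the HTTP payload from the socket and parses it into a `JsonRpcRequest`.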
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
    let payload = read_payload(socket)?;
    let mut request = json::parse(&String::from_utf8(payload)?)?;
    println!("Request: {request}");
    let id = request["id"].take();
    let params = request["params"].take();
    match request["method"].as_str() {
        Some(method) => Ok(JsonRpcRequest {
            id,
            method: method.to_string(),
            params,
        }),
        _ => Err(From::from(format!("Invalid JSON-RPC request: {request}"))),
    }
}
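
/// Reads the HTTP headers, extracts the `Content-Length`, and then reads
/// exactly that many bytes of body.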
fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
    let mut reader = BufReader::new(socket);
    let mut length = 0;
    for result in reader.by_ref().lines() {
        let l = result?;
        if l.is_empty() {
            break;
        }

        let header = "content-length: ";
        if l.to_lowercase().starts_with(header) {
            length = l[header.len()..].parse::<usize>()?;
        }
    }
    let mut payload = vec![0u8; length];

    reader.read_exact(payload.as_mut_slice())?;
    Ok(payload)
}
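
/// Wraps a JSON-RPC result in a complete HTTP response with a `Content-Length` header.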
fn jsonrpc_response(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
    let payload = jsonrpc_result(id, result);
    format!(
        "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
        payload.len(),
        payload
    )
}
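
/// Serializes a JSON-RPC 2.0 result object for the given request id.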
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
    json::object! {
        jsonrpc: "2.0",
        id: id,
        result: result.into(),
    }
    .dump()
}
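
// HTTP chunked transfer encoding helpers: response header, a single data
// chunk, and the terminating zero-length chunk.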
fn chunked_header() -> String {
    "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".to_string()
}

fn chunked_response(data: &str) -> String {
    if data.is_empty() {
        String::new()
    } else {
        format!("{:x}\r\n{}\r\n", data.len(), data)
    }
}

fn chunked_close() -> String {
    "0\r\n\r\n".to_string()
}