pathfinder2/src/server.rs

131 lines
4 KiB
Rust
Raw Normal View History

2022-09-09 10:14:48 +00:00
use crate::flow;
use crate::io::read_edges_binary;
use crate::types::{Address, Edge};
2022-09-03 21:54:22 +00:00
use json::JsonValue;
2022-09-09 10:14:48 +00:00
use std::collections::HashMap;
2022-09-03 21:54:22 +00:00
use std::error::Error;
2022-09-09 10:14:48 +00:00
use std::io::{BufRead, BufReader, Write};
2022-09-12 09:38:47 +00:00
use std::ops::Deref;
use std::sync::{Arc, RwLock};
2022-09-09 10:14:48 +00:00
use std::thread;
2022-09-03 21:54:22 +00:00
use std::{
2022-09-09 10:14:48 +00:00
io::Read,
net::{TcpListener, TcpStream},
2022-09-03 21:54:22 +00:00
};
/// The parts of an incoming JSON-RPC request that the server acts on.
///
/// Built by `read_request` from the `id`, `method` and `params` members of the
/// parsed request body.
struct JsonRpcRequest {
/// Request identifier; passed back unchanged in the response.
id: JsonValue,
/// Name of the requested method, e.g. `"load_edges_binary"`.
method: String,
/// Method arguments; individual entries are looked up by key.
params: JsonValue,
}
2022-09-12 09:38:47 +00:00
/// Edge set shared between connection threads, keyed by address.
// NOTE(review): presumably each entry lists the edges adjacent to the key
// address — confirm against `read_edges_binary` and `flow::compute_flow`.
type EdgeMap = HashMap<Address, Vec<Edge>>;
pub fn start_server(port: u16) {
let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new())));
2022-09-09 10:14:48 +00:00
2022-09-12 09:38:47 +00:00
let listener =
TcpListener::bind(format!("127.0.0.1:{port}")).expect("Could not create server.");
loop {
let c = edges.clone();
match listener.accept() {
// TODO limit number of threads
Ok((socket, _)) => {
thread::spawn(move || {
match handle_connection(c.deref(), socket) {
Ok(()) => {}
Err(e) => {
// TODO respond to the jsonrpc
println!("Error handling connection: {e}");
}
}
});
2022-09-09 10:14:48 +00:00
}
2022-09-12 09:38:47 +00:00
Err(e) => println!("Error accepting connection: {e}"),
2022-09-09 10:14:48 +00:00
}
}
2022-09-12 09:38:47 +00:00
}
2022-09-09 10:14:48 +00:00
2022-09-12 09:38:47 +00:00
fn handle_connection(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
let request = read_request(&mut socket)?;
match request.method.as_str() {
"load_edges_binary" => {
let updated_edges = read_edges_binary(&request.params["file"].to_string())?;
let len = updated_edges.len();
*edges.write().unwrap() = Arc::new(updated_edges);
socket.write_all(jsonrpc_result(request.id, len).as_bytes())?;
}
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
let flow = flow::compute_flow(
&Address::from(request.params["from"].to_string().as_str()),
&Address::from(request.params["to"].to_string().as_str()),
//&U256::from(request.params["value"].to_string().as_str()),
e.as_ref(),
);
println!("Computed flow");
2022-09-09 10:14:48 +00:00
// TODO error handling
2022-09-13 14:54:15 +00:00
socket.write_all(jsonrpc_result(request.id, json::JsonValue::from(flow.0.to_string())).as_bytes())?;
2022-09-09 10:14:48 +00:00
}
2022-09-12 09:38:47 +00:00
"cancel" => {}
"update_edges" => {}
// TODO error handling
_ => {}
};
Ok(())
2022-09-03 21:54:22 +00:00
}
2022-09-09 10:14:48 +00:00
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
2022-09-03 21:54:22 +00:00
let payload = read_payload(socket)?;
let mut request = json::parse(&String::from_utf8(payload)?)?;
println!("Request: {request}");
let id = request["id"].take();
let params = request["params"].take();
match request["method"].as_str() {
Some(method) => Ok(JsonRpcRequest {
id,
method: method.to_string(),
params,
}),
_ => Err(From::from("Invalid JSON-RPC request: {request}")),
}
}
2022-09-09 10:14:48 +00:00
/// Reads the header block of an HTTP-style message from `socket` and returns
/// the body that follows it.
///
/// Header lines are consumed up to the first empty line. The body length is
/// taken from the `Content-Length` header, whose name is matched
/// case-insensitively and whose value may be surrounded by optional whitespace
/// (so `Content-Length:5` and `content-length:  5 ` are both accepted). If the
/// header occurs more than once the last value wins; if it is absent the body
/// is taken to be empty.
///
/// Note that the internal `BufReader` may read ahead of the body; any such
/// bytes are discarded when this function returns.
///
/// # Errors
///
/// Returns an error if reading from the socket fails, if the `Content-Length`
/// value is not a valid `usize`, or if the stream ends before the full body
/// has been received.
fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
    let mut reader = BufReader::new(socket);
    let mut length = 0;
    // `by_ref` keeps `reader` usable for the body once the headers are consumed.
    for result in reader.by_ref().lines() {
        let line = result?;
        if line.is_empty() {
            // Blank line: end of the header block.
            break;
        }
        if let Some((name, value)) = line.split_once(':') {
            if name.eq_ignore_ascii_case("content-length") {
                length = value.trim().parse::<usize>()?;
            }
        }
    }
    let mut payload = vec![0u8; length];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
2022-09-09 10:14:48 +00:00
/// Builds a complete HTTP/1.1 `200 OK` response carrying a JSON-RPC 2.0 result.
///
/// The body is a JSON object with the members `jsonrpc` (always `"2.0"`), `id`
/// (echoed from the request) and `result`. `Content-Length` is the size of the
/// serialized body in bytes.
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
    let result: json::JsonValue = result.into();
    let envelope = json::object! {
        jsonrpc: "2.0",
        id: id,
        result: result,
    };
    let body = envelope.dump();
    // `String::len` counts bytes, which is what `Content-Length` expects.
    let content_length = body.len();
    format!(
        "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
        content_length, body
    )
}