pathfinder2/src/server.rs

295 lines
9.7 KiB
Rust
Raw Normal View History

2022-10-08 10:11:26 +00:00
use crate::graph;
use crate::io::{import_from_safes_binary, read_edges_binary, read_edges_csv};
2022-12-06 21:16:36 +00:00
use crate::types::edge::EdgeDB;
2022-09-17 14:46:05 +00:00
use crate::types::{Address, Edge, U256};
2022-09-03 21:54:22 +00:00
use json::JsonValue;
use std::error::Error;
2022-11-08 22:04:34 +00:00
use std::io::Read;
2022-09-09 10:14:48 +00:00
use std::io::{BufRead, BufReader, Write};
2022-11-08 22:04:34 +00:00
use std::net::{TcpListener, TcpStream};
2022-09-12 09:38:47 +00:00
use std::ops::Deref;
2022-10-28 14:36:14 +00:00
use std::sync::mpsc::TrySendError;
2022-09-20 14:51:37 +00:00
use std::sync::{mpsc, Arc, Mutex, RwLock};
2022-09-09 10:14:48 +00:00
use std::thread;
2022-09-03 21:54:22 +00:00
struct JsonRpcRequest {
id: JsonValue,
method: String,
params: JsonValue,
}
pub fn start_server(listen_at: &str, queue_size: usize, threads: u64) {
2022-12-06 21:16:36 +00:00
let edges: Arc<RwLock<Arc<EdgeDB>>> = Arc::new(RwLock::new(Arc::new(EdgeDB::default())));
2022-09-09 10:14:48 +00:00
2022-09-20 14:51:37 +00:00
let (sender, receiver) = mpsc::sync_channel(queue_size);
let protected_receiver = Arc::new(Mutex::new(receiver));
for _ in 0..threads {
let rec = protected_receiver.clone();
let e = edges.clone();
2022-10-28 14:36:14 +00:00
thread::spawn(move || loop {
let socket = rec.lock().unwrap().recv().unwrap();
if let Err(e) = handle_connection(e.deref(), socket) {
println!("Error handling connection: {e}");
2022-09-20 14:51:37 +00:00
}
});
}
let listener = TcpListener::bind(listen_at).expect("Could not create server.");
2022-09-12 09:38:47 +00:00
loop {
match listener.accept() {
2022-09-20 14:51:37 +00:00
Ok((socket, _)) => match sender.try_send(socket) {
Ok(()) => {}
2022-10-28 14:36:14 +00:00
Err(TrySendError::Full(mut socket)) => {
let _ = socket.write_all(b"HTTP/1.1 503 Service Unavailable\r\n\r\n");
}
Err(TrySendError::Disconnected(_)) => {
panic!("Internal communication channel disconnected.");
}
2022-09-20 14:51:37 +00:00
},
2022-09-12 09:38:47 +00:00
Err(e) => println!("Error accepting connection: {e}"),
2022-09-09 10:14:48 +00:00
}
}
2022-09-12 09:38:47 +00:00
}
2022-09-09 10:14:48 +00:00
2022-09-12 09:38:47 +00:00
fn handle_connection(
2022-12-06 21:16:36 +00:00
edges: &RwLock<Arc<EdgeDB>>,
2022-09-12 09:38:47 +00:00
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
let request = read_request(&mut socket)?;
match request.method.as_str() {
"load_edges_binary" => {
2022-10-28 14:36:14 +00:00
let response = match load_edges_binary(edges, &request.params["file"].to_string()) {
Ok(len) => jsonrpc_response(request.id, len),
Err(e) => {
jsonrpc_error_response(request.id, -32000, &format!("Error loading edges: {e}"))
}
};
socket.write_all(response.as_bytes())?;
2022-09-12 09:38:47 +00:00
}
2022-10-28 20:39:47 +00:00
"load_edges_csv" => {
let response = match load_edges_csv(edges, &request.params["file"].to_string()) {
Ok(len) => jsonrpc_response(request.id, len),
Err(e) => {
jsonrpc_error_response(request.id, -32000, &format!("Error loading edges: {e}"))
}
};
socket.write_all(response.as_bytes())?;
}
"load_safes_binary" => {
let response = match load_safes_binary(edges, &request.params["file"].to_string()) {
Ok(len) => jsonrpc_response(request.id, len),
Err(e) => {
jsonrpc_error_response(request.id, -32000, &format!("Error loading edges: {e}"))
}
};
socket.write_all(response.as_bytes())?;
}
2022-09-12 09:38:47 +00:00
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
2022-09-17 14:46:05 +00:00
compute_transfer(request, e.as_ref(), socket)?;
2022-09-09 10:14:48 +00:00
}
2022-11-08 22:04:34 +00:00
"update_edges" => {
let response = match request.params {
JsonValue::Array(updates) => match update_edges(edges, updates) {
Ok(len) => jsonrpc_response(request.id, len),
Err(e) => jsonrpc_error_response(
request.id,
-32000,
&format!("Error updating edges: {e}"),
),
},
_ => {
jsonrpc_error_response(request.id, -32602, "Invalid arguments: Expected array.")
}
};
socket.write_all(response.as_bytes())?;
}
2022-10-28 14:36:14 +00:00
_ => socket
.write_all(jsonrpc_error_response(request.id, -32601, "Method not found").as_bytes())?,
2022-09-12 09:38:47 +00:00
};
Ok(())
2022-09-03 21:54:22 +00:00
}
2022-12-06 21:16:36 +00:00
fn load_edges_binary(edges: &RwLock<Arc<EdgeDB>>, file: &String) -> Result<usize, Box<dyn Error>> {
2022-10-28 14:36:14 +00:00
let updated_edges = read_edges_binary(file)?;
2022-12-06 21:16:36 +00:00
let len = updated_edges.edge_count();
2022-10-28 20:39:47 +00:00
*edges.write().unwrap() = Arc::new(updated_edges);
Ok(len)
}
2022-12-06 21:16:36 +00:00
fn load_edges_csv(edges: &RwLock<Arc<EdgeDB>>, file: &String) -> Result<usize, Box<dyn Error>> {
2022-10-28 20:39:47 +00:00
let updated_edges = read_edges_csv(file)?;
2022-12-06 21:16:36 +00:00
let len = updated_edges.edge_count();
2022-10-28 14:36:14 +00:00
*edges.write().unwrap() = Arc::new(updated_edges);
Ok(len)
}
fn load_safes_binary(edges: &RwLock<Arc<EdgeDB>>, file: &str) -> Result<usize, Box<dyn Error>> {
let updated_edges = import_from_safes_binary(file)?.edges().clone();
let len = updated_edges.edge_count();
*edges.write().unwrap() = Arc::new(updated_edges);
Ok(len)
}
2022-09-17 14:46:05 +00:00
fn compute_transfer(
request: JsonRpcRequest,
2022-12-06 21:16:36 +00:00
edges: &EdgeDB,
2022-09-17 14:46:05 +00:00
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
socket.write_all(chunked_header().as_bytes())?;
let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
vec![Some(1), Some(2), None]
} else {
vec![None]
};
2022-12-21 15:56:37 +00:00
let max_transfers = request.params["max_transfers"].as_u64();
2022-09-17 14:46:05 +00:00
for max_distance in max_distances {
2022-10-08 10:11:26 +00:00
let (flow, transfers) = graph::compute_flow(
2022-09-17 14:46:05 +00:00
&Address::from(request.params["from"].to_string().as_str()),
&Address::from(request.params["to"].to_string().as_str()),
edges,
if request.params.has_key("value") {
U256::from(request.params["value"].to_string().as_str())
} else {
U256::MAX
},
max_distance,
2022-12-21 15:56:37 +00:00
max_transfers,
2022-09-17 14:46:05 +00:00
);
2022-12-06 21:18:55 +00:00
println!("Computed flow with max distance {max_distance:?}: {flow}");
2022-09-17 14:46:05 +00:00
socket.write_all(
chunked_response(
&(jsonrpc_result(
request.id.clone(),
json::object! {
flow: flow.to_decimal(),
2022-09-17 14:46:05 +00:00
final: max_distance.is_none(),
transfers: transfers.into_iter().map(|e| json::object! {
2022-12-24 15:02:30 +00:00
from: e.from.to_checksummed_hex(),
to: e.to.to_checksummed_hex(),
token_owner: e.token.to_checksummed_hex(),
value: e.capacity.to_decimal(),
2022-09-17 14:46:05 +00:00
}).collect::<Vec<_>>(),
},
) + "\r\n"),
)
.as_bytes(),
)?;
}
socket.write_all(chunked_close().as_bytes())?;
Ok(())
}
2022-11-08 22:04:34 +00:00
fn update_edges(
2022-12-06 21:16:36 +00:00
edges: &RwLock<Arc<EdgeDB>>,
2022-11-08 22:04:34 +00:00
updates: Vec<JsonValue>,
) -> Result<usize, Box<dyn Error>> {
let updates = updates
.into_iter()
.map(|e| Edge {
from: Address::from(e["from"].to_string().as_str()),
to: Address::from(e["to"].to_string().as_str()),
token: Address::from(e["token_owner"].to_string().as_str()),
capacity: U256::from(e["capacity"].to_string().as_str()),
})
.collect::<Vec<_>>();
if updates.is_empty() {
2022-12-06 21:16:36 +00:00
return Ok(edges.read().unwrap().edge_count());
2022-11-08 22:04:34 +00:00
}
let mut updating_edges = edges.read().unwrap().as_ref().clone();
for update in updates {
2022-12-06 21:16:36 +00:00
updating_edges.update(update);
2022-11-08 22:04:34 +00:00
}
2022-12-06 21:16:36 +00:00
let len = updating_edges.edge_count();
2022-11-08 22:04:34 +00:00
*edges.write().unwrap() = Arc::new(updating_edges);
Ok(len)
}
2022-09-09 10:14:48 +00:00
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
2022-09-03 21:54:22 +00:00
let payload = read_payload(socket)?;
let mut request = json::parse(&String::from_utf8(payload)?)?;
println!("Request: {request}");
let id = request["id"].take();
let params = request["params"].take();
match request["method"].as_str() {
Some(method) => Ok(JsonRpcRequest {
id,
method: method.to_string(),
params,
}),
_ => Err(From::from("Invalid JSON-RPC request: {request}")),
}
}
2022-09-09 10:14:48 +00:00
fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
2022-09-03 21:54:22 +00:00
let mut reader = BufReader::new(socket);
let mut length = 0;
for result in reader.by_ref().lines() {
let l = result?;
if l.is_empty() {
break;
}
let header = "content-length: ";
if l.to_lowercase().starts_with(header) {
2022-10-10 07:33:32 +00:00
length = l[header.len()..].parse::<usize>()?;
2022-09-03 21:54:22 +00:00
}
}
let mut payload = vec![0u8; length];
reader.read_exact(payload.as_mut_slice())?;
Ok(payload)
}
2022-09-09 10:14:48 +00:00
2022-09-17 13:46:08 +00:00
fn jsonrpc_response(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
let payload = jsonrpc_result(id, result);
2022-09-09 10:14:48 +00:00
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}
2022-09-17 13:46:08 +00:00
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
json::object! {
jsonrpc: "2.0",
id: id,
result: result.into(),
}
.dump()
}
2022-10-28 14:36:14 +00:00
fn jsonrpc_error_response(id: JsonValue, code: i64, message: &str) -> String {
let payload = json::object! {
jsonrpc: "2.0",
id: id,
error: {
code: code,
message: message
}
}
.dump();
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}
2022-09-17 13:46:08 +00:00
fn chunked_header() -> String {
"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".to_string()
}
fn chunked_response(data: &str) -> String {
if data.is_empty() {
String::new()
} else {
format!("{:x}\r\n{}\r\n", data.len(), data)
}
}
fn chunked_close() -> String {
"0\r\n\r\n".to_string()
}