Implement update_edges
This commit is contained in:
parent
03f0ff8d38
commit
9ee468b04c
2 changed files with 62 additions and 9 deletions
|
@ -4,15 +4,13 @@ use crate::types::{Address, Edge, U256};
|
||||||
use json::JsonValue;
|
use json::JsonValue;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use std::io::Read;
|
||||||
use std::io::{BufRead, BufReader, Write};
|
use std::io::{BufRead, BufReader, Write};
|
||||||
|
use std::net::{TcpListener, TcpStream};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::mpsc::TrySendError;
|
use std::sync::mpsc::TrySendError;
|
||||||
use std::sync::{mpsc, Arc, Mutex, RwLock};
|
use std::sync::{mpsc, Arc, Mutex, RwLock};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::{
|
|
||||||
io::Read,
|
|
||||||
net::{TcpListener, TcpStream},
|
|
||||||
};
|
|
||||||
|
|
||||||
struct JsonRpcRequest {
|
struct JsonRpcRequest {
|
||||||
id: JsonValue,
|
id: JsonValue,
|
||||||
|
@ -84,6 +82,22 @@ fn handle_connection(
|
||||||
let e = edges.read().unwrap().clone();
|
let e = edges.read().unwrap().clone();
|
||||||
compute_transfer(request, e.as_ref(), socket)?;
|
compute_transfer(request, e.as_ref(), socket)?;
|
||||||
}
|
}
|
||||||
|
"update_edges" => {
|
||||||
|
let response = match request.params {
|
||||||
|
JsonValue::Array(updates) => match update_edges(edges, updates) {
|
||||||
|
Ok(len) => jsonrpc_response(request.id, len),
|
||||||
|
Err(e) => jsonrpc_error_response(
|
||||||
|
request.id,
|
||||||
|
-32000,
|
||||||
|
&format!("Error updating edges: {e}"),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
jsonrpc_error_response(request.id, -32602, "Invalid arguments: Expected array.")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
socket.write_all(response.as_bytes())?;
|
||||||
|
}
|
||||||
_ => socket
|
_ => socket
|
||||||
.write_all(jsonrpc_error_response(request.id, -32601, "Method not found").as_bytes())?,
|
.write_all(jsonrpc_error_response(request.id, -32601, "Method not found").as_bytes())?,
|
||||||
};
|
};
|
||||||
|
@ -160,6 +174,45 @@ fn compute_transfer(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_edges(
|
||||||
|
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
|
||||||
|
updates: Vec<JsonValue>,
|
||||||
|
) -> Result<usize, Box<dyn Error>> {
|
||||||
|
let updates = updates
|
||||||
|
.into_iter()
|
||||||
|
.map(|e| Edge {
|
||||||
|
from: Address::from(e["from"].to_string().as_str()),
|
||||||
|
to: Address::from(e["to"].to_string().as_str()),
|
||||||
|
token: Address::from(e["token_owner"].to_string().as_str()),
|
||||||
|
capacity: U256::from(e["capacity"].to_string().as_str()),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if updates.is_empty() {
|
||||||
|
return Ok(edges.read().unwrap().len());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut updating_edges = edges.read().unwrap().as_ref().clone();
|
||||||
|
for update in updates {
|
||||||
|
let out_edges = updating_edges.entry(update.from).or_default();
|
||||||
|
if update.capacity == U256::from(0) {
|
||||||
|
out_edges.retain(|e| {
|
||||||
|
!(e.from == update.from && e.to == update.to && e.token == update.token)
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
match out_edges
|
||||||
|
.iter_mut()
|
||||||
|
.find(|e| e.from == update.from && e.to == update.to && e.token == update.token)
|
||||||
|
{
|
||||||
|
Some(e) => e.capacity = update.capacity,
|
||||||
|
_ => out_edges.push(update),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let len = updating_edges.len();
|
||||||
|
*edges.write().unwrap() = Arc::new(updating_edges);
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
|
||||||
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
|
fn read_request(socket: &mut TcpStream) -> Result<JsonRpcRequest, Box<dyn Error>> {
|
||||||
let payload = read_payload(socket)?;
|
let payload = read_payload(socket)?;
|
||||||
let mut request = json::parse(&String::from_utf8(payload)?)?;
|
let mut request = json::parse(&String::from_utf8(payload)?)?;
|
||||||
|
|
|
@ -23,12 +23,12 @@ impl From<[u8; 20]> for Address {
|
||||||
|
|
||||||
impl From<&str> for Address {
|
impl From<&str> for Address {
|
||||||
fn from(item: &str) -> Self {
|
fn from(item: &str) -> Self {
|
||||||
assert!(item.starts_with("0x"));
|
let item = item.strip_prefix("0x").unwrap_or(item);
|
||||||
assert!(item.len() == 2 + 20 * 2);
|
assert!(item.len() == 20 * 2);
|
||||||
let mut data = [0u8; 20];
|
let mut data = [0u8; 20];
|
||||||
for i in (2..item.len()).step_by(2) {
|
data.iter_mut().enumerate().for_each(|(i, b)| {
|
||||||
data[i / 2 - 1] = u8::from_str_radix(&item[i..i + 2], 16).unwrap();
|
*b = u8::from_str_radix(&item[2 * i..2 * i + 2], 16).unwrap();
|
||||||
}
|
});
|
||||||
Address(data)
|
Address(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue