From 9ee468b04ca97b8b50a860ced513116fba3a031a Mon Sep 17 00:00:00 2001 From: chriseth Date: Tue, 8 Nov 2022 23:04:34 +0100 Subject: [PATCH] Implement update_edges --- src/server.rs | 61 +++++++++++++++++++++++++++++++++++++++++--- src/types/address.rs | 10 ++++---- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/server.rs b/src/server.rs index 1cb1192..e337040 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,15 +4,13 @@ use crate::types::{Address, Edge, U256}; use json::JsonValue; use std::collections::HashMap; use std::error::Error; +use std::io::Read; use std::io::{BufRead, BufReader, Write}; +use std::net::{TcpListener, TcpStream}; use std::ops::Deref; use std::sync::mpsc::TrySendError; use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::thread; -use std::{ - io::Read, - net::{TcpListener, TcpStream}, -}; struct JsonRpcRequest { id: JsonValue, @@ -84,6 +82,22 @@ fn handle_connection( let e = edges.read().unwrap().clone(); compute_transfer(request, e.as_ref(), socket)?; } + "update_edges" => { + let response = match request.params { + JsonValue::Array(updates) => match update_edges(edges, updates) { + Ok(len) => jsonrpc_response(request.id, len), + Err(e) => jsonrpc_error_response( + request.id, + -32000, + &format!("Error updating edges: {e}"), + ), + }, + _ => { + jsonrpc_error_response(request.id, -32602, "Invalid arguments: Expected array.") + } + }; + socket.write_all(response.as_bytes())?; + } _ => socket .write_all(jsonrpc_error_response(request.id, -32601, "Method not found").as_bytes())?, }; @@ -160,6 +174,45 @@ fn compute_transfer( Ok(()) } +fn update_edges( + edges: &RwLock>>>, + updates: Vec, +) -> Result> { + let updates = updates + .into_iter() + .map(|e| Edge { + from: Address::from(e["from"].to_string().as_str()), + to: Address::from(e["to"].to_string().as_str()), + token: Address::from(e["token_owner"].to_string().as_str()), + capacity: U256::from(e["capacity"].to_string().as_str()), + }) + .collect::>(); + if updates.is_empty() { + return Ok(edges.read().unwrap().len()); + } + + let mut updating_edges = edges.read().unwrap().as_ref().clone(); + for update in updates { + let out_edges = updating_edges.entry(update.from).or_default(); + if update.capacity == U256::from(0) { + out_edges.retain(|e| { + !(e.from == update.from && e.to == update.to && e.token == update.token) + }); + } else { + match out_edges + .iter_mut() + .find(|e| e.from == update.from && e.to == update.to && e.token == update.token) + { + Some(e) => e.capacity = update.capacity, + _ => out_edges.push(update), + } + } + } + let len = updating_edges.len(); + *edges.write().unwrap() = Arc::new(updating_edges); + Ok(len) +} + fn read_request(socket: &mut TcpStream) -> Result> { let payload = read_payload(socket)?; let mut request = json::parse(&String::from_utf8(payload)?)?; diff --git a/src/types/address.rs b/src/types/address.rs index 127701d..23f4082 100644 --- a/src/types/address.rs +++ b/src/types/address.rs @@ -23,12 +23,12 @@ impl From<[u8; 20]> for Address { impl From<&str> for Address { fn from(item: &str) -> Self { - assert!(item.starts_with("0x")); - assert!(item.len() == 2 + 20 * 2); + let item = item.strip_prefix("0x").unwrap_or(item); + assert!(item.len() == 20 * 2); let mut data = [0u8; 20]; - for i in (2..item.len()).step_by(2) { - data[i / 2 - 1] = u8::from_str_radix(&item[i..i + 2], 16).unwrap(); - } + data.iter_mut().enumerate().for_each(|(i, b)| { + *b = u8::from_str_radix(&item[2 * i..2 * i + 2], 16).unwrap(); + }); Address(data) } }