diff --git a/src/bin/cli.rs b/src/bin/cli.rs index e27c7d3..d18cdcb 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -58,7 +58,7 @@ fn main() { io::read_edges_binary(edges_file) }) .unwrap_or_else(|_| panic!("Error loading edges from file \"{edges_file}\".")); - println!("Read {} edges", edges.len()); + println!("Read {} edges", edges.edge_count()); let (flow, transfers) = graph::compute_flow( &Address::from(from_str.as_str()), &Address::from(to_str.as_str()), diff --git a/src/graph/adjacencies.rs b/src/graph/adjacencies.rs index 6e000d8..13fc32c 100644 --- a/src/graph/adjacencies.rs +++ b/src/graph/adjacencies.rs @@ -1,10 +1,11 @@ use crate::graph::Node; +use crate::types::edge::EdgeDB; use crate::types::{Address, Edge, U256}; use std::cmp::Reverse; use std::collections::HashMap; pub struct Adjacencies<'a> { - edges: &'a HashMap>, + edges: &'a EdgeDB, lazy_adjacencies: HashMap>, capacity_adjustments: HashMap>, } @@ -21,7 +22,7 @@ fn source_address_of(node: &Node) -> &Address { } impl<'a> Adjacencies<'a> { - pub fn new(edges: &'a HashMap>) -> Self { + pub fn new(edges: &'a EdgeDB) -> Self { Adjacencies { edges, lazy_adjacencies: HashMap::new(), @@ -69,7 +70,7 @@ impl<'a> Adjacencies<'a> { .or_insert_with(|| { let mut result: HashMap = HashMap::new(); // Plain edges are (from, to, token) labeled with capacity - for edge in self.edges.get(source_address_of(from)).unwrap_or(&vec![]) { + for edge in self.edges.outgoing(source_address_of(from)) { match from { Node::Node(_) => { // One edge from "from" to "from x token" with a capacity diff --git a/src/graph/flow.rs b/src/graph/flow.rs index 572eea8..d14ef67 100644 --- a/src/graph/flow.rs +++ b/src/graph/flow.rs @@ -1,5 +1,6 @@ use crate::graph::adjacencies::Adjacencies; use crate::graph::{node_as_address, node_as_token_edge, Node}; +use crate::types::edge::EdgeDB; use crate::types::{Address, Edge, U256}; use std::cmp::min; use std::collections::VecDeque; @@ -9,7 +10,7 @@ use std::fmt::Write; pub fn compute_flow( source: &Address, sink: &Address, - edges: &HashMap>, + edges: &EdgeDB, requested_flow: U256, max_distance: Option, ) -> (U256, Vec) { @@ -478,12 +479,8 @@ mod test { Address::from("0x66c16ce62d26fd51582a646e2e30a3267b1e6d7e"), ) } - fn build_edges(input: Vec) -> HashMap> { - let mut output: HashMap> = HashMap::new(); - for e in input { - output.entry(e.from).or_default().push(e); - } - output + fn build_edges(input: Vec) -> EdgeDB { + EdgeDB::new(input) } #[test] @@ -496,7 +493,18 @@ mod test { capacity: U256::from(10), }]); let flow = compute_flow(&a, &b, &edges, U256::MAX, None); - assert_eq!(flow, (U256::from(10), edges[&a].clone())); + assert_eq!( + flow, + ( + U256::from(10), + vec![Edge { + from: a, + to: b, + token: t, + capacity: U256::from(10) + }] + ) + ); } #[test] diff --git a/src/io.rs b/src/io.rs index 1b3e7f3..a7ca8fe 100644 --- a/src/io.rs +++ b/src/io.rs @@ -3,16 +3,17 @@ use std::io::Read; use std::io::{self, BufRead}; use std::{collections::HashMap, io::BufReader}; +use crate::types::edge::EdgeDB; use crate::types::{Address, Edge, U256}; -pub fn read_edges_binary(path: &String) -> Result>, io::Error> { +pub fn read_edges_binary(path: &String) -> Result { let mut f = File::open(path)?; let address_index = read_address_index(&mut f)?; read_edges(&mut f, &address_index) } -pub fn read_edges_csv(path: &String) -> Result>, io::Error> { - let mut result = HashMap::>::new(); +pub fn read_edges_csv(path: &String) -> Result { + let mut edges = Vec::new(); let f = BufReader::new(File::open(path)?); for line in f.lines() { let line = line?; @@ -23,7 +24,7 @@ pub fn read_edges_csv(path: &String) -> Result>, io:: let to = Address::from(unescape(to)); let token = Address::from(unescape(token)); let capacity = U256::from(unescape(capacity)); - result.entry(from).or_default().push(Edge { + edges.push(Edge { from, to, token, @@ -38,7 +39,7 @@ pub fn read_edges_csv(path: &String) -> Result>, io:: } } } - Ok(result) + Ok(EdgeDB::new(edges)) } fn read_address_index(file: &mut File) -> Result, io::Error> { @@ -81,25 +82,22 @@ fn read_u256(file: &mut File) -> Result { Ok(U256::new(high, low)) } -fn read_edges( - file: &mut File, - address_index: &HashMap, -) -> Result>, io::Error> { +fn read_edges(file: &mut File, address_index: &HashMap) -> Result { let edge_count = read_u32(file)?; - let mut edges: HashMap> = HashMap::new(); + let mut edges = Vec::new(); for _i in 0..edge_count { let from = read_address(file, address_index)?; let to = read_address(file, address_index)?; let token = read_address(file, address_index)?; let capacity = read_u256(file)?; - edges.entry(from).or_insert(vec![]).push(Edge { + edges.push(Edge { from, to, token, capacity, }); } - Ok(edges) + Ok(EdgeDB::new(edges)) } fn unescape(input: &str) -> &str { diff --git a/src/server.rs b/src/server.rs index e337040..074af41 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,8 @@ use crate::graph; use crate::io::{read_edges_binary, read_edges_csv}; +use crate::types::edge::EdgeDB; use crate::types::{Address, Edge, U256}; use json::JsonValue; -use std::collections::HashMap; use std::error::Error; use std::io::Read; use std::io::{BufRead, BufReader, Write}; @@ -18,10 +18,8 @@ struct JsonRpcRequest { params: JsonValue, } -type EdgeMap = HashMap>; - pub fn start_server(port: u16, queue_size: usize, threads: u64) { - let edges: Arc>> = Arc::new(RwLock::new(Arc::new(HashMap::new()))); + let edges: Arc>> = Arc::new(RwLock::new(Arc::new(EdgeDB::default()))); let (sender, receiver) = mpsc::sync_channel(queue_size); let protected_receiver = Arc::new(Mutex::new(receiver)); @@ -54,7 +52,7 @@ pub fn start_server(port: u16, queue_size: usize, threads: u64) { } fn handle_connection( - edges: &RwLock>>>, + edges: &RwLock>, mut socket: TcpStream, ) -> Result<(), Box> { let request = read_request(&mut socket)?; @@ -104,29 +102,23 @@ fn handle_connection( Ok(()) } -fn load_edges_binary( - edges: &RwLock>>>, - file: &String, -) -> Result> { +fn load_edges_binary(edges: &RwLock>, file: &String) -> Result> { let updated_edges = read_edges_binary(file)?; - let len = updated_edges.len(); + let len = updated_edges.edge_count(); *edges.write().unwrap() = Arc::new(updated_edges); Ok(len) } -fn load_edges_csv( - edges: &RwLock>>>, - file: &String, -) -> Result> { +fn load_edges_csv(edges: &RwLock>, file: &String) -> Result> { let updated_edges = read_edges_csv(file)?; - let len = updated_edges.len(); + let len = updated_edges.edge_count(); *edges.write().unwrap() = Arc::new(updated_edges); Ok(len) } fn compute_transfer( request: JsonRpcRequest, - edges: &HashMap>, + edges: &EdgeDB, mut socket: TcpStream, ) -> Result<(), Box> { socket.write_all(chunked_header().as_bytes())?; @@ -175,7 +167,7 @@ fn compute_transfer( } fn update_edges( - edges: &RwLock>>>, + edges: &RwLock>, updates: Vec, ) -> Result> { let updates = updates @@ -188,27 +180,14 @@ fn update_edges( }) .collect::>(); if updates.is_empty() { - return Ok(edges.read().unwrap().len()); + return Ok(edges.read().unwrap().edge_count()); } let mut updating_edges = edges.read().unwrap().as_ref().clone(); for update in updates { - let out_edges = updating_edges.entry(update.from).or_default(); - if update.capacity == U256::from(0) { - out_edges.retain(|e| { - !(e.from == update.from && e.to == update.to && e.token == update.token) - }); - } else { - match out_edges - .iter_mut() - .find(|e| e.from == update.from && e.to == update.to && e.token == update.token) - { - Some(e) => e.capacity = update.capacity, - _ => out_edges.push(update), - } - } + updating_edges.update(update); } - let len = updating_edges.len(); + let len = updating_edges.edge_count(); *edges.write().unwrap() = Arc::new(updating_edges); Ok(len) } diff --git a/src/types/edge.rs b/src/types/edge.rs index d8c6985..07b332d 100644 --- a/src/types/edge.rs +++ b/src/types/edge.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; + use crate::types::Address; use crate::types::U256; -#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Ord, PartialOrd)] +#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, Ord, PartialOrd)] pub struct Edge { pub from: Address, pub to: Address, @@ -11,5 +13,79 @@ pub struct Edge { // TODO comparison, hash, etc. can ignore the capacity field. -// TODO can we derive it? -impl Eq for Edge {} +pub fn eq_up_to_capacity(e1: &Edge, e2: &Edge) -> bool { + e1.from == e2.from && e1.to == e2.to && e1.token == e2.token +} + +#[derive(Debug, Default, Clone)] +pub struct EdgeDB { + edges: Vec, + outgoing: HashMap>, + incoming: HashMap>, +} + +impl EdgeDB { + pub fn new(edges: Vec) -> EdgeDB { + let outgoing = outgoing_index(&edges); + let incoming = incoming_index(&edges); + EdgeDB { + edges, + outgoing, + incoming, + } + } + + pub fn edge_count(&self) -> usize { + self.edges.len() + } + + pub fn update(&mut self, update: Edge) { + match self.index_of(&update) { + Some(i) => self.edges[i].capacity = update.capacity, + None => { + let i = self.edges.len(); + self.outgoing.entry(update.from).or_default().push(i); + self.incoming.entry(update.to).or_default().push(i); + self.edges.push(update); + } + } + } + + pub fn outgoing(&self, source: &Address) -> Vec<&Edge> { + match self.outgoing.get(source) { + Some(out) => out + .iter() + .map(|i| self.edges.get(*i).unwrap()) + .filter(|e| e.capacity != U256::from(0)) + .collect(), + None => vec![], + } + } + + fn index_of(&self, e: &Edge) -> Option { + self.outgoing.get(&e.from).and_then(|out| { + for i in out { + if eq_up_to_capacity(&self.edges[*i], e) { + return Some(*i); + } + } + None + }) + } +} + +fn outgoing_index(edges: &[Edge]) -> HashMap> { + let mut index: HashMap> = HashMap::new(); + for (i, e) in edges.iter().enumerate() { + index.entry(e.from).or_default().push(i) + } + index +} + +fn incoming_index(edges: &[Edge]) -> HashMap> { + let mut index: HashMap> = HashMap::new(); + for (i, e) in edges.iter().enumerate() { + index.entry(e.to).or_default().push(i) + } + index +} diff --git a/tests/integration.rs b/tests/integration.rs index e2307db..a29f8fd 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,5 +1,6 @@ use pathfinder2::graph::compute_flow; use pathfinder2::io::read_edges_binary; +use pathfinder2::types::edge::EdgeDB; use pathfinder2::types::{Address, Edge, U256}; use std::collections::HashMap; use std::process::Command; @@ -51,14 +52,14 @@ fn test_flow_large() { ); } -fn read_edges() -> HashMap> { +fn read_edges() -> EdgeDB { read_edges_binary(&"edges.dat".to_string()).unwrap() } fn test_flow( source: &Address, sink: &Address, - edges: &HashMap>, + edges: &EdgeDB, requested_flow: U256, max_distance: Option, ) {