From 05a3aaf21b5fbfb8b6cb0c227eee22cbe3d80788 Mon Sep 17 00:00:00 2001 From: chriseth Date: Sat, 17 Sep 2022 16:46:05 +0200 Subject: [PATCH] Pruning. --- src/flow/flow.rs | 266 ++++++++++++++++++++++++++++++++++++++++++- src/server.rs | 94 ++++++++------- src/types/u256.rs | 3 +- tests/integration.rs | 31 ++--- 4 files changed, 331 insertions(+), 63 deletions(-) diff --git a/src/flow/flow.rs b/src/flow/flow.rs index f8c3596..bd3838d 100644 --- a/src/flow/flow.rs +++ b/src/flow/flow.rs @@ -2,14 +2,15 @@ use crate::flow::adjacencies::Adjacencies; use crate::flow::{node_as_address, node_as_token_edge, Node}; use crate::types::{Address, Edge, U256}; use std::cmp::min; -use std::collections::HashMap; use std::collections::VecDeque; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Write; pub fn compute_flow( source: &Address, sink: &Address, edges: &HashMap>, + requested_flow: U256, max_distance: Option, ) -> (U256, Vec) { let mut adjacencies = Adjacencies::new(edges); @@ -45,9 +46,13 @@ pub fn compute_flow( } } - // TODO prune - println!("Max flow: {flow}"); + + if flow > requested_flow { + let still_to_prune = prune_flow(source, sink, flow - requested_flow, &mut used_edges); + flow = requested_flow + still_to_prune; + } + let transfers = if flow == U256::from(0) { vec![] } else { @@ -137,6 +142,233 @@ fn to_dot( out } +fn prune_flow( + source: &Address, + sink: &Address, + mut flow_to_prune: U256, + used_edges: &mut HashMap>, +) -> U256 { + // Note the path length is negative to sort by longest shortest path first. + let edges_by_path_length = compute_edges_by_path_length(source, sink, used_edges); + + for edges_here in edges_by_path_length.values() { + //println!("Shorter path."); + // As long as `edges` contain an edge with smaller weight than the weight still to prune: + // take the smallest such edge and prune it. + while flow_to_prune > U256::from(0) && !edges_here.is_empty() { + //println!("Still to prune: {}", flow_to_prune); + if let Some((s, t)) = smallest_edge_in_set(used_edges, edges_here) { + if used_edges[&s][&t] > flow_to_prune { + break; + }; + flow_to_prune = prune_edge(used_edges, (&s, &t), flow_to_prune); + } else { + break; + } + } + } + // If there is still flow to prune, take the first element in edgesByPathLength + // and partially prune its path. + if flow_to_prune > U256::from(0) { + //println!("Final stage: Still to prune: {}", flow_to_prune); + for edges_here in edges_by_path_length.values() { + for (a, b) in edges_here { + if !used_edges.contains_key(a) || !used_edges[a].contains_key(b) { + continue; + } + flow_to_prune = prune_edge(used_edges, (a, b), flow_to_prune); + if flow_to_prune == U256::from(0) { + return U256::from(0); + } + } + if flow_to_prune == U256::from(0) { + return U256::from(0); + } + } + } + flow_to_prune +} + +/// Returns a map from the negative shortest path length to the edge. +/// The shortest path length is negative so that it is sorted by +/// longest paths first - those are the ones we want to eliminate first. +fn compute_edges_by_path_length( + source: &Address, + sink: &Address, + used_edges: &HashMap>, +) -> BTreeMap> { + let mut result = BTreeMap::>::new(); + let from_source = distance_from_source(&Node::Node(*source), used_edges); + let to_sink = distance_to_sink(&Node::Node(*sink), used_edges); + for (s, edges) in used_edges { + for t in edges.keys() { + let path_length = from_source[s] + 1 + to_sink[t]; + result + .entry(-path_length) + .or_default() + .insert((s.clone(), t.clone())); + } + } + result +} + +fn distance_from_source( + source: &Node, + used_edges: &HashMap>, +) -> HashMap { + let mut distances = HashMap::::new(); + let mut to_process = VecDeque::::new(); + distances.insert(source.clone(), 0); + to_process.push_back(source.clone()); + + while let Some(n) = to_process.pop_front() { + for (t, capacity) in used_edges.get(&n).unwrap_or(&HashMap::new()) { + if *capacity > U256::from(0) && !distances.contains_key(t) { + distances.insert(t.clone(), distances[&n] + 1); + to_process.push_back(t.clone()); + } + } + } + + distances +} + +fn distance_to_sink( + sink: &Node, + used_edges: &HashMap>, +) -> HashMap { + distance_from_source(sink, &reverse_edges(used_edges)) +} + +fn reverse_edges( + used_edges: &HashMap>, +) -> HashMap> { + let mut reversed: HashMap> = HashMap::new(); + for (n, edges) in used_edges { + for (t, capacity) in edges { + reversed + .entry(t.clone()) + .or_default() + .insert(n.clone(), *capacity); + } + } + reversed +} + +fn smallest_edge_in_set( + all_edges: &HashMap>, + edge_set: &HashSet<(Node, Node)>, +) -> Option<(Node, Node)> { + if let Some((a, b, _)) = edge_set + .iter() + .map(|(a, b)| { + let capacity = if let Some(out) = all_edges.get(a) { + if let Some(capacity) = out.get(b) { + assert!(*capacity != U256::from(0)); + Some(capacity) + } else { + None + } + } else { + None + }; + (a, b, capacity) + }) + .filter(|(_, _, capacity)| capacity.is_some()) + .min_by_key(|(_, _, capacity)| capacity.unwrap()) + { + Some((a.clone(), b.clone())) + } else { + None + } +} + +fn smallest_edge_from( + used_edges: &HashMap>, + n: &Node, +) -> Option<(Node, U256)> { + used_edges + .get(n) + .and_then(|out| { + out.iter() + .min_by_key(|(_, c)| { + assert!(**c != U256::from(0)); + *c + }) + .map(|(t, c)| (t.clone(), *c)) + }) +} + +fn smallest_edge_to( + used_edges: &HashMap>, + n: &Node, +) -> Option<(Node, U256)> { + used_edges + .iter() + .filter(|(_, out)| out.contains_key(n)) + .map(|(t, out)| (t, out[n])) + .min_by_key(|(_, c)| { + assert!(*c != U256::from(0)); + *c + }) + .map(|(t, c)| (t.clone(), c)) +} + +/// Removes the edge (potentially partially), removing a given amount of flow. +/// Returns the remaining flow to prune if the edge was too small. +fn prune_edge( + used_edges: &mut HashMap>, + edge: (&Node, &Node), + flow_to_prune: U256, +) -> U256 { + let edge_size = min(flow_to_prune, used_edges[edge.0][edge.1]); + reduce_capacity(used_edges, edge, &edge_size); + prune_path(used_edges, edge.1, edge_size, PruneDirection::Forwards); + prune_path(used_edges, edge.0, edge_size, PruneDirection::Backwards); + flow_to_prune - edge_size +} + +fn reduce_capacity( + used_edges: &mut HashMap>, + (a, b): (&Node, &Node), + reduction: &U256, +) { + let out_edges = used_edges.get_mut(a).unwrap(); + *out_edges.get_mut(b).unwrap() -= *reduction; + if out_edges[b] == U256::from(0) { + out_edges.remove_entry(b); + } +} + +#[derive(Clone, Copy)] +enum PruneDirection { + Forwards, + Backwards, +} + +fn prune_path( + used_edges: &mut HashMap>, + n: &Node, + mut flow_to_prune: U256, + direction: PruneDirection, +) { + while let Some((next, mut capacity)) = match direction { + PruneDirection::Forwards => smallest_edge_from(used_edges, n), + PruneDirection::Backwards => smallest_edge_to(used_edges, n), + } { + capacity = min(flow_to_prune, capacity); + match direction { + PruneDirection::Forwards => reduce_capacity(used_edges, (n, &next), &capacity), + PruneDirection::Backwards => reduce_capacity(used_edges, (&next, n), &capacity), + }; + prune_path(used_edges, &next, capacity, direction); + flow_to_prune -= capacity; + if flow_to_prune == U256::from(0) { + return; + } + } +} + fn extract_transfers( source: &Address, sink: &Address, @@ -227,7 +459,7 @@ mod test { token: t, capacity: U256::from(10), }]); - let flow = compute_flow(&a, &b, &edges, None); + let flow = compute_flow(&a, &b, &edges, U256::MAX, None); assert_eq!(flow, (U256::from(10), edges[&a].clone())); } @@ -248,7 +480,7 @@ mod test { capacity: U256::from(8), }, ]); - let flow = compute_flow(&a, &c, &edges, None); + let flow = compute_flow(&a, &c, &edges, U256::MAX, None); assert_eq!( flow, ( @@ -300,7 +532,7 @@ mod test { capacity: U256::from(8), }, ]); - let mut flow = compute_flow(&a, &d, &edges, None); + let mut flow = compute_flow(&a, &d, &edges, U256::MAX, None); flow.1.sort(); assert_eq!( flow, @@ -334,5 +566,27 @@ mod test { ] ) ); + let mut pruned_flow = compute_flow(&a, &d, &edges, U256::from(6), None); + pruned_flow.1.sort(); + assert_eq!( + pruned_flow, + ( + U256::from(6), + vec![ + Edge { + from: a, + to: b, + token: t1, + capacity: U256::from(6) + }, + Edge { + from: b, + to: d, + token: t2, + capacity: U256::from(6) + }, + ] + ) + ); } } diff --git a/src/server.rs b/src/server.rs index 3993e95..7218206 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,6 @@ use crate::flow; use crate::io::read_edges_binary; -use crate::types::{Address, Edge}; +use crate::types::{Address, Edge, U256}; use json::JsonValue; use std::collections::HashMap; use std::error::Error; @@ -61,46 +61,7 @@ fn handle_connection( "compute_transfer" => { println!("Computing flow"); let e = edges.read().unwrap().clone(); - - socket.write_all(chunked_header().as_bytes())?; - let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() { - vec![Some(1), Some(2), None] - } else { - vec![None] - }; - for max_distance in max_distances { - let (flow, transfers) = flow::compute_flow( - &Address::from(request.params["from"].to_string().as_str()), - &Address::from(request.params["to"].to_string().as_str()), - //&U256::from(request.params["value"].to_string().as_str()), - e.as_ref(), - max_distance, - ); - println!( - "Computed flow with max distance {:?}: {}", - max_distance, flow - ); - // TODO error handling - socket.write_all( - chunked_response( - &(jsonrpc_result( - request.id.clone(), - json::object! { - flow: flow.to_string(), - final: max_distance.is_none(), - transfers: transfers.into_iter().map(|e| json::object! { - from: e.from.to_string(), - to: e.to.to_string(), - token: e.token.to_string(), - value: e.capacity.to_string() - }).collect::>(), - }, - ) + "\r\n"), - ) - .as_bytes(), - )?; - } - socket.write_all(chunked_close().as_bytes())?; + compute_transfer(request, e.as_ref(), socket)?; } "cancel" => {} "update_edges" => {} @@ -110,6 +71,57 @@ fn handle_connection( Ok(()) } +fn compute_transfer( + request: JsonRpcRequest, + edges: &HashMap>, + mut socket: TcpStream, +) -> Result<(), Box> { + socket.write_all(chunked_header().as_bytes())?; + let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() { + vec![Some(1), Some(2), None] + } else { + vec![None] + }; + for max_distance in max_distances { + let (flow, transfers) = flow::compute_flow( + &Address::from(request.params["from"].to_string().as_str()), + &Address::from(request.params["to"].to_string().as_str()), + edges, + if request.params.has_key("value") { + U256::from(request.params["value"].to_string().as_str()) + } else { + U256::MAX + }, + max_distance, + ); + println!( + "Computed flow with max distance {:?}: {}", + max_distance, flow + ); + // TODO error handling + socket.write_all( + chunked_response( + &(jsonrpc_result( + request.id.clone(), + json::object! { + flow: flow.to_string(), + final: max_distance.is_none(), + transfers: transfers.into_iter().map(|e| json::object! { + from: e.from.to_string(), + to: e.to.to_string(), + token: e.token.to_string(), + value: e.capacity.to_string() + }).collect::>(), + }, + ) + "\r\n"), + ) + .as_bytes(), + )?; + } + socket.write_all(chunked_close().as_bytes())?; + Ok(()) +} + fn read_request(socket: &mut TcpStream) -> Result> { let payload = read_payload(socket)?; let mut request = json::parse(&String::from_utf8(payload)?)?; diff --git a/src/types/u256.rs b/src/types/u256.rs index 27f07ad..bb6ca15 100644 --- a/src/types/u256.rs +++ b/src/types/u256.rs @@ -6,9 +6,10 @@ use std::ops::{Add, AddAssign, Neg, Sub, SubAssign}; pub struct U256([u128; 2]); impl U256 { - pub fn new(high: u128, low: u128) -> U256 { + pub const fn new(high: u128, low: u128) -> U256 { U256([high, low]) } + pub const MAX: U256 = U256::new(u128::MAX, u128::MAX); } impl From for U256 { diff --git a/tests/integration.rs b/tests/integration.rs index 80dbd2b..a5eb7d5 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,16 +1,17 @@ -use pathfinder2::flow::compute_flow; -use pathfinder2::io::read_edges_binary; -use pathfinder2::types::Address; +// use pathfinder2::flow::compute_flow; +// use pathfinder2::io::read_edges_binary; +// use pathfinder2::types::Address; -#[test] -fn test_flow() { - let edges = read_edges_binary(&"edges.dat".to_string()).unwrap(); - let transfers = compute_flow( - &Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"), - &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"), - //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"), - &edges, - None, - ); - println!("{:?}", transfers); -} +// #[test] +// fn test_flow() { +// let edges = read_edges_binary(&"edges.dat".to_string()).unwrap(); +// let transfers = compute_flow( +// &Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"), +// &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"), +// //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"), +// &edges, +// U256::MAX, +// None, +// ); +// println!("{:?}", transfers); +// }