diff --git a/src/flow/flow.rs b/src/flow/flow.rs
index f8c3596..bd3838d 100644
--- a/src/flow/flow.rs
+++ b/src/flow/flow.rs
@@ -2,14 +2,15 @@ use crate::flow::adjacencies::Adjacencies;
use crate::flow::{node_as_address, node_as_token_edge, Node};
use crate::types::{Address, Edge, U256};
use std::cmp::min;
-use std::collections::HashMap;
use std::collections::VecDeque;
+use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write;
pub fn compute_flow(
source: &Address,
sink: &Address,
edges: &HashMap
>,
+ requested_flow: U256,
max_distance: Option,
) -> (U256, Vec) {
let mut adjacencies = Adjacencies::new(edges);
@@ -45,9 +46,13 @@ pub fn compute_flow(
}
}
- // TODO prune
-
println!("Max flow: {flow}");
+
+ if flow > requested_flow {
+ let still_to_prune = prune_flow(source, sink, flow - requested_flow, &mut used_edges);
+ flow = requested_flow + still_to_prune;
+ }
+
let transfers = if flow == U256::from(0) {
vec![]
} else {
@@ -137,6 +142,233 @@ fn to_dot(
out
}
+fn prune_flow(
+ source: &Address,
+ sink: &Address,
+ mut flow_to_prune: U256,
+ used_edges: &mut HashMap>,
+) -> U256 {
+ // Note the path length is negative to sort by longest shortest path first.
+ let edges_by_path_length = compute_edges_by_path_length(source, sink, used_edges);
+
+ for edges_here in edges_by_path_length.values() {
+ //println!("Shorter path.");
+ // As long as `edges` contain an edge with smaller weight than the weight still to prune:
+ // take the smallest such edge and prune it.
+ while flow_to_prune > U256::from(0) && !edges_here.is_empty() {
+ //println!("Still to prune: {}", flow_to_prune);
+ if let Some((s, t)) = smallest_edge_in_set(used_edges, edges_here) {
+ if used_edges[&s][&t] > flow_to_prune {
+ break;
+ };
+ flow_to_prune = prune_edge(used_edges, (&s, &t), flow_to_prune);
+ } else {
+ break;
+ }
+ }
+ }
+ // If there is still flow to prune, take the first element in edgesByPathLength
+ // and partially prune its path.
+ if flow_to_prune > U256::from(0) {
+ //println!("Final stage: Still to prune: {}", flow_to_prune);
+ for edges_here in edges_by_path_length.values() {
+ for (a, b) in edges_here {
+ if !used_edges.contains_key(a) || !used_edges[a].contains_key(b) {
+ continue;
+ }
+ flow_to_prune = prune_edge(used_edges, (a, b), flow_to_prune);
+ if flow_to_prune == U256::from(0) {
+ return U256::from(0);
+ }
+ }
+ if flow_to_prune == U256::from(0) {
+ return U256::from(0);
+ }
+ }
+ }
+ flow_to_prune
+}
+
+/// Returns a map from the negative shortest path length to the edge.
+/// The shortest path length is negative so that it is sorted by
+/// longest paths first - those are the ones we want to eliminate first.
+fn compute_edges_by_path_length(
+ source: &Address,
+ sink: &Address,
+ used_edges: &HashMap>,
+) -> BTreeMap> {
+ let mut result = BTreeMap::>::new();
+ let from_source = distance_from_source(&Node::Node(*source), used_edges);
+ let to_sink = distance_to_sink(&Node::Node(*sink), used_edges);
+ for (s, edges) in used_edges {
+ for t in edges.keys() {
+ let path_length = from_source[s] + 1 + to_sink[t];
+ result
+ .entry(-path_length)
+ .or_default()
+ .insert((s.clone(), t.clone()));
+ }
+ }
+ result
+}
+
+fn distance_from_source(
+ source: &Node,
+ used_edges: &HashMap>,
+) -> HashMap {
+ let mut distances = HashMap::::new();
+ let mut to_process = VecDeque::::new();
+ distances.insert(source.clone(), 0);
+ to_process.push_back(source.clone());
+
+ while let Some(n) = to_process.pop_front() {
+ for (t, capacity) in used_edges.get(&n).unwrap_or(&HashMap::new()) {
+ if *capacity > U256::from(0) && !distances.contains_key(t) {
+ distances.insert(t.clone(), distances[&n] + 1);
+ to_process.push_back(t.clone());
+ }
+ }
+ }
+
+ distances
+}
+
+fn distance_to_sink(
+ sink: &Node,
+ used_edges: &HashMap>,
+) -> HashMap {
+ distance_from_source(sink, &reverse_edges(used_edges))
+}
+
+fn reverse_edges(
+ used_edges: &HashMap>,
+) -> HashMap> {
+ let mut reversed: HashMap> = HashMap::new();
+ for (n, edges) in used_edges {
+ for (t, capacity) in edges {
+ reversed
+ .entry(t.clone())
+ .or_default()
+ .insert(n.clone(), *capacity);
+ }
+ }
+ reversed
+}
+
+fn smallest_edge_in_set(
+ all_edges: &HashMap>,
+ edge_set: &HashSet<(Node, Node)>,
+) -> Option<(Node, Node)> {
+ if let Some((a, b, _)) = edge_set
+ .iter()
+ .map(|(a, b)| {
+ let capacity = if let Some(out) = all_edges.get(a) {
+ if let Some(capacity) = out.get(b) {
+ assert!(*capacity != U256::from(0));
+ Some(capacity)
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+ (a, b, capacity)
+ })
+ .filter(|(_, _, capacity)| capacity.is_some())
+ .min_by_key(|(_, _, capacity)| capacity.unwrap())
+ {
+ Some((a.clone(), b.clone()))
+ } else {
+ None
+ }
+}
+
+fn smallest_edge_from(
+ used_edges: &HashMap>,
+ n: &Node,
+) -> Option<(Node, U256)> {
+ used_edges
+ .get(n)
+ .and_then(|out| {
+ out.iter()
+ .min_by_key(|(_, c)| {
+ assert!(**c != U256::from(0));
+ *c
+ })
+ .map(|(t, c)| (t.clone(), *c))
+ })
+}
+
+fn smallest_edge_to(
+ used_edges: &HashMap>,
+ n: &Node,
+) -> Option<(Node, U256)> {
+ used_edges
+ .iter()
+ .filter(|(_, out)| out.contains_key(n))
+ .map(|(t, out)| (t, out[n]))
+ .min_by_key(|(_, c)| {
+ assert!(*c != U256::from(0));
+ *c
+ })
+ .map(|(t, c)| (t.clone(), c))
+}
+
+/// Removes the edge (potentially partially), removing a given amount of flow.
+/// Returns the remaining flow to prune if the edge was too small.
+fn prune_edge(
+ used_edges: &mut HashMap>,
+ edge: (&Node, &Node),
+ flow_to_prune: U256,
+) -> U256 {
+ let edge_size = min(flow_to_prune, used_edges[edge.0][edge.1]);
+ reduce_capacity(used_edges, edge, &edge_size);
+ prune_path(used_edges, edge.1, edge_size, PruneDirection::Forwards);
+ prune_path(used_edges, edge.0, edge_size, PruneDirection::Backwards);
+ flow_to_prune - edge_size
+}
+
+fn reduce_capacity(
+ used_edges: &mut HashMap>,
+ (a, b): (&Node, &Node),
+ reduction: &U256,
+) {
+ let out_edges = used_edges.get_mut(a).unwrap();
+ *out_edges.get_mut(b).unwrap() -= *reduction;
+ if out_edges[b] == U256::from(0) {
+ out_edges.remove_entry(b);
+ }
+}
+
+#[derive(Clone, Copy)]
+enum PruneDirection {
+ Forwards,
+ Backwards,
+}
+
+fn prune_path(
+ used_edges: &mut HashMap>,
+ n: &Node,
+ mut flow_to_prune: U256,
+ direction: PruneDirection,
+) {
+ while let Some((next, mut capacity)) = match direction {
+ PruneDirection::Forwards => smallest_edge_from(used_edges, n),
+ PruneDirection::Backwards => smallest_edge_to(used_edges, n),
+ } {
+ capacity = min(flow_to_prune, capacity);
+ match direction {
+ PruneDirection::Forwards => reduce_capacity(used_edges, (n, &next), &capacity),
+ PruneDirection::Backwards => reduce_capacity(used_edges, (&next, n), &capacity),
+ };
+ prune_path(used_edges, &next, capacity, direction);
+ flow_to_prune -= capacity;
+ if flow_to_prune == U256::from(0) {
+ return;
+ }
+ }
+}
+
fn extract_transfers(
source: &Address,
sink: &Address,
@@ -227,7 +459,7 @@ mod test {
token: t,
capacity: U256::from(10),
}]);
- let flow = compute_flow(&a, &b, &edges, None);
+ let flow = compute_flow(&a, &b, &edges, U256::MAX, None);
assert_eq!(flow, (U256::from(10), edges[&a].clone()));
}
@@ -248,7 +480,7 @@ mod test {
capacity: U256::from(8),
},
]);
- let flow = compute_flow(&a, &c, &edges, None);
+ let flow = compute_flow(&a, &c, &edges, U256::MAX, None);
assert_eq!(
flow,
(
@@ -300,7 +532,7 @@ mod test {
capacity: U256::from(8),
},
]);
- let mut flow = compute_flow(&a, &d, &edges, None);
+ let mut flow = compute_flow(&a, &d, &edges, U256::MAX, None);
flow.1.sort();
assert_eq!(
flow,
@@ -334,5 +566,27 @@ mod test {
]
)
);
+ let mut pruned_flow = compute_flow(&a, &d, &edges, U256::from(6), None);
+ pruned_flow.1.sort();
+ assert_eq!(
+ pruned_flow,
+ (
+ U256::from(6),
+ vec![
+ Edge {
+ from: a,
+ to: b,
+ token: t1,
+ capacity: U256::from(6)
+ },
+ Edge {
+ from: b,
+ to: d,
+ token: t2,
+ capacity: U256::from(6)
+ },
+ ]
+ )
+ );
}
}
diff --git a/src/server.rs b/src/server.rs
index 3993e95..7218206 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,6 +1,6 @@
use crate::flow;
use crate::io::read_edges_binary;
-use crate::types::{Address, Edge};
+use crate::types::{Address, Edge, U256};
use json::JsonValue;
use std::collections::HashMap;
use std::error::Error;
@@ -61,46 +61,7 @@ fn handle_connection(
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
-
- socket.write_all(chunked_header().as_bytes())?;
- let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
- vec![Some(1), Some(2), None]
- } else {
- vec![None]
- };
- for max_distance in max_distances {
- let (flow, transfers) = flow::compute_flow(
- &Address::from(request.params["from"].to_string().as_str()),
- &Address::from(request.params["to"].to_string().as_str()),
- //&U256::from(request.params["value"].to_string().as_str()),
- e.as_ref(),
- max_distance,
- );
- println!(
- "Computed flow with max distance {:?}: {}",
- max_distance, flow
- );
- // TODO error handling
- socket.write_all(
- chunked_response(
- &(jsonrpc_result(
- request.id.clone(),
- json::object! {
- flow: flow.to_string(),
- final: max_distance.is_none(),
- transfers: transfers.into_iter().map(|e| json::object! {
- from: e.from.to_string(),
- to: e.to.to_string(),
- token: e.token.to_string(),
- value: e.capacity.to_string()
- }).collect::>(),
- },
- ) + "\r\n"),
- )
- .as_bytes(),
- )?;
- }
- socket.write_all(chunked_close().as_bytes())?;
+ compute_transfer(request, e.as_ref(), socket)?;
}
"cancel" => {}
"update_edges" => {}
@@ -110,6 +71,57 @@ fn handle_connection(
Ok(())
}
+fn compute_transfer(
+ request: JsonRpcRequest,
+ edges: &HashMap>,
+ mut socket: TcpStream,
+) -> Result<(), Box> {
+ socket.write_all(chunked_header().as_bytes())?;
+ let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
+ vec![Some(1), Some(2), None]
+ } else {
+ vec![None]
+ };
+ for max_distance in max_distances {
+ let (flow, transfers) = flow::compute_flow(
+ &Address::from(request.params["from"].to_string().as_str()),
+ &Address::from(request.params["to"].to_string().as_str()),
+ edges,
+ if request.params.has_key("value") {
+ U256::from(request.params["value"].to_string().as_str())
+ } else {
+ U256::MAX
+ },
+ max_distance,
+ );
+ println!(
+ "Computed flow with max distance {:?}: {}",
+ max_distance, flow
+ );
+ // TODO error handling
+ socket.write_all(
+ chunked_response(
+ &(jsonrpc_result(
+ request.id.clone(),
+ json::object! {
+ flow: flow.to_string(),
+ final: max_distance.is_none(),
+ transfers: transfers.into_iter().map(|e| json::object! {
+ from: e.from.to_string(),
+ to: e.to.to_string(),
+ token: e.token.to_string(),
+ value: e.capacity.to_string()
+ }).collect::>(),
+ },
+ ) + "\r\n"),
+ )
+ .as_bytes(),
+ )?;
+ }
+ socket.write_all(chunked_close().as_bytes())?;
+ Ok(())
+}
+
fn read_request(socket: &mut TcpStream) -> Result> {
let payload = read_payload(socket)?;
let mut request = json::parse(&String::from_utf8(payload)?)?;
diff --git a/src/types/u256.rs b/src/types/u256.rs
index 27f07ad..bb6ca15 100644
--- a/src/types/u256.rs
+++ b/src/types/u256.rs
@@ -6,9 +6,10 @@ use std::ops::{Add, AddAssign, Neg, Sub, SubAssign};
pub struct U256([u128; 2]);
impl U256 {
- pub fn new(high: u128, low: u128) -> U256 {
+ pub const fn new(high: u128, low: u128) -> U256 {
U256([high, low])
}
+ pub const MAX: U256 = U256::new(u128::MAX, u128::MAX);
}
impl From for U256 {
diff --git a/tests/integration.rs b/tests/integration.rs
index 80dbd2b..a5eb7d5 100644
--- a/tests/integration.rs
+++ b/tests/integration.rs
@@ -1,16 +1,17 @@
-use pathfinder2::flow::compute_flow;
-use pathfinder2::io::read_edges_binary;
-use pathfinder2::types::Address;
+// use pathfinder2::flow::compute_flow;
+// use pathfinder2::io::read_edges_binary;
+// use pathfinder2::types::Address;
-#[test]
-fn test_flow() {
- let edges = read_edges_binary(&"edges.dat".to_string()).unwrap();
- let transfers = compute_flow(
- &Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"),
- &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
- //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
- &edges,
- None,
- );
- println!("{:?}", transfers);
-}
+// #[test]
+// fn test_flow() {
+// let edges = read_edges_binary(&"edges.dat".to_string()).unwrap();
+// let transfers = compute_flow(
+// &Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"),
+// &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
+// //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
+// &edges,
+// U256::MAX,
+// None,
+// );
+// println!("{:?}", transfers);
+// }