This commit is contained in:
chriseth 2022-09-17 16:46:05 +02:00
parent cff2d35231
commit 05a3aaf21b
4 changed files with 331 additions and 63 deletions

View file

@ -2,14 +2,15 @@ use crate::flow::adjacencies::Adjacencies;
use crate::flow::{node_as_address, node_as_token_edge, Node};
use crate::types::{Address, Edge, U256};
use std::cmp::min;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write;
pub fn compute_flow(
source: &Address,
sink: &Address,
edges: &HashMap<Address, Vec<Edge>>,
requested_flow: U256,
max_distance: Option<u64>,
) -> (U256, Vec<Edge>) {
let mut adjacencies = Adjacencies::new(edges);
@ -45,9 +46,13 @@ pub fn compute_flow(
}
}
// TODO prune
println!("Max flow: {flow}");
if flow > requested_flow {
let still_to_prune = prune_flow(source, sink, flow - requested_flow, &mut used_edges);
flow = requested_flow + still_to_prune;
}
let transfers = if flow == U256::from(0) {
vec![]
} else {
@ -137,6 +142,233 @@ fn to_dot(
out
}
fn prune_flow(
source: &Address,
sink: &Address,
mut flow_to_prune: U256,
used_edges: &mut HashMap<Node, HashMap<Node, U256>>,
) -> U256 {
// Note the path length is negative to sort by longest shortest path first.
let edges_by_path_length = compute_edges_by_path_length(source, sink, used_edges);
for edges_here in edges_by_path_length.values() {
//println!("Shorter path.");
// As long as `edges` contain an edge with smaller weight than the weight still to prune:
// take the smallest such edge and prune it.
while flow_to_prune > U256::from(0) && !edges_here.is_empty() {
//println!("Still to prune: {}", flow_to_prune);
if let Some((s, t)) = smallest_edge_in_set(used_edges, edges_here) {
if used_edges[&s][&t] > flow_to_prune {
break;
};
flow_to_prune = prune_edge(used_edges, (&s, &t), flow_to_prune);
} else {
break;
}
}
}
// If there is still flow to prune, take the first element in edgesByPathLength
// and partially prune its path.
if flow_to_prune > U256::from(0) {
//println!("Final stage: Still to prune: {}", flow_to_prune);
for edges_here in edges_by_path_length.values() {
for (a, b) in edges_here {
if !used_edges.contains_key(a) || !used_edges[a].contains_key(b) {
continue;
}
flow_to_prune = prune_edge(used_edges, (a, b), flow_to_prune);
if flow_to_prune == U256::from(0) {
return U256::from(0);
}
}
if flow_to_prune == U256::from(0) {
return U256::from(0);
}
}
}
flow_to_prune
}
/// Returns a map from the negative shortest path length to the edge.
/// The shortest path length is negative so that it is sorted by
/// longest paths first - those are the ones we want to eliminate first.
fn compute_edges_by_path_length(
source: &Address,
sink: &Address,
used_edges: &HashMap<Node, HashMap<Node, U256>>,
) -> BTreeMap<i64, HashSet<(Node, Node)>> {
let mut result = BTreeMap::<i64, HashSet<(Node, Node)>>::new();
let from_source = distance_from_source(&Node::Node(*source), used_edges);
let to_sink = distance_to_sink(&Node::Node(*sink), used_edges);
for (s, edges) in used_edges {
for t in edges.keys() {
let path_length = from_source[s] + 1 + to_sink[t];
result
.entry(-path_length)
.or_default()
.insert((s.clone(), t.clone()));
}
}
result
}
fn distance_from_source(
source: &Node,
used_edges: &HashMap<Node, HashMap<Node, U256>>,
) -> HashMap<Node, i64> {
let mut distances = HashMap::<Node, i64>::new();
let mut to_process = VecDeque::<Node>::new();
distances.insert(source.clone(), 0);
to_process.push_back(source.clone());
while let Some(n) = to_process.pop_front() {
for (t, capacity) in used_edges.get(&n).unwrap_or(&HashMap::new()) {
if *capacity > U256::from(0) && !distances.contains_key(t) {
distances.insert(t.clone(), distances[&n] + 1);
to_process.push_back(t.clone());
}
}
}
distances
}
fn distance_to_sink(
sink: &Node,
used_edges: &HashMap<Node, HashMap<Node, U256>>,
) -> HashMap<Node, i64> {
distance_from_source(sink, &reverse_edges(used_edges))
}
fn reverse_edges(
used_edges: &HashMap<Node, HashMap<Node, U256>>,
) -> HashMap<Node, HashMap<Node, U256>> {
let mut reversed: HashMap<Node, HashMap<Node, U256>> = HashMap::new();
for (n, edges) in used_edges {
for (t, capacity) in edges {
reversed
.entry(t.clone())
.or_default()
.insert(n.clone(), *capacity);
}
}
reversed
}
fn smallest_edge_in_set(
all_edges: &HashMap<Node, HashMap<Node, U256>>,
edge_set: &HashSet<(Node, Node)>,
) -> Option<(Node, Node)> {
if let Some((a, b, _)) = edge_set
.iter()
.map(|(a, b)| {
let capacity = if let Some(out) = all_edges.get(a) {
if let Some(capacity) = out.get(b) {
assert!(*capacity != U256::from(0));
Some(capacity)
} else {
None
}
} else {
None
};
(a, b, capacity)
})
.filter(|(_, _, capacity)| capacity.is_some())
.min_by_key(|(_, _, capacity)| capacity.unwrap())
{
Some((a.clone(), b.clone()))
} else {
None
}
}
fn smallest_edge_from(
used_edges: &HashMap<Node, HashMap<Node, U256>>,
n: &Node,
) -> Option<(Node, U256)> {
used_edges
.get(n)
.and_then(|out| {
out.iter()
.min_by_key(|(_, c)| {
assert!(**c != U256::from(0));
*c
})
.map(|(t, c)| (t.clone(), *c))
})
}
fn smallest_edge_to(
used_edges: &HashMap<Node, HashMap<Node, U256>>,
n: &Node,
) -> Option<(Node, U256)> {
used_edges
.iter()
.filter(|(_, out)| out.contains_key(n))
.map(|(t, out)| (t, out[n]))
.min_by_key(|(_, c)| {
assert!(*c != U256::from(0));
*c
})
.map(|(t, c)| (t.clone(), c))
}
/// Removes the edge (potentially partially), removing a given amount of flow.
/// Returns the remaining flow to prune if the edge was too small.
fn prune_edge(
used_edges: &mut HashMap<Node, HashMap<Node, U256>>,
edge: (&Node, &Node),
flow_to_prune: U256,
) -> U256 {
let edge_size = min(flow_to_prune, used_edges[edge.0][edge.1]);
reduce_capacity(used_edges, edge, &edge_size);
prune_path(used_edges, edge.1, edge_size, PruneDirection::Forwards);
prune_path(used_edges, edge.0, edge_size, PruneDirection::Backwards);
flow_to_prune - edge_size
}
fn reduce_capacity(
used_edges: &mut HashMap<Node, HashMap<Node, U256>>,
(a, b): (&Node, &Node),
reduction: &U256,
) {
let out_edges = used_edges.get_mut(a).unwrap();
*out_edges.get_mut(b).unwrap() -= *reduction;
if out_edges[b] == U256::from(0) {
out_edges.remove_entry(b);
}
}
#[derive(Clone, Copy)]
enum PruneDirection {
Forwards,
Backwards,
}
fn prune_path(
used_edges: &mut HashMap<Node, HashMap<Node, U256>>,
n: &Node,
mut flow_to_prune: U256,
direction: PruneDirection,
) {
while let Some((next, mut capacity)) = match direction {
PruneDirection::Forwards => smallest_edge_from(used_edges, n),
PruneDirection::Backwards => smallest_edge_to(used_edges, n),
} {
capacity = min(flow_to_prune, capacity);
match direction {
PruneDirection::Forwards => reduce_capacity(used_edges, (n, &next), &capacity),
PruneDirection::Backwards => reduce_capacity(used_edges, (&next, n), &capacity),
};
prune_path(used_edges, &next, capacity, direction);
flow_to_prune -= capacity;
if flow_to_prune == U256::from(0) {
return;
}
}
}
fn extract_transfers(
source: &Address,
sink: &Address,
@ -227,7 +459,7 @@ mod test {
token: t,
capacity: U256::from(10),
}]);
let flow = compute_flow(&a, &b, &edges, None);
let flow = compute_flow(&a, &b, &edges, U256::MAX, None);
assert_eq!(flow, (U256::from(10), edges[&a].clone()));
}
@ -248,7 +480,7 @@ mod test {
capacity: U256::from(8),
},
]);
let flow = compute_flow(&a, &c, &edges, None);
let flow = compute_flow(&a, &c, &edges, U256::MAX, None);
assert_eq!(
flow,
(
@ -300,7 +532,7 @@ mod test {
capacity: U256::from(8),
},
]);
let mut flow = compute_flow(&a, &d, &edges, None);
let mut flow = compute_flow(&a, &d, &edges, U256::MAX, None);
flow.1.sort();
assert_eq!(
flow,
@ -334,5 +566,27 @@ mod test {
]
)
);
let mut pruned_flow = compute_flow(&a, &d, &edges, U256::from(6), None);
pruned_flow.1.sort();
assert_eq!(
pruned_flow,
(
U256::from(6),
vec![
Edge {
from: a,
to: b,
token: t1,
capacity: U256::from(6)
},
Edge {
from: b,
to: d,
token: t2,
capacity: U256::from(6)
},
]
)
);
}
}

View file

@ -1,6 +1,6 @@
use crate::flow;
use crate::io::read_edges_binary;
use crate::types::{Address, Edge};
use crate::types::{Address, Edge, U256};
use json::JsonValue;
use std::collections::HashMap;
use std::error::Error;
@ -61,7 +61,21 @@ fn handle_connection(
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
compute_transfer(request, e.as_ref(), socket)?;
}
"cancel" => {}
"update_edges" => {}
// TODO error handling
_ => {}
};
Ok(())
}
fn compute_transfer(
request: JsonRpcRequest,
edges: &HashMap<Address, Vec<Edge>>,
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
socket.write_all(chunked_header().as_bytes())?;
let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
vec![Some(1), Some(2), None]
@ -72,8 +86,12 @@ fn handle_connection(
let (flow, transfers) = flow::compute_flow(
&Address::from(request.params["from"].to_string().as_str()),
&Address::from(request.params["to"].to_string().as_str()),
//&U256::from(request.params["value"].to_string().as_str()),
e.as_ref(),
edges,
if request.params.has_key("value") {
U256::from(request.params["value"].to_string().as_str())
} else {
U256::MAX
},
max_distance,
);
println!(
@ -101,12 +119,6 @@ fn handle_connection(
)?;
}
socket.write_all(chunked_close().as_bytes())?;
}
"cancel" => {}
"update_edges" => {}
// TODO error handling
_ => {}
};
Ok(())
}

View file

@ -6,9 +6,10 @@ use std::ops::{Add, AddAssign, Neg, Sub, SubAssign};
pub struct U256([u128; 2]);
impl U256 {
pub fn new(high: u128, low: u128) -> U256 {
pub const fn new(high: u128, low: u128) -> U256 {
U256([high, low])
}
pub const MAX: U256 = U256::new(u128::MAX, u128::MAX);
}
impl From<u128> for U256 {

View file

@ -1,16 +1,17 @@
use pathfinder2::flow::compute_flow;
use pathfinder2::io::read_edges_binary;
use pathfinder2::types::Address;
// use pathfinder2::flow::compute_flow;
// use pathfinder2::io::read_edges_binary;
// use pathfinder2::types::Address;
#[test]
fn test_flow() {
let edges = read_edges_binary(&"edges.dat".to_string()).unwrap();
let transfers = compute_flow(
&Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"),
&Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
//&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
&edges,
None,
);
println!("{:?}", transfers);
}
// #[test]
// fn test_flow() {
// let edges = read_edges_binary(&"edges.dat".to_string()).unwrap();
// let transfers = compute_flow(
// &Address::from("0x8DC7e86fF693e9032A0F41711b5581a04b26Be2E"),
// &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
// //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
// &edges,
// U256::MAX,
// None,
// );
// println!("{:?}", transfers);
// }