From cff2d35231f87a7d8198ccd4880bc8ca6548ee9f Mon Sep 17 00:00:00 2001 From: chriseth Date: Sat, 17 Sep 2022 15:46:08 +0200 Subject: [PATCH] Iterative flow computation. --- src/flow/flow.rs | 23 ++++++---- src/server.rs | 100 +++++++++++++++++++++++++++++-------------- tests/integration.rs | 1 + 3 files changed, 84 insertions(+), 40 deletions(-) diff --git a/src/flow/flow.rs b/src/flow/flow.rs index bd9003f..f8c3596 100644 --- a/src/flow/flow.rs +++ b/src/flow/flow.rs @@ -10,13 +10,14 @@ pub fn compute_flow( source: &Address, sink: &Address, edges: &HashMap>, + max_distance: Option, ) -> (U256, Vec) { let mut adjacencies = Adjacencies::new(edges); let mut used_edges: HashMap> = HashMap::new(); let mut flow = U256::default(); loop { - let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies); + let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies, max_distance); if new_flow == U256::default() { break; } @@ -63,14 +64,20 @@ fn augmenting_path( source: &Address, sink: &Address, adjacencies: &mut Adjacencies, + max_distance: Option, ) -> (U256, Vec) { let mut parent = HashMap::new(); if *source == *sink { return (U256::default(), vec![]); } - let mut queue = VecDeque::<(Node, U256)>::new(); - queue.push_back((Node::Node(*source), U256::default() - U256::from(1))); - while let Some((node, flow)) = queue.pop_front() { + let mut queue = VecDeque::<(Node, (u64, U256))>::new(); + queue.push_back((Node::Node(*source), (0, U256::default() - U256::from(1)))); + while let Some((node, (depth, flow))) = queue.pop_front() { + if let Some(max) = max_distance { + if depth >= max { + continue; + } + } for (target, capacity) in adjacencies.outgoing_edges_sorted_by_capacity(&node) { if !parent.contains_key(&target) && capacity > U256::default() { parent.insert(target.clone(), node.clone()); @@ -81,7 +88,7 @@ fn augmenting_path( trace(parent, &Node::Node(*source), &Node::Node(*sink)), ); } - queue.push_back((target, new_flow)); + queue.push_back((target, (depth + 1, new_flow))); } } } @@ -220,7 +227,7 @@ mod test { token: t, capacity: U256::from(10), }]); - let flow = compute_flow(&a, &b, &edges); + let flow = compute_flow(&a, &b, &edges, None); assert_eq!(flow, (U256::from(10), edges[&a].clone())); } @@ -241,7 +248,7 @@ mod test { capacity: U256::from(8), }, ]); - let flow = compute_flow(&a, &c, &edges); + let flow = compute_flow(&a, &c, &edges, None); assert_eq!( flow, ( @@ -293,7 +300,7 @@ mod test { capacity: U256::from(8), }, ]); - let mut flow = compute_flow(&a, &d, &edges); + let mut flow = compute_flow(&a, &d, &edges, None); flow.1.sort(); assert_eq!( flow, diff --git a/src/server.rs b/src/server.rs index 6a11b27..3993e95 100644 --- a/src/server.rs +++ b/src/server.rs @@ -56,35 +56,51 @@ fn handle_connection( let updated_edges = read_edges_binary(&request.params["file"].to_string())?; let len = updated_edges.len(); *edges.write().unwrap() = Arc::new(updated_edges); - socket.write_all(jsonrpc_result(request.id, len).as_bytes())?; + socket.write_all(jsonrpc_response(request.id, len).as_bytes())?; } "compute_transfer" => { println!("Computing flow"); let e = edges.read().unwrap().clone(); - let (flow, transfers) = flow::compute_flow( - &Address::from(request.params["from"].to_string().as_str()), - &Address::from(request.params["to"].to_string().as_str()), - //&U256::from(request.params["value"].to_string().as_str()), - e.as_ref(), - ); - println!("Computed flow"); - // TODO error handling - socket.write_all( - jsonrpc_result( - request.id, - json::object! { - flow: flow.to_string(), - final: true, - transfers: transfers.into_iter().map(|e| json::object! { - from: e.from.to_string(), - to: e.to.to_string(), - token: e.token.to_string(), - value: e.capacity.to_string() - }).collect::>(), - }, - ) - .as_bytes(), - )?; + + socket.write_all(chunked_header().as_bytes())?; + let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() { + vec![Some(1), Some(2), None] + } else { + vec![None] + }; + for max_distance in max_distances { + let (flow, transfers) = flow::compute_flow( + &Address::from(request.params["from"].to_string().as_str()), + &Address::from(request.params["to"].to_string().as_str()), + //&U256::from(request.params["value"].to_string().as_str()), + e.as_ref(), + max_distance, + ); + println!( + "Computed flow with max distance {:?}: {}", + max_distance, flow + ); + // TODO error handling + socket.write_all( + chunked_response( + &(jsonrpc_result( + request.id.clone(), + json::object! { + flow: flow.to_string(), + final: max_distance.is_none(), + transfers: transfers.into_iter().map(|e| json::object! { + from: e.from.to_string(), + to: e.to.to_string(), + token: e.token.to_string(), + value: e.capacity.to_string() + }).collect::>(), + }, + ) + "\r\n"), + ) + .as_bytes(), + )?; + } + socket.write_all(chunked_close().as_bytes())?; } "cancel" => {} "update_edges" => {} @@ -130,16 +146,36 @@ fn read_payload(socket: &mut TcpStream) -> Result, Box> { Ok(payload) } -fn jsonrpc_result(id: JsonValue, result: impl Into) -> String { - let payload = json::object! { - jsonrpc: "2.0", - id: id, - result: result.into(), - } - .dump(); +fn jsonrpc_response(id: JsonValue, result: impl Into) -> String { + let payload = jsonrpc_result(id, result); format!( "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", payload.len(), payload ) } + +fn jsonrpc_result(id: JsonValue, result: impl Into) -> String { + json::object! { + jsonrpc: "2.0", + id: id, + result: result.into(), + } + .dump() +} + +fn chunked_header() -> String { + "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".to_string() +} + +fn chunked_response(data: &str) -> String { + if data.is_empty() { + String::new() + } else { + format!("{:x}\r\n{}\r\n", data.len(), data) + } +} + +fn chunked_close() -> String { + "0\r\n\r\n".to_string() +} diff --git a/tests/integration.rs b/tests/integration.rs index 64c8718..80dbd2b 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -10,6 +10,7 @@ fn test_flow() { &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"), //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"), &edges, + None, ); println!("{:?}", transfers); }