diff --git a/src/flow/flow.rs b/src/flow/flow.rs
index bd9003f..f8c3596 100644
--- a/src/flow/flow.rs
+++ b/src/flow/flow.rs
@@ -10,13 +10,14 @@ pub fn compute_flow(
source: &Address,
sink: &Address,
edges: &HashMap<
    Address, Vec<Edge>
>,
+ max_distance: Option<u64>,
) -> (U256, Vec<Edge>) {
let mut adjacencies = Adjacencies::new(edges);
let mut used_edges: HashMap<Node, HashMap<Node, U256>> = HashMap::new();
let mut flow = U256::default();
loop {
- let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies);
+ let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies, max_distance);
if new_flow == U256::default() {
break;
}
@@ -63,14 +64,20 @@ fn augmenting_path(
source: &Address,
sink: &Address,
adjacencies: &mut Adjacencies,
+ max_distance: Option<u64>,
) -> (U256, Vec<Node>) {
let mut parent = HashMap::new();
if *source == *sink {
return (U256::default(), vec![]);
}
- let mut queue = VecDeque::<(Node, U256)>::new();
- queue.push_back((Node::Node(*source), U256::default() - U256::from(1)));
- while let Some((node, flow)) = queue.pop_front() {
+ let mut queue = VecDeque::<(Node, (u64, U256))>::new();
+ queue.push_back((Node::Node(*source), (0, U256::default() - U256::from(1))));
+ while let Some((node, (depth, flow))) = queue.pop_front() {
+ if let Some(max) = max_distance {
+ if depth >= max {
+ continue;
+ }
+ }
for (target, capacity) in adjacencies.outgoing_edges_sorted_by_capacity(&node) {
if !parent.contains_key(&target) && capacity > U256::default() {
parent.insert(target.clone(), node.clone());
@@ -81,7 +88,7 @@ fn augmenting_path(
trace(parent, &Node::Node(*source), &Node::Node(*sink)),
);
}
- queue.push_back((target, new_flow));
+ queue.push_back((target, (depth + 1, new_flow)));
}
}
}
@@ -220,7 +227,7 @@ mod test {
token: t,
capacity: U256::from(10),
}]);
- let flow = compute_flow(&a, &b, &edges);
+ let flow = compute_flow(&a, &b, &edges, None);
assert_eq!(flow, (U256::from(10), edges[&a].clone()));
}
@@ -241,7 +248,7 @@ mod test {
capacity: U256::from(8),
},
]);
- let flow = compute_flow(&a, &c, &edges);
+ let flow = compute_flow(&a, &c, &edges, None);
assert_eq!(
flow,
(
@@ -293,7 +300,7 @@ mod test {
capacity: U256::from(8),
},
]);
- let mut flow = compute_flow(&a, &d, &edges);
+ let mut flow = compute_flow(&a, &d, &edges, None);
flow.1.sort();
assert_eq!(
flow,
diff --git a/src/server.rs b/src/server.rs
index 6a11b27..3993e95 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -56,35 +56,51 @@ fn handle_connection(
let updated_edges = read_edges_binary(&request.params["file"].to_string())?;
let len = updated_edges.len();
*edges.write().unwrap() = Arc::new(updated_edges);
- socket.write_all(jsonrpc_result(request.id, len).as_bytes())?;
+ socket.write_all(jsonrpc_response(request.id, len).as_bytes())?;
}
"compute_transfer" => {
println!("Computing flow");
let e = edges.read().unwrap().clone();
- let (flow, transfers) = flow::compute_flow(
- &Address::from(request.params["from"].to_string().as_str()),
- &Address::from(request.params["to"].to_string().as_str()),
- //&U256::from(request.params["value"].to_string().as_str()),
- e.as_ref(),
- );
- println!("Computed flow");
- // TODO error handling
- socket.write_all(
- jsonrpc_result(
- request.id,
- json::object! {
- flow: flow.to_string(),
- final: true,
- transfers: transfers.into_iter().map(|e| json::object! {
- from: e.from.to_string(),
- to: e.to.to_string(),
- token: e.token.to_string(),
- value: e.capacity.to_string()
}).collect::<Vec<_>>(),
- },
- )
- .as_bytes(),
- )?;
+
+ socket.write_all(chunked_header().as_bytes())?;
+ let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
+ vec![Some(1), Some(2), None]
+ } else {
+ vec![None]
+ };
+ for max_distance in max_distances {
+ let (flow, transfers) = flow::compute_flow(
+ &Address::from(request.params["from"].to_string().as_str()),
+ &Address::from(request.params["to"].to_string().as_str()),
+ //&U256::from(request.params["value"].to_string().as_str()),
+ e.as_ref(),
+ max_distance,
+ );
+ println!(
+ "Computed flow with max distance {:?}: {}",
+ max_distance, flow
+ );
+ // TODO error handling
+ socket.write_all(
+ chunked_response(
+ &(jsonrpc_result(
+ request.id.clone(),
+ json::object! {
+ flow: flow.to_string(),
+ final: max_distance.is_none(),
+ transfers: transfers.into_iter().map(|e| json::object! {
+ from: e.from.to_string(),
+ to: e.to.to_string(),
+ token: e.token.to_string(),
+ value: e.capacity.to_string()
+ }).collect::<Vec<_>>(),
+ },
+ ) + "\r\n"),
+ )
+ .as_bytes(),
+ )?;
+ }
+ socket.write_all(chunked_close().as_bytes())?;
}
"cancel" => {}
"update_edges" => {}
@@ -130,16 +146,36 @@ fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
Ok(payload)
}
-fn jsonrpc_result(id: JsonValue, result: impl Into<JsonValue>) -> String {
- let payload = json::object! {
- jsonrpc: "2.0",
- id: id,
- result: result.into(),
- }
- .dump();
+fn jsonrpc_response(id: JsonValue, result: impl Into<JsonValue>) -> String {
+ let payload = jsonrpc_result(id, result);
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}
+
+fn jsonrpc_result(id: JsonValue, result: impl Into<JsonValue>) -> String {
+ json::object! {
+ jsonrpc: "2.0",
+ id: id,
+ result: result.into(),
+ }
+ .dump()
+}
+
+fn chunked_header() -> String {
+ "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".to_string()
+}
+
+fn chunked_response(data: &str) -> String {
+ if data.is_empty() {
+ String::new()
+ } else {
+ format!("{:x}\r\n{}\r\n", data.len(), data)
+ }
+}
+
+fn chunked_close() -> String {
+ "0\r\n\r\n".to_string()
+}
diff --git a/tests/integration.rs b/tests/integration.rs
index 64c8718..80dbd2b 100644
--- a/tests/integration.rs
+++ b/tests/integration.rs
@@ -10,6 +10,7 @@ fn test_flow() {
&Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
//&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
&edges,
+ None,
);
println!("{:?}", transfers);
}