Iterative flow computation.

This commit is contained in:
chriseth 2022-09-17 15:46:08 +02:00
parent d728be698a
commit cff2d35231
3 changed files with 84 additions and 40 deletions

View file

@ -10,13 +10,14 @@ pub fn compute_flow(
source: &Address, source: &Address,
sink: &Address, sink: &Address,
edges: &HashMap<Address, Vec<Edge>>, edges: &HashMap<Address, Vec<Edge>>,
max_distance: Option<u64>,
) -> (U256, Vec<Edge>) { ) -> (U256, Vec<Edge>) {
let mut adjacencies = Adjacencies::new(edges); let mut adjacencies = Adjacencies::new(edges);
let mut used_edges: HashMap<Node, HashMap<Node, U256>> = HashMap::new(); let mut used_edges: HashMap<Node, HashMap<Node, U256>> = HashMap::new();
let mut flow = U256::default(); let mut flow = U256::default();
loop { loop {
let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies); let (new_flow, parents) = augmenting_path(source, sink, &mut adjacencies, max_distance);
if new_flow == U256::default() { if new_flow == U256::default() {
break; break;
} }
@ -63,14 +64,20 @@ fn augmenting_path(
source: &Address, source: &Address,
sink: &Address, sink: &Address,
adjacencies: &mut Adjacencies, adjacencies: &mut Adjacencies,
max_distance: Option<u64>,
) -> (U256, Vec<Node>) { ) -> (U256, Vec<Node>) {
let mut parent = HashMap::new(); let mut parent = HashMap::new();
if *source == *sink { if *source == *sink {
return (U256::default(), vec![]); return (U256::default(), vec![]);
} }
let mut queue = VecDeque::<(Node, U256)>::new(); let mut queue = VecDeque::<(Node, (u64, U256))>::new();
queue.push_back((Node::Node(*source), U256::default() - U256::from(1))); queue.push_back((Node::Node(*source), (0, U256::default() - U256::from(1))));
while let Some((node, flow)) = queue.pop_front() { while let Some((node, (depth, flow))) = queue.pop_front() {
if let Some(max) = max_distance {
if depth >= max {
continue;
}
}
for (target, capacity) in adjacencies.outgoing_edges_sorted_by_capacity(&node) { for (target, capacity) in adjacencies.outgoing_edges_sorted_by_capacity(&node) {
if !parent.contains_key(&target) && capacity > U256::default() { if !parent.contains_key(&target) && capacity > U256::default() {
parent.insert(target.clone(), node.clone()); parent.insert(target.clone(), node.clone());
@ -81,7 +88,7 @@ fn augmenting_path(
trace(parent, &Node::Node(*source), &Node::Node(*sink)), trace(parent, &Node::Node(*source), &Node::Node(*sink)),
); );
} }
queue.push_back((target, new_flow)); queue.push_back((target, (depth + 1, new_flow)));
} }
} }
} }
@ -220,7 +227,7 @@ mod test {
token: t, token: t,
capacity: U256::from(10), capacity: U256::from(10),
}]); }]);
let flow = compute_flow(&a, &b, &edges); let flow = compute_flow(&a, &b, &edges, None);
assert_eq!(flow, (U256::from(10), edges[&a].clone())); assert_eq!(flow, (U256::from(10), edges[&a].clone()));
} }
@ -241,7 +248,7 @@ mod test {
capacity: U256::from(8), capacity: U256::from(8),
}, },
]); ]);
let flow = compute_flow(&a, &c, &edges); let flow = compute_flow(&a, &c, &edges, None);
assert_eq!( assert_eq!(
flow, flow,
( (
@ -293,7 +300,7 @@ mod test {
capacity: U256::from(8), capacity: U256::from(8),
}, },
]); ]);
let mut flow = compute_flow(&a, &d, &edges); let mut flow = compute_flow(&a, &d, &edges, None);
flow.1.sort(); flow.1.sort();
assert_eq!( assert_eq!(
flow, flow,

View file

@ -56,35 +56,51 @@ fn handle_connection(
let updated_edges = read_edges_binary(&request.params["file"].to_string())?; let updated_edges = read_edges_binary(&request.params["file"].to_string())?;
let len = updated_edges.len(); let len = updated_edges.len();
*edges.write().unwrap() = Arc::new(updated_edges); *edges.write().unwrap() = Arc::new(updated_edges);
socket.write_all(jsonrpc_result(request.id, len).as_bytes())?; socket.write_all(jsonrpc_response(request.id, len).as_bytes())?;
} }
"compute_transfer" => { "compute_transfer" => {
println!("Computing flow"); println!("Computing flow");
let e = edges.read().unwrap().clone(); let e = edges.read().unwrap().clone();
let (flow, transfers) = flow::compute_flow(
&Address::from(request.params["from"].to_string().as_str()), socket.write_all(chunked_header().as_bytes())?;
&Address::from(request.params["to"].to_string().as_str()), let max_distances = if request.params["iterative"].as_bool().unwrap_or_default() {
//&U256::from(request.params["value"].to_string().as_str()), vec![Some(1), Some(2), None]
e.as_ref(), } else {
); vec![None]
println!("Computed flow"); };
// TODO error handling for max_distance in max_distances {
socket.write_all( let (flow, transfers) = flow::compute_flow(
jsonrpc_result( &Address::from(request.params["from"].to_string().as_str()),
request.id, &Address::from(request.params["to"].to_string().as_str()),
json::object! { //&U256::from(request.params["value"].to_string().as_str()),
flow: flow.to_string(), e.as_ref(),
final: true, max_distance,
transfers: transfers.into_iter().map(|e| json::object! { );
from: e.from.to_string(), println!(
to: e.to.to_string(), "Computed flow with max distance {:?}: {}",
token: e.token.to_string(), max_distance, flow
value: e.capacity.to_string() );
}).collect::<Vec<_>>(), // TODO error handling
}, socket.write_all(
) chunked_response(
.as_bytes(), &(jsonrpc_result(
)?; request.id.clone(),
json::object! {
flow: flow.to_string(),
final: max_distance.is_none(),
transfers: transfers.into_iter().map(|e| json::object! {
from: e.from.to_string(),
to: e.to.to_string(),
token: e.token.to_string(),
value: e.capacity.to_string()
}).collect::<Vec<_>>(),
},
) + "\r\n"),
)
.as_bytes(),
)?;
}
socket.write_all(chunked_close().as_bytes())?;
} }
"cancel" => {} "cancel" => {}
"update_edges" => {} "update_edges" => {}
@ -130,16 +146,36 @@ fn read_payload(socket: &mut TcpStream) -> Result<Vec<u8>, Box<dyn Error>> {
Ok(payload) Ok(payload)
} }
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String { fn jsonrpc_response(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
let payload = json::object! { let payload = jsonrpc_result(id, result);
jsonrpc: "2.0",
id: id,
result: result.into(),
}
.dump();
format!( format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(), payload.len(),
payload payload
) )
} }
fn jsonrpc_result(id: JsonValue, result: impl Into<json::JsonValue>) -> String {
json::object! {
jsonrpc: "2.0",
id: id,
result: result.into(),
}
.dump()
}
fn chunked_header() -> String {
"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".to_string()
}
fn chunked_response(data: &str) -> String {
if data.is_empty() {
String::new()
} else {
format!("{:x}\r\n{}\r\n", data.len(), data)
}
}
fn chunked_close() -> String {
"0\r\n\r\n".to_string()
}

View file

@ -10,6 +10,7 @@ fn test_flow() {
&Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"), &Address::from("0x42cEDde51198D1773590311E2A340DC06B24cB37"),
//&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"), //&Address::from("0x9f5ff18027adbb65a53086cdc09d12ce463dae0b"),
&edges, &edges,
None,
); );
println!("{:?}", transfers); println!("{:?}", transfers);
} }