Add edgedb.

This commit is contained in:
chriseth 2022-12-06 22:16:36 +01:00
parent 782b26d9c3
commit 93077dce52
7 changed files with 125 additions and 62 deletions

View file

@ -58,7 +58,7 @@ fn main() {
io::read_edges_binary(edges_file)
})
.unwrap_or_else(|_| panic!("Error loading edges from file \"{edges_file}\"."));
println!("Read {} edges", edges.len());
println!("Read {} edges", edges.edge_count());
let (flow, transfers) = graph::compute_flow(
&Address::from(from_str.as_str()),
&Address::from(to_str.as_str()),

View file

@ -1,10 +1,11 @@
use crate::graph::Node;
use crate::types::edge::EdgeDB;
use crate::types::{Address, Edge, U256};
use std::cmp::Reverse;
use std::collections::HashMap;
pub struct Adjacencies<'a> {
edges: &'a HashMap<Address, Vec<Edge>>,
edges: &'a EdgeDB,
lazy_adjacencies: HashMap<Node, HashMap<Node, U256>>,
capacity_adjustments: HashMap<Node, HashMap<Node, U256>>,
}
@ -21,7 +22,7 @@ fn source_address_of(node: &Node) -> &Address {
}
impl<'a> Adjacencies<'a> {
pub fn new(edges: &'a HashMap<Address, Vec<Edge>>) -> Self {
pub fn new(edges: &'a EdgeDB) -> Self {
Adjacencies {
edges,
lazy_adjacencies: HashMap::new(),
@ -69,7 +70,7 @@ impl<'a> Adjacencies<'a> {
.or_insert_with(|| {
let mut result: HashMap<Node, U256> = HashMap::new();
// Plain edges are (from, to, token) labeled with capacity
for edge in self.edges.get(source_address_of(from)).unwrap_or(&vec![]) {
for edge in self.edges.outgoing(source_address_of(from)) {
match from {
Node::Node(_) => {
// One edge from "from" to "from x token" with a capacity

View file

@ -1,5 +1,6 @@
use crate::graph::adjacencies::Adjacencies;
use crate::graph::{node_as_address, node_as_token_edge, Node};
use crate::types::edge::EdgeDB;
use crate::types::{Address, Edge, U256};
use std::cmp::min;
use std::collections::VecDeque;
@ -9,7 +10,7 @@ use std::fmt::Write;
pub fn compute_flow(
source: &Address,
sink: &Address,
edges: &HashMap<Address, Vec<Edge>>,
edges: &EdgeDB,
requested_flow: U256,
max_distance: Option<u64>,
) -> (U256, Vec<Edge>) {
@ -478,12 +479,8 @@ mod test {
Address::from("0x66c16ce62d26fd51582a646e2e30a3267b1e6d7e"),
)
}
fn build_edges(input: Vec<Edge>) -> HashMap<Address, Vec<Edge>> {
let mut output: HashMap<Address, Vec<Edge>> = HashMap::new();
for e in input {
output.entry(e.from).or_default().push(e);
}
output
fn build_edges(input: Vec<Edge>) -> EdgeDB {
EdgeDB::new(input)
}
#[test]
@ -496,7 +493,18 @@ mod test {
capacity: U256::from(10),
}]);
let flow = compute_flow(&a, &b, &edges, U256::MAX, None);
assert_eq!(flow, (U256::from(10), edges[&a].clone()));
assert_eq!(
flow,
(
U256::from(10),
vec![Edge {
from: a,
to: b,
token: t,
capacity: U256::from(10)
}]
)
);
}
#[test]

View file

@ -3,16 +3,17 @@ use std::io::Read;
use std::io::{self, BufRead};
use std::{collections::HashMap, io::BufReader};
use crate::types::edge::EdgeDB;
use crate::types::{Address, Edge, U256};
pub fn read_edges_binary(path: &String) -> Result<HashMap<Address, Vec<Edge>>, io::Error> {
pub fn read_edges_binary(path: &String) -> Result<EdgeDB, io::Error> {
let mut f = File::open(path)?;
let address_index = read_address_index(&mut f)?;
read_edges(&mut f, &address_index)
}
pub fn read_edges_csv(path: &String) -> Result<HashMap<Address, Vec<Edge>>, io::Error> {
let mut result = HashMap::<Address, Vec<Edge>>::new();
pub fn read_edges_csv(path: &String) -> Result<EdgeDB, io::Error> {
let mut edges = Vec::new();
let f = BufReader::new(File::open(path)?);
for line in f.lines() {
let line = line?;
@ -23,7 +24,7 @@ pub fn read_edges_csv(path: &String) -> Result<HashMap<Address, Vec<Edge>>, io::
let to = Address::from(unescape(to));
let token = Address::from(unescape(token));
let capacity = U256::from(unescape(capacity));
result.entry(from).or_default().push(Edge {
edges.push(Edge {
from,
to,
token,
@ -38,7 +39,7 @@ pub fn read_edges_csv(path: &String) -> Result<HashMap<Address, Vec<Edge>>, io::
}
}
}
Ok(result)
Ok(EdgeDB::new(edges))
}
fn read_address_index(file: &mut File) -> Result<HashMap<u32, Address>, io::Error> {
@ -81,25 +82,22 @@ fn read_u256(file: &mut File) -> Result<U256, io::Error> {
Ok(U256::new(high, low))
}
fn read_edges(
file: &mut File,
address_index: &HashMap<u32, Address>,
) -> Result<HashMap<Address, Vec<Edge>>, io::Error> {
fn read_edges(file: &mut File, address_index: &HashMap<u32, Address>) -> Result<EdgeDB, io::Error> {
let edge_count = read_u32(file)?;
let mut edges: HashMap<Address, Vec<Edge>> = HashMap::new();
let mut edges = Vec::new();
for _i in 0..edge_count {
let from = read_address(file, address_index)?;
let to = read_address(file, address_index)?;
let token = read_address(file, address_index)?;
let capacity = read_u256(file)?;
edges.entry(from).or_insert(vec![]).push(Edge {
edges.push(Edge {
from,
to,
token,
capacity,
});
}
Ok(edges)
Ok(EdgeDB::new(edges))
}
fn unescape(input: &str) -> &str {

View file

@ -1,8 +1,8 @@
use crate::graph;
use crate::io::{read_edges_binary, read_edges_csv};
use crate::types::edge::EdgeDB;
use crate::types::{Address, Edge, U256};
use json::JsonValue;
use std::collections::HashMap;
use std::error::Error;
use std::io::Read;
use std::io::{BufRead, BufReader, Write};
@ -18,10 +18,8 @@ struct JsonRpcRequest {
params: JsonValue,
}
type EdgeMap = HashMap<Address, Vec<Edge>>;
pub fn start_server(port: u16, queue_size: usize, threads: u64) {
let edges: Arc<RwLock<Arc<EdgeMap>>> = Arc::new(RwLock::new(Arc::new(HashMap::new())));
let edges: Arc<RwLock<Arc<EdgeDB>>> = Arc::new(RwLock::new(Arc::new(EdgeDB::default())));
let (sender, receiver) = mpsc::sync_channel(queue_size);
let protected_receiver = Arc::new(Mutex::new(receiver));
@ -54,7 +52,7 @@ pub fn start_server(port: u16, queue_size: usize, threads: u64) {
}
fn handle_connection(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
edges: &RwLock<Arc<EdgeDB>>,
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
let request = read_request(&mut socket)?;
@ -104,29 +102,23 @@ fn handle_connection(
Ok(())
}
fn load_edges_binary(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
file: &String,
) -> Result<usize, Box<dyn Error>> {
fn load_edges_binary(edges: &RwLock<Arc<EdgeDB>>, file: &String) -> Result<usize, Box<dyn Error>> {
let updated_edges = read_edges_binary(file)?;
let len = updated_edges.len();
let len = updated_edges.edge_count();
*edges.write().unwrap() = Arc::new(updated_edges);
Ok(len)
}
fn load_edges_csv(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
file: &String,
) -> Result<usize, Box<dyn Error>> {
fn load_edges_csv(edges: &RwLock<Arc<EdgeDB>>, file: &String) -> Result<usize, Box<dyn Error>> {
let updated_edges = read_edges_csv(file)?;
let len = updated_edges.len();
let len = updated_edges.edge_count();
*edges.write().unwrap() = Arc::new(updated_edges);
Ok(len)
}
fn compute_transfer(
request: JsonRpcRequest,
edges: &HashMap<Address, Vec<Edge>>,
edges: &EdgeDB,
mut socket: TcpStream,
) -> Result<(), Box<dyn Error>> {
socket.write_all(chunked_header().as_bytes())?;
@ -175,7 +167,7 @@ fn compute_transfer(
}
fn update_edges(
edges: &RwLock<Arc<HashMap<Address, Vec<Edge>>>>,
edges: &RwLock<Arc<EdgeDB>>,
updates: Vec<JsonValue>,
) -> Result<usize, Box<dyn Error>> {
let updates = updates
@ -188,27 +180,14 @@ fn update_edges(
})
.collect::<Vec<_>>();
if updates.is_empty() {
return Ok(edges.read().unwrap().len());
return Ok(edges.read().unwrap().edge_count());
}
let mut updating_edges = edges.read().unwrap().as_ref().clone();
for update in updates {
let out_edges = updating_edges.entry(update.from).or_default();
if update.capacity == U256::from(0) {
out_edges.retain(|e| {
!(e.from == update.from && e.to == update.to && e.token == update.token)
});
} else {
match out_edges
.iter_mut()
.find(|e| e.from == update.from && e.to == update.to && e.token == update.token)
{
Some(e) => e.capacity = update.capacity,
_ => out_edges.push(update),
updating_edges.update(update);
}
}
}
let len = updating_edges.len();
let len = updating_edges.edge_count();
*edges.write().unwrap() = Arc::new(updating_edges);
Ok(len)
}

View file

@ -1,7 +1,9 @@
use std::collections::HashMap;
use crate::types::Address;
use crate::types::U256;
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Ord, PartialOrd)]
#[derive(Clone, Copy, Debug, Default, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub struct Edge {
pub from: Address,
pub to: Address,
@ -11,5 +13,79 @@ pub struct Edge {
// TODO comparison, hash, etc. can ignore the capacity field.
// TODO can we derive it?
impl Eq for Edge {}
pub fn eq_up_to_capacity(e1: &Edge, e2: &Edge) -> bool {
e1.from == e2.from && e1.to == e2.to && e1.token == e2.token
}
#[derive(Debug, Default, Clone)]
pub struct EdgeDB {
edges: Vec<Edge>,
outgoing: HashMap<Address, Vec<usize>>,
incoming: HashMap<Address, Vec<usize>>,
}
impl EdgeDB {
pub fn new(edges: Vec<Edge>) -> EdgeDB {
let outgoing = outgoing_index(&edges);
let incoming = incoming_index(&edges);
EdgeDB {
edges,
outgoing,
incoming,
}
}
pub fn edge_count(&self) -> usize {
self.edges.len()
}
pub fn update(&mut self, update: Edge) {
match self.index_of(&update) {
Some(i) => self.edges[i].capacity = update.capacity,
None => {
let i = self.edges.len();
self.outgoing.entry(update.from).or_default().push(i);
self.incoming.entry(update.to).or_default().push(i);
self.edges.push(update);
}
}
}
pub fn outgoing(&self, source: &Address) -> Vec<&Edge> {
match self.outgoing.get(source) {
Some(out) => out
.iter()
.map(|i| self.edges.get(*i).unwrap())
.filter(|e| e.capacity != U256::from(0))
.collect(),
None => vec![],
}
}
fn index_of(&self, e: &Edge) -> Option<usize> {
self.outgoing.get(&e.from).and_then(|out| {
for i in out {
if eq_up_to_capacity(&self.edges[*i], e) {
return Some(*i);
}
}
None
})
}
}
fn outgoing_index(edges: &[Edge]) -> HashMap<Address, Vec<usize>> {
let mut index: HashMap<Address, Vec<usize>> = HashMap::new();
for (i, e) in edges.iter().enumerate() {
index.entry(e.from).or_default().push(i)
}
index
}
fn incoming_index(edges: &[Edge]) -> HashMap<Address, Vec<usize>> {
let mut index: HashMap<Address, Vec<usize>> = HashMap::new();
for (i, e) in edges.iter().enumerate() {
index.entry(e.to).or_default().push(i)
}
index
}

View file

@ -1,5 +1,6 @@
use pathfinder2::graph::compute_flow;
use pathfinder2::io::read_edges_binary;
use pathfinder2::types::edge::EdgeDB;
use pathfinder2::types::{Address, Edge, U256};
use std::collections::HashMap;
use std::process::Command;
@ -51,14 +52,14 @@ fn test_flow_large() {
);
}
fn read_edges() -> HashMap<Address, Vec<Edge>> {
fn read_edges() -> EdgeDB {
read_edges_binary(&"edges.dat".to_string()).unwrap()
}
fn test_flow(
source: &Address,
sink: &Address,
edges: &HashMap<Address, Vec<Edge>>,
edges: &EdgeDB,
requested_flow: U256,
max_distance: Option<u64>,
) {