fix(transfers): sequence mismatch error while listening (#212)

Co-authored-by: Daniel Gushchyan <39884512+dangush@users.noreply.github.com>
This commit is contained in:
Juan Enrique Alcaraz 2024-09-26 14:07:07 +02:00 committed by GitHub
parent 391b7bc84a
commit d536f128a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 186 additions and 116 deletions

View file

@ -28,11 +28,13 @@ use cli::Cli;
use quartz_common::{ use quartz_common::{
contract::state::{Config, LightClientOpts}, contract::state::{Config, LightClientOpts},
enclave::{ enclave::{
attestor::{self, Attestor}, attestor::{self, Attestor, DefaultAttestor},
server::{QuartzServer, WsListenerConfig}, server::{QuartzServer, WsListenerConfig},
}, },
}; };
use transfers_server::TransfersService; use transfers_server::{TransfersOp, TransfersService};
use tokio::sync::mpsc;
use crate::wslistener::WsListener;
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -79,8 +81,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sk = Arc::new(Mutex::new(None)); let sk = Arc::new(Mutex::new(None));
// Event queue
let (tx, mut rx) = mpsc::channel::<TransfersOp<DefaultAttestor>>(1);
// Consumer task: dequeue and process events
tokio::spawn(async move {
while let Some(op) = rx.recv().await {
if let Err(e) = op.client.process(op.event, op.config).await {
println!("Error processing queued event: {}", e);
}
}
});
QuartzServer::new(config.clone(), sk.clone(), attestor.clone(), ws_config) QuartzServer::new(config.clone(), sk.clone(), attestor.clone(), ws_config)
.add_service(TransfersService::new(config, sk, attestor)) .add_service(TransfersService::new(config, sk, attestor, tx))
.serve(args.rpc_addr) .serve(args.rpc_addr)
.await?; .await?;

View file

@ -3,6 +3,7 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use cosmrs::AccountId;
use cosmwasm_std::{Addr, HexBinary, Uint128}; use cosmwasm_std::{Addr, HexBinary, Uint128};
use ecies::{decrypt, encrypt}; use ecies::{decrypt, encrypt};
use k256::ecdsa::{SigningKey, VerifyingKey}; use k256::ecdsa::{SigningKey, VerifyingKey};
@ -13,13 +14,14 @@ use quartz_common::{
}, },
enclave::{ enclave::{
attestor::Attestor, attestor::Attestor,
server::{IntoServer, ProofOfPublication}, server::{IntoServer, ProofOfPublication, WsListenerConfig},
}, },
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use tonic::{Request, Response, Result as TonicResult, Status}; use tonic::{Request, Response, Result as TonicResult, Status};
use transfers_contract::msg::execute::{ClearTextTransferRequestMsg, Request as TransfersRequest}; use transfers_contract::msg::execute::{ClearTextTransferRequestMsg, Request as TransfersRequest};
use tokio::sync::mpsc::Sender;
use crate::{ use crate::{
proto::{ proto::{
@ -39,13 +41,6 @@ impl<A: Attestor> IntoServer for TransfersService<A> {
pub type RawCipherText = HexBinary; pub type RawCipherText = HexBinary;
#[derive(Clone, Debug)]
pub struct TransfersService<A> {
config: Config,
sk: Arc<Mutex<Option<SigningKey>>>,
attestor: A,
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct UpdateRequestMessage { pub struct UpdateRequestMessage {
pub state: HexBinary, pub state: HexBinary,
@ -102,15 +97,43 @@ pub struct StatusResponseMessage {
encrypted_bal: HexBinary, encrypted_bal: HexBinary,
} }
#[derive(Clone, Debug)]
pub enum TransfersOpEvent {
Query {
contract_address: AccountId,
sender: String,
ephemeral_pubkey: String,
},
Transfer {
contract_address: AccountId,
},
}
#[derive(Clone, Debug)]
pub struct TransfersOp<A: Attestor> {
pub client: TransfersService<A>,
pub event: TransfersOpEvent,
pub config: WsListenerConfig
}
#[derive(Clone, Debug)]
pub struct TransfersService<A: Attestor> {
config: Config,
sk: Arc<Mutex<Option<SigningKey>>>,
attestor: A,
pub queue_producer: Sender<TransfersOp<A>>
}
impl<A> TransfersService<A> impl<A> TransfersService<A>
where where
A: Attestor, A: Attestor,
{ {
pub fn new(config: Config, sk: Arc<Mutex<Option<SigningKey>>>, attestor: A) -> Self { pub fn new(config: Config, sk: Arc<Mutex<Option<SigningKey>>>, attestor: A, queue_producer: Sender<TransfersOp<A>>) -> Self {
Self { Self {
config, config,
sk, sk,
attestor, attestor,
queue_producer,
} }
} }
} }

View file

@ -1,7 +1,7 @@
//TODO: get rid of this //TODO: get rid of this
use std::str::FromStr; use std::{collections::BTreeMap, str::FromStr};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Error, Result};
use cosmrs::{tendermint::chain::Id as ChainId, AccountId}; use cosmrs::{tendermint::chain::Id as ChainId, AccountId};
use cosmwasm_std::{Addr, HexBinary}; use cosmwasm_std::{Addr, HexBinary};
use futures_util::StreamExt; use futures_util::StreamExt;
@ -29,99 +29,136 @@ use wasmd_client::{CliWasmdClient, QueryResult, WasmdClient};
use crate::{ use crate::{
proto::{settlement_server::Settlement, QueryRequest, UpdateRequest}, proto::{settlement_server::Settlement, QueryRequest, UpdateRequest},
transfers_server::{QueryRequestMessage, TransfersService, UpdateRequestMessage}, transfers_server::{
QueryRequestMessage, TransfersOp, TransfersOpEvent,
TransfersService, UpdateRequestMessage,
},
}; };
// TODO: Need to prevent listener from taking actions until handshake is completed #[derive(Clone, Debug)]
#[async_trait::async_trait] enum TransfersOpEventTypes {
impl<A: Attestor> WebSocketHandler for TransfersService<A> { Query,
async fn handle(&self, event: Event, config: WsListenerConfig) -> Result<()> { Transfer,
// Validation }
let is_transfer = is_transfer_event(&event);
let is_query = is_query_event(&event);
if !is_transfer && !is_query { impl TryFrom<Event> for TransfersOpEvent {
return Ok(()); type Error = Error;
} else {
let mut sender = None;
let mut contract_address = None;
let mut emphemeral_pubkey = None;
fn try_from(event: Event) -> Result<Self, Error> {
if let Some(events) = &event.events { if let Some(events) = &event.events {
for (key, values) in events { for (key, _) in events {
match key.as_str() { match key.as_str() {
"message.sender" => { k if k.starts_with("wasm-query_balance") => {
sender = values.first().cloned(); let (contract_address, ephemeral_pubkey, sender) =
extract_event_info(TransfersOpEventTypes::Query, &events)
.map_err(|_| anyhow!("Failed to extract event info from query event"))?;
return Ok(TransfersOpEvent::Query {
contract_address,
ephemeral_pubkey: ephemeral_pubkey.ok_or(anyhow!("Missing ephemeral_pubkey"))?,
sender: sender.ok_or(anyhow!("Missing sender"))?,
});
} }
"execute._contract_address" => { k if k.starts_with("wasm-transfer.action") => {
contract_address = values.first().cloned(); let (contract_address, _, _) =
} extract_event_info(TransfersOpEventTypes::Transfer, &events)
"wasm-query_balance.emphemeral_pubkey" => { .map_err(|_| anyhow!("Failed to extract event info from transfer event"))?;
// TODO: fix typo
emphemeral_pubkey = values.first().cloned(); return Ok(TransfersOpEvent::Transfer { contract_address });
} }
_ => {} _ => {}
} }
} }
} }
if contract_address.is_none() { Err(anyhow!("Unsupported event."))
return Ok(());
} }
}
if is_transfer { // TODO: Need to prevent listener from taking actions until handshake is completed
println!("Processing transfer event"); #[async_trait::async_trait]
transfer_handler( impl<A: Attestor + Clone> WebSocketHandler for TransfersService<A> {
self, async fn handle(&self, event: Event, config: WsListenerConfig) -> Result<()> {
&contract_address let op_event = TransfersOpEvent::try_from(event)?;
.expect("must be included in transfers event")
.parse::<AccountId>() self.queue_producer
.map_err(|e| anyhow!(e))?, .send(TransfersOp {
&config, client: self.clone(),
) event: op_event,
config,
})
.await?; .await?;
} else if is_query {
println!("Processing query event");
query_handler(
self,
&contract_address
.expect("must be included in query event")
.parse::<AccountId>()
.map_err(|e| anyhow!(e))?,
sender.expect("must be included in query event"),
emphemeral_pubkey.expect("must be included in query event"),
&config,
)
.await?;
}
}
Ok(()) Ok(())
} }
} }
fn is_transfer_event(event: &Event) -> bool { #[tonic::async_trait]
// Check if the event is a transaction type pub trait WsListener: Send + Sync + 'static {
if let Some(EventType::Tx) = event.event_type() { async fn process(&self, event: TransfersOpEvent, config: WsListenerConfig) -> Result<()>;
// Check for the "wasm.action" key with the value "init_clearing"
if let Some(events) = &event.events {
return events.iter().any(|(key, _)| key == "wasm-transfer.action");
}
}
false
} }
fn is_query_event(event: &Event) -> bool { #[async_trait::async_trait]
// Check if the event is a transaction type impl<A: Attestor> WsListener for TransfersService<A> {
if let Some(EventType::Tx) = event.event_type() { async fn process(&self, event: TransfersOpEvent, config: WsListenerConfig) -> Result<()> {
// Check for the "wasm.action" key with the value "init_clearing" match event {
if let Some(events) = &event.events { TransfersOpEvent::Transfer { contract_address } => {
return events println!("Processing transfer event");
.iter() transfer_handler(self, &contract_address, &config).await?;
.any(|(key, _)| key.starts_with("wasm-query_balance")); }
TransfersOpEvent::Query {
contract_address,
ephemeral_pubkey,
sender,
} => {
println!("Processing query event");
query_handler(self, &contract_address, &sender, &ephemeral_pubkey, &config).await?;
} }
} }
false
let wsurl = format!("ws://{}/websocket", config.node_url);
// Wait some blocks to make sure transaction was confirmed
two_block_waitoor(&wsurl).await?;
Ok(())
}
}
fn extract_event_info(
op_event: TransfersOpEventTypes,
events: &BTreeMap<String, Vec<String>>,
) -> Result<(AccountId, Option<String>, Option<String>)> {
let mut sender = None;
let mut ephemeral_pubkey = None;
// Set common info data for all events
let contract_address = events
.get("execute._contract_address")
.ok_or_else(|| anyhow!("Missing execute._contract_address in events"))?
.first()
.ok_or_else(|| anyhow!("execute._contract_address is empty"))?
.parse::<AccountId>()
.map_err(|e| anyhow!("Failed to parse contract address: {}", e))?;
// Set info for specific events
match op_event {
TransfersOpEventTypes::Query => {
sender = events
.get("message.sender")
.ok_or_else(|| anyhow!("Missing message.sender in events"))?
.first()
.cloned();
ephemeral_pubkey = events
.get("wasm-query_balance.emphemeral_pubkey")
.ok_or_else(|| anyhow!("Missing wasm-query_balance.emphemeral_pubkey in events"))?
.first()
.cloned();
}
_ => {}
}
Ok((contract_address, ephemeral_pubkey, sender))
} }
async fn transfer_handler<A: Attestor>( async fn transfer_handler<A: Attestor>(
@ -133,16 +170,15 @@ async fn transfer_handler<A: Attestor>(
let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?; let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?;
let wasmd_client = CliWasmdClient::new(httpurl.clone()); let wasmd_client = CliWasmdClient::new(httpurl.clone());
// Query chain // Query contract state
// Get epoch, obligations, liquidity sources
let resp: QueryResult<Vec<TransferRequest>> = wasmd_client let resp: QueryResult<Vec<TransferRequest>> = wasmd_client
.query_smart(contract, json!(GetRequests {})) .query_smart(contract, json!(GetRequests {}))
.map_err(|e| anyhow!("Problem querying epoch: {}", e))?; .map_err(|e| anyhow!("Problem querying contract state: {}", e))?;
let requests = resp.data; let requests = resp.data;
let resp: QueryResult<HexBinary> = wasmd_client let resp: QueryResult<HexBinary> = wasmd_client
.query_smart(contract, json!(GetState {})) .query_smart(contract, json!(GetState {}))
.map_err(|e| anyhow!("Problem querying epoch: {}", e))?; .map_err(|e| anyhow!("Problem querying contract state: {}", e))?;
let state = resp.data; let state = resp.data;
// Request body contents // Request body contents
@ -168,7 +204,7 @@ async fn transfer_handler<A: Attestor>(
let proof_output = tokio::task::spawn_blocking(move || { let proof_output = tokio::task::spawn_blocking(move || {
// Create a new runtime inside the blocking thread. // Create a new runtime inside the blocking thread.
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { rt.block_on(async {
prove(prover_config) prove(prover_config)
.await .await
@ -200,7 +236,7 @@ async fn transfer_handler<A: Attestor>(
// Build on-chain response // Build on-chain response
// TODO add non-mock support // TODO add non-mock support
let setoffs_msg = ExecuteMsg::Update::<RawMockAttestation>(AttestedMsg { let transfer_msg = ExecuteMsg::Update::<RawMockAttestation>(AttestedMsg {
msg: RawAttestedMsgSansHandler(attested.msg), msg: RawAttestedMsgSansHandler(attested.msg),
attestation: MockAttestation( attestation: MockAttestation(
attested attested
@ -218,7 +254,7 @@ async fn transfer_handler<A: Attestor>(
chain_id, chain_id,
2000000, 2000000,
&ws_config.tx_sender, &ws_config.tx_sender,
json!(setoffs_msg), json!(transfer_msg),
)?; )?;
println!("Output TX: {}", output); println!("Output TX: {}", output);
@ -228,26 +264,25 @@ async fn transfer_handler<A: Attestor>(
async fn query_handler<A: Attestor>( async fn query_handler<A: Attestor>(
client: &TransfersService<A>, client: &TransfersService<A>,
contract: &AccountId, contract: &AccountId,
msg_sender: String, msg_sender: &String,
pubkey: String, pubkey: &String,
ws_config: &WsListenerConfig, ws_config: &WsListenerConfig,
) -> Result<()> { ) -> Result<()> {
let chain_id = &ChainId::from_str(&ws_config.chain_id)?; let chain_id = &ChainId::from_str(&ws_config.chain_id)?;
let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?; let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?;
let wasmd_client = CliWasmdClient::new(httpurl); let wasmd_client = CliWasmdClient::new(httpurl);
// Query Chain // Query contract state
// Get state
let resp: QueryResult<HexBinary> = wasmd_client let resp: QueryResult<HexBinary> = wasmd_client
.query_smart(contract, json!(GetState {})) .query_smart(contract, json!(GetState {}))
.map_err(|e| anyhow!("Problem querying epoch: {}", e))?; .map_err(|e| anyhow!("Problem querying contract state: {}", e))?;
let state = resp.data; let state = resp.data;
// Build request // Build request
let update_contents = QueryRequestMessage { let update_contents = QueryRequestMessage {
state, state,
address: Addr::unchecked(&msg_sender), // sender comes from TX event, therefore is checked address: Addr::unchecked(msg_sender), // sender comes from TX event, therefore is checked
ephemeral_pubkey: HexBinary::from_hex(&pubkey)?, ephemeral_pubkey: HexBinary::from_hex(pubkey)?,
}; };
// Send QueryRequestMessage to enclave over tonic gRPC client // Send QueryRequestMessage to enclave over tonic gRPC client
@ -268,7 +303,7 @@ async fn query_handler<A: Attestor>(
// Build on-chain response // Build on-chain response
// TODO add non-mock support // TODO add non-mock support
let setoffs_msg = ExecuteMsg::QueryResponse::<RawMockAttestation>(AttestedMsg { let query_msg = ExecuteMsg::QueryResponse::<RawMockAttestation>(AttestedMsg {
msg: RawAttestedMsgSansHandler(attested.msg), msg: RawAttestedMsgSansHandler(attested.msg),
attestation: MockAttestation( attestation: MockAttestation(
attested attested
@ -286,14 +321,14 @@ async fn query_handler<A: Attestor>(
chain_id, chain_id,
2000000, 2000000,
&ws_config.tx_sender, &ws_config.tx_sender,
json!(setoffs_msg), json!(query_msg),
)?; )?;
println!("Output TX: {}", output); println!("Output TX: {}", output);
Ok(()) Ok(())
} }
async fn two_block_waitoor(wsurl: &str) -> Result<(), anyhow::Error> { async fn two_block_waitoor(wsurl: &str) -> Result<(), Error> {
let (client, driver) = WebSocketClient::new(wsurl).await?; let (client, driver) = WebSocketClient::new(wsurl).await?;
let driver_handle = tokio::spawn(async move { driver.run().await }); let driver_handle = tokio::spawn(async move { driver.run().await });

View file

@ -34,9 +34,6 @@ test.describe('Transfers', () => {
.getByRole('button', { name: /cancel/i, includeHidden: false }) .getByRole('button', { name: /cancel/i, includeHidden: false })
.click() .click()
// Check new balance
await page.waitForTimeout(4000)
mainBalance += 20 mainBalance += 20
await test await test
@ -80,8 +77,6 @@ test.describe('Transfers', () => {
.getByRole('button', { name: /cancel/i, includeHidden: false }) .getByRole('button', { name: /cancel/i, includeHidden: false })
.click() .click()
// Check new balance
await page.waitForTimeout(4000)
mainBalance -= 10 mainBalance -= 10
await test await test
.expect(await getBalance({ context, page })) .expect(await getBalance({ context, page }))
@ -124,8 +119,6 @@ test.describe('Transfers', () => {
.getByRole('button', { name: /cancel/i, includeHidden: false }) .getByRole('button', { name: /cancel/i, includeHidden: false })
.click() .click()
// Check new balance
await page.waitForTimeout(4000)
await test.expect(await getBalance({ context, page })).toEqual('$0') await test.expect(await getBalance({ context, page })).toEqual('$0')
}) })
}) })

View file

@ -21,6 +21,12 @@ use serde::Serialize;
use crate::types::Fmspc; use crate::types::Fmspc;
#[cfg(not(feature = "mock-sgx"))]
pub type DefaultAttestor = DcapAttestor;
#[cfg(feature = "mock-sgx")]
pub type DefaultAttestor = MockAttestor;
/// The trait defines the interface for generating attestations from within an enclave. /// The trait defines the interface for generating attestations from within an enclave.
pub trait Attestor: Send + Sync + 'static { pub trait Attestor: Send + Sync + 'static {
type Error: ToString; type Error: ToString;