diff --git a/apps/transfers/enclave/src/main.rs b/apps/transfers/enclave/src/main.rs index 59c0fd6..ccc4f54 100644 --- a/apps/transfers/enclave/src/main.rs +++ b/apps/transfers/enclave/src/main.rs @@ -28,11 +28,13 @@ use cli::Cli; use quartz_common::{ contract::state::{Config, LightClientOpts}, enclave::{ - attestor::{self, Attestor}, + attestor::{self, Attestor, DefaultAttestor}, server::{QuartzServer, WsListenerConfig}, }, }; -use transfers_server::TransfersService; +use transfers_server::{TransfersOp, TransfersService}; +use tokio::sync::mpsc; +use crate::wslistener::WsListener; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { @@ -79,8 +81,19 @@ async fn main() -> Result<(), Box> { let sk = Arc::new(Mutex::new(None)); + // Event queue + let (tx, mut rx) = mpsc::channel::>(1); + // Consumer task: dequeue and process events + tokio::spawn(async move { + while let Some(op) = rx.recv().await { + if let Err(e) = op.client.process(op.event, op.config).await { + println!("Error processing queued event: {}", e); + } + } + }); + QuartzServer::new(config.clone(), sk.clone(), attestor.clone(), ws_config) - .add_service(TransfersService::new(config, sk, attestor)) + .add_service(TransfersService::new(config, sk, attestor, tx)) .serve(args.rpc_addr) .await?; diff --git a/apps/transfers/enclave/src/transfers_server.rs b/apps/transfers/enclave/src/transfers_server.rs index d960b64..47a944e 100644 --- a/apps/transfers/enclave/src/transfers_server.rs +++ b/apps/transfers/enclave/src/transfers_server.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; +use cosmrs::AccountId; use cosmwasm_std::{Addr, HexBinary, Uint128}; use ecies::{decrypt, encrypt}; use k256::ecdsa::{SigningKey, VerifyingKey}; @@ -13,13 +14,14 @@ use quartz_common::{ }, enclave::{ attestor::Attestor, - server::{IntoServer, ProofOfPublication}, + server::{IntoServer, ProofOfPublication, WsListenerConfig}, }, }; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use tonic::{Request, Response, Result as TonicResult, Status}; use transfers_contract::msg::execute::{ClearTextTransferRequestMsg, Request as TransfersRequest}; +use tokio::sync::mpsc::Sender; use crate::{ proto::{ @@ -39,13 +41,6 @@ impl IntoServer for TransfersService { pub type RawCipherText = HexBinary; -#[derive(Clone, Debug)] -pub struct TransfersService { - config: Config, - sk: Arc>>, - attestor: A, -} - #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct UpdateRequestMessage { pub state: HexBinary, @@ -102,15 +97,43 @@ pub struct StatusResponseMessage { encrypted_bal: HexBinary, } +#[derive(Clone, Debug)] +pub enum TransfersOpEvent { + Query { + contract_address: AccountId, + sender: String, + ephemeral_pubkey: String, + }, + Transfer { + contract_address: AccountId, + }, +} + +#[derive(Clone, Debug)] +pub struct TransfersOp { + pub client: TransfersService, + pub event: TransfersOpEvent, + pub config: WsListenerConfig +} + +#[derive(Clone, Debug)] +pub struct TransfersService { + config: Config, + sk: Arc>>, + attestor: A, + pub queue_producer: Sender> +} + impl TransfersService where A: Attestor, { - pub fn new(config: Config, sk: Arc>>, attestor: A) -> Self { + pub fn new(config: Config, sk: Arc>>, attestor: A, queue_producer: Sender>) -> Self { Self { config, sk, attestor, + queue_producer, } } } diff --git a/apps/transfers/enclave/src/wslistener.rs b/apps/transfers/enclave/src/wslistener.rs index f4246ef..5fa9026 100644 --- a/apps/transfers/enclave/src/wslistener.rs +++ b/apps/transfers/enclave/src/wslistener.rs @@ -1,7 +1,7 @@ //TODO: get rid of this -use std::str::FromStr; +use std::{collections::BTreeMap, str::FromStr}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Error, Result}; use cosmrs::{tendermint::chain::Id as ChainId, AccountId}; use cosmwasm_std::{Addr, HexBinary}; use futures_util::StreamExt; @@ -29,99 +29,136 @@ use wasmd_client::{CliWasmdClient, QueryResult, WasmdClient}; use crate::{ proto::{settlement_server::Settlement, QueryRequest, UpdateRequest}, - transfers_server::{QueryRequestMessage, TransfersService, UpdateRequestMessage}, + transfers_server::{ + QueryRequestMessage, TransfersOp, TransfersOpEvent, + TransfersService, UpdateRequestMessage, + }, }; +#[derive(Clone, Debug)] +enum TransfersOpEventTypes { + Query, + Transfer, +} + +impl TryFrom for TransfersOpEvent { + type Error = Error; + + fn try_from(event: Event) -> Result { + if let Some(events) = &event.events { + for (key, _) in events { + match key.as_str() { + k if k.starts_with("wasm-query_balance") => { + let (contract_address, ephemeral_pubkey, sender) = + extract_event_info(TransfersOpEventTypes::Query, &events) + .map_err(|_| anyhow!("Failed to extract event info from query event"))?; + + return Ok(TransfersOpEvent::Query { + contract_address, + ephemeral_pubkey: ephemeral_pubkey.ok_or(anyhow!("Missing ephemeral_pubkey"))?, + sender: sender.ok_or(anyhow!("Missing sender"))?, + }); + } + k if k.starts_with("wasm-transfer.action") => { + let (contract_address, _, _) = + extract_event_info(TransfersOpEventTypes::Transfer, &events) + .map_err(|_| anyhow!("Failed to extract event info from transfer event"))?; + + return Ok(TransfersOpEvent::Transfer { contract_address }); + } + _ => {} + } + } + } + + Err(anyhow!("Unsupported event.")) + } +} + // TODO: Need to prevent listener from taking actions until handshake is completed #[async_trait::async_trait] -impl WebSocketHandler for TransfersService { +impl WebSocketHandler for TransfersService { async fn handle(&self, event: Event, config: WsListenerConfig) -> Result<()> { - // Validation - let is_transfer = is_transfer_event(&event); - let is_query = is_query_event(&event); + let op_event = TransfersOpEvent::try_from(event)?; - if !is_transfer && !is_query { - return Ok(()); - } else { - let mut sender = None; - let mut contract_address = None; - let mut emphemeral_pubkey = None; - - if let Some(events) = &event.events { - for (key, values) in events { - match key.as_str() { - "message.sender" => { - sender = values.first().cloned(); - } - "execute._contract_address" => { - contract_address = values.first().cloned(); - } - "wasm-query_balance.emphemeral_pubkey" => { - // TODO: fix typo - emphemeral_pubkey = values.first().cloned(); - } - _ => {} - } - } - } - - if contract_address.is_none() { - return Ok(()); - } - - if is_transfer { - println!("Processing transfer event"); - transfer_handler( - self, - &contract_address - .expect("must be included in transfers event") - .parse::() - .map_err(|e| anyhow!(e))?, - &config, - ) - .await?; - } else if is_query { - println!("Processing query event"); - query_handler( - self, - &contract_address - .expect("must be included in query event") - .parse::() - .map_err(|e| anyhow!(e))?, - sender.expect("must be included in query event"), - emphemeral_pubkey.expect("must be included in query event"), - &config, - ) - .await?; - } - } + self.queue_producer + .send(TransfersOp { + client: self.clone(), + event: op_event, + config, + }) + .await?; Ok(()) } } -fn is_transfer_event(event: &Event) -> bool { - // Check if the event is a transaction type - if let Some(EventType::Tx) = event.event_type() { - // Check for the "wasm.action" key with the value "init_clearing" - if let Some(events) = &event.events { - return events.iter().any(|(key, _)| key == "wasm-transfer.action"); - } - } - false +#[tonic::async_trait] +pub trait WsListener: Send + Sync + 'static { + async fn process(&self, event: TransfersOpEvent, config: WsListenerConfig) -> Result<()>; } -fn is_query_event(event: &Event) -> bool { - // Check if the event is a transaction type - if let Some(EventType::Tx) = event.event_type() { - // Check for the "wasm.action" key with the value "init_clearing" - if let Some(events) = &event.events { - return events - .iter() - .any(|(key, _)| key.starts_with("wasm-query_balance")); +#[async_trait::async_trait] +impl WsListener for TransfersService { + async fn process(&self, event: TransfersOpEvent, config: WsListenerConfig) -> Result<()> { + match event { + TransfersOpEvent::Transfer { contract_address } => { + println!("Processing transfer event"); + transfer_handler(self, &contract_address, &config).await?; + } + TransfersOpEvent::Query { + contract_address, + ephemeral_pubkey, + sender, + } => { + println!("Processing query event"); + query_handler(self, &contract_address, &sender, &ephemeral_pubkey, &config).await?; + } } + + let wsurl = format!("ws://{}/websocket", config.node_url); + // Wait some blocks to make sure transaction was confirmed + two_block_waitoor(&wsurl).await?; + + Ok(()) } - false +} + +fn extract_event_info( + op_event: TransfersOpEventTypes, + events: &BTreeMap>, +) -> Result<(AccountId, Option, Option)> { + let mut sender = None; + let mut ephemeral_pubkey = None; + + // Set common info data for all events + let contract_address = events + .get("execute._contract_address") + .ok_or_else(|| anyhow!("Missing execute._contract_address in events"))? + .first() + .ok_or_else(|| anyhow!("execute._contract_address is empty"))? + .parse::() + .map_err(|e| anyhow!("Failed to parse contract address: {}", e))?; + + // Set info for specific events + match op_event { + TransfersOpEventTypes::Query => { + sender = events + .get("message.sender") + .ok_or_else(|| anyhow!("Missing message.sender in events"))? + .first() + .cloned(); + + ephemeral_pubkey = events + .get("wasm-query_balance.emphemeral_pubkey") + .ok_or_else(|| anyhow!("Missing wasm-query_balance.emphemeral_pubkey in events"))? + .first() + .cloned(); + } + _ => {} + } + + Ok((contract_address, ephemeral_pubkey, sender)) } async fn transfer_handler( @@ -133,16 +170,15 @@ async fn transfer_handler( let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?; let wasmd_client = CliWasmdClient::new(httpurl.clone()); - // Query chain - // Get epoch, obligations, liquidity sources + // Query contract state let resp: QueryResult> = wasmd_client .query_smart(contract, json!(GetRequests {})) - .map_err(|e| anyhow!("Problem querying epoch: {}", e))?; + .map_err(|e| anyhow!("Problem querying contract state: {}", e))?; let requests = resp.data; let resp: QueryResult = wasmd_client .query_smart(contract, json!(GetState {})) - .map_err(|e| anyhow!("Problem querying epoch: {}", e))?; + .map_err(|e| anyhow!("Problem querying contract state: {}", e))?; let state = resp.data; // Request body contents @@ -168,7 +204,7 @@ async fn transfer_handler( let proof_output = tokio::task::spawn_blocking(move || { // Create a new runtime inside the blocking thread. - let rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new()?; rt.block_on(async { prove(prover_config) .await @@ -200,7 +236,7 @@ async fn transfer_handler( // Build on-chain response // TODO add non-mock support - let setoffs_msg = ExecuteMsg::Update::(AttestedMsg { + let transfer_msg = ExecuteMsg::Update::(AttestedMsg { msg: RawAttestedMsgSansHandler(attested.msg), attestation: MockAttestation( attested @@ -218,7 +254,7 @@ async fn transfer_handler( chain_id, 2000000, &ws_config.tx_sender, - json!(setoffs_msg), + json!(transfer_msg), )?; println!("Output TX: {}", output); @@ -228,26 +264,25 @@ async fn transfer_handler( async fn query_handler( client: &TransfersService, contract: &AccountId, - msg_sender: String, - pubkey: String, + msg_sender: &String, + pubkey: &String, ws_config: &WsListenerConfig, ) -> Result<()> { let chain_id = &ChainId::from_str(&ws_config.chain_id)?; let httpurl = Url::parse(&format!("http://{}", ws_config.node_url))?; let wasmd_client = CliWasmdClient::new(httpurl); - // Query Chain - // Get state + // Query contract state let resp: QueryResult = wasmd_client .query_smart(contract, json!(GetState {})) - .map_err(|e| anyhow!("Problem querying epoch: {}", e))?; + .map_err(|e| anyhow!("Problem querying contract state: {}", e))?; let state = resp.data; // Build request let update_contents = QueryRequestMessage { state, - address: Addr::unchecked(&msg_sender), // sender comes from TX event, therefore is checked - ephemeral_pubkey: HexBinary::from_hex(&pubkey)?, + address: Addr::unchecked(msg_sender), // sender comes from TX event, therefore is checked + ephemeral_pubkey: HexBinary::from_hex(pubkey)?, }; // Send QueryRequestMessage to enclave over tonic gRPC client @@ -268,7 +303,7 @@ async fn query_handler( // Build on-chain response // TODO add non-mock support - let setoffs_msg = ExecuteMsg::QueryResponse::(AttestedMsg { + let query_msg = ExecuteMsg::QueryResponse::(AttestedMsg { msg: RawAttestedMsgSansHandler(attested.msg), attestation: MockAttestation( attested @@ -286,14 +321,14 @@ async fn query_handler( chain_id, 2000000, &ws_config.tx_sender, - json!(setoffs_msg), + json!(query_msg), )?; println!("Output TX: {}", output); Ok(()) } -async fn two_block_waitoor(wsurl: &str) -> Result<(), anyhow::Error> { +async fn two_block_waitoor(wsurl: &str) -> Result<(), Error> { let (client, driver) = WebSocketClient::new(wsurl).await?; let driver_handle = tokio::spawn(async move { driver.run().await }); diff --git a/apps/transfers/frontend/tests/e2e/transfers.spec.ts b/apps/transfers/frontend/tests/e2e/transfers.spec.ts index d603c12..76fdd58 100644 --- a/apps/transfers/frontend/tests/e2e/transfers.spec.ts +++ b/apps/transfers/frontend/tests/e2e/transfers.spec.ts @@ -34,9 +34,6 @@ test.describe('Transfers', () => { .getByRole('button', { name: /cancel/i, includeHidden: false }) .click() - // Check new balance - await page.waitForTimeout(4000) - mainBalance += 20 await test @@ -80,8 +77,6 @@ test.describe('Transfers', () => { .getByRole('button', { name: /cancel/i, includeHidden: false }) .click() - // Check new balance - await page.waitForTimeout(4000) mainBalance -= 10 await test .expect(await getBalance({ context, page })) @@ -124,8 +119,6 @@ test.describe('Transfers', () => { .getByRole('button', { name: /cancel/i, includeHidden: false }) .click() - // Check new balance - await page.waitForTimeout(4000) await test.expect(await getBalance({ context, page })).toEqual('$0') }) }) diff --git a/core/quartz/src/attestor.rs b/core/quartz/src/attestor.rs index e26627f..2c63567 100644 --- a/core/quartz/src/attestor.rs +++ b/core/quartz/src/attestor.rs @@ -21,6 +21,12 @@ use serde::Serialize; use crate::types::Fmspc; +#[cfg(not(feature = "mock-sgx"))] +pub type DefaultAttestor = DcapAttestor; + +#[cfg(feature = "mock-sgx")] +pub type DefaultAttestor = MockAttestor; + /// The trait defines the interface for generating attestations from within an enclave. pub trait Attestor: Send + Sync + 'static { type Error: ToString;