Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
correct PR remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
musitdev committed Feb 16, 2024
1 parent a9a4d87 commit 3a1f086
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 214 deletions.
4 changes: 2 additions & 2 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::file::FileData;
use gevulot_node::types::transaction::{ProgramMetadata, TxCreate};
use gevulot_node::types::transaction::{Created, ProgramMetadata};
use gevulot_node::types::Hash;
use gevulot_node::{
rpc_client::RpcClient,
Expand Down Expand Up @@ -203,7 +203,7 @@ pub async fn run_deploy_command(
))
}

async fn send_transaction(client: &RpcClient, tx: &Transaction<TxCreate>) -> Result<Hash, String> {
async fn send_transaction(client: &RpcClient, tx: &Transaction<Created>) -> Result<Hash, String> {
client
.send_transaction(tx)
.await
Expand Down
1 change: 0 additions & 1 deletion crates/node/migrations/20231009111925_tasks-table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ CREATE TABLE file (
task_id uuid NOT NULL,
name VARCHAR(256) NOT NULL,
url VARCHAR(2048) NOT NULL,
checksum VARCHAR(64) NOT NULL,
CONSTRAINT fk_task
FOREIGN KEY (task_id)
REFERENCES task (id) ON DELETE CASCADE
Expand Down
13 changes: 1 addition & 12 deletions crates/node/migrations/20231128120351_transactions-table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,4 @@ CREATE TABLE proof_key (
CONSTRAINT fk_transaction
FOREIGN KEY (tx)
REFERENCES transaction (hash) ON DELETE CASCADE
);


CREATE TABLE txfile (
tx_id VARCHAR(64) NOT NULL,
name VARCHAR(256) NOT NULL,
url VARCHAR(2048) NOT NULL,
checksum VARCHAR(64) NOT NULL,
CONSTRAINT fk_tx
FOREIGN KEY (tx_id)
REFERENCES transaction (hash) ON DELETE CASCADE
);
);
16 changes: 16 additions & 0 deletions crates/node/migrations/20240215174342_add_txfile_and_checksum.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Add checksum to task files
-- Add a new table to store Tx files.

ALTER TABLE file ADD checksum VARCHAR(64) NOT NULL;

DROP TABLE IF EXISTS txfile;

CREATE TABLE txfile (
tx_id VARCHAR(64) NOT NULL,
name VARCHAR(256) NOT NULL,
url VARCHAR(2048) NOT NULL,
checksum VARCHAR(64) NOT NULL,
CONSTRAINT fk_tx
FOREIGN KEY (tx_id)
REFERENCES transaction (hash) ON DELETE CASCADE
);
2 changes: 1 addition & 1 deletion crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub struct P2PBeaconConfig {

#[arg(
long,
long_help = "Port open to download transaction data between nodes. Use P2P interface to bind.",
long_help = "HTTP port for downloading transaction data between nodes. Uses same interface as P2P listen address.",
env = "GEVULOT_HTTP_PORT",
default_value = "9995"
)]
Expand Down
22 changes: 11 additions & 11 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::{Mutex as TMutex, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Server;
use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan, EnvFilter};
use types::{transaction::TxValdiated, Hash, Transaction};
use types::{transaction::Validated, Hash, Transaction};
use workflow::WorkflowEngine;

mod cli;
Expand Down Expand Up @@ -135,11 +135,11 @@ fn generate_node_key(opts: NodeKeyOptions) -> Result<()> {

#[async_trait]
impl mempool::Storage for storage::Database {
async fn get(&self, hash: &Hash) -> Result<Option<Transaction<TxValdiated>>> {
async fn get(&self, hash: &Hash) -> Result<Option<Transaction<Validated>>> {
self.find_transaction(hash).await
}

async fn set(&self, tx: &Transaction<TxValdiated>) -> Result<()> {
async fn set(&self, tx: &Transaction<Validated>) -> Result<()> {
let tx_hash = tx.hash;
self.add_transaction(tx).await?;
self.add_asset(&tx_hash).await?;
Expand All @@ -148,7 +148,7 @@ impl mempool::Storage for storage::Database {

async fn fill_deque(
&self,
deque: &mut std::collections::VecDeque<Transaction<TxValdiated>>,
deque: &mut std::collections::VecDeque<Transaction<Validated>>,
) -> Result<()> {
for t in self.get_unexecuted_transactions().await? {
deque.push_back(t);
Expand All @@ -160,7 +160,7 @@ impl mempool::Storage for storage::Database {

#[async_trait]
impl workflow::TransactionStore for storage::Database {
async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction<TxValdiated>>> {
async fn find_transaction(&self, tx_hash: &Hash) -> Result<Option<Transaction<Validated>>> {
self.find_transaction(tx_hash).await
}
}
Expand All @@ -173,8 +173,8 @@ async fn run(config: Arc<Config>) -> Result<()> {

let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));

//start Tx process event loop
let (txevent_loop_jh, tx_sender, p2p_stream) = txvalidation::start_event_loop(
// Start Tx process event loop.
let (txevent_loop_jh, tx_sender, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
Expand Down Expand Up @@ -294,13 +294,13 @@ async fn p2p_beacon(config: P2PBeaconConfig) -> Result<()> {
let http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();

//build empty channel for P2P interface Transaction management.
//Indicate some domain conflict issue.
//P2P network should be started (peer domain) without Tx management (Node domain)
// Build an empty channel for P2P interface's `Transaction` management.
// Indicate some domain conflict issue.
// P2P network should be started (peer domain) without Tx management (Node domain).
let (tx, mut rcv_tx_event_rx) = mpsc::unbounded_channel();
tokio::spawn(async move { while rcv_tx_event_rx.recv().await.is_some() {} });

let (_, p2p_recv) = mpsc::unbounded_channel::<Transaction<TxValdiated>>();
let (_, p2p_recv) = mpsc::unbounded_channel::<Transaction<Validated>>();
let p2p_stream = UnboundedReceiverStream::new(p2p_recv);

let p2p = Arc::new(
Expand Down
16 changes: 8 additions & 8 deletions crates/node/src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::types::{transaction::TxValdiated, Hash, Transaction};
use crate::types::{transaction::Validated, Hash, Transaction};
use async_trait::async_trait;
use eyre::Result;
use std::collections::VecDeque;
Expand All @@ -7,9 +7,9 @@ use thiserror::Error;

#[async_trait]
pub trait Storage: Send + Sync {
async fn get(&self, hash: &Hash) -> Result<Option<Transaction<TxValdiated>>>;
async fn set(&self, tx: &Transaction<TxValdiated>) -> Result<()>;
async fn fill_deque(&self, deque: &mut VecDeque<Transaction<TxValdiated>>) -> Result<()>;
async fn get(&self, hash: &Hash) -> Result<Option<Transaction<Validated>>>;
async fn set(&self, tx: &Transaction<Validated>) -> Result<()>;
async fn fill_deque(&self, deque: &mut VecDeque<Transaction<Validated>>) -> Result<()>;
}

#[allow(clippy::enum_variant_names)]
Expand All @@ -22,7 +22,7 @@ pub enum MempoolError {
#[derive(Clone)]
pub struct Mempool {
storage: Arc<dyn Storage>,
deque: VecDeque<Transaction<TxValdiated>>,
deque: VecDeque<Transaction<Validated>>,
}

impl Mempool {
Expand All @@ -33,16 +33,16 @@ impl Mempool {
Ok(Self { storage, deque })
}

pub fn next(&mut self) -> Option<Transaction<TxValdiated>> {
pub fn next(&mut self) -> Option<Transaction<Validated>> {
// TODO(tuommaki): Should storage reflect the POP in state?
self.deque.pop_front()
}

pub fn peek(&self) -> Option<&Transaction<TxValdiated>> {
pub fn peek(&self) -> Option<&Transaction<Validated>> {
self.deque.front()
}

pub async fn add(&mut self, tx: Transaction<TxValdiated>) -> Result<()> {
pub async fn add(&mut self, tx: Transaction<Validated>) -> Result<()> {
self.storage.set(&tx).await?;
self.deque.push_back(tx);
Ok(())
Expand Down
40 changes: 20 additions & 20 deletions crates/node/src/networking/p2p/pea2pea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio_stream::StreamExt;
use super::{noise, protocol};
use bytes::{Bytes, BytesMut};
use gevulot_node::types::{
transaction::{TxCreate, TxValdiated},
transaction::{Created, Validated},
Transaction,
};
use parking_lot::RwLock;
Expand All @@ -36,14 +36,14 @@ pub struct P2P {
// This mapping is needed for proper cleanup on OnDisconnect.
peer_addr_mapping: Arc<tokio::sync::RwLock<HashMap<SocketAddr, SocketAddr>>>,
peer_list: Arc<tokio::sync::RwLock<BTreeSet<SocketAddr>>>,
//contains corrected peers use for asset file download.
// Contains corrected peers that are used for asset file download.
pub peer_http_port_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,

http_port: Option<u16>,
nat_listen_addr: Option<SocketAddr>,
psk: Vec<u8>,

//send Tx to the process loop
// Send Tx to the process loop.
tx_sender: TxEventSender<P2pSender>,
}

Expand All @@ -63,7 +63,7 @@ impl P2P {
nat_listen_addr: Option<SocketAddr>,
peer_http_port_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
tx_sender: TxEventSender<P2pSender>,
propagate_tx_stream: impl Stream<Item = Transaction<TxValdiated>> + std::marker::Send + 'static,
propagate_tx_stream: impl Stream<Item = Transaction<Validated>> + std::marker::Send + 'static,
) -> Self {
let config = Config {
name: Some(name.into()),
Expand All @@ -82,7 +82,6 @@ impl P2P {
let instance = Self {
node,
noise_states: Default::default(),
//tx_handler: Arc::new(tokio::sync::RwLock::new(Arc::new(BlackholeTxHandler {}))),
psk: psk.to_vec(),
peer_list: Default::default(),
peer_addr_mapping: Default::default(),
Expand All @@ -98,7 +97,7 @@ impl P2P {
instance.enable_writing().await;
instance.enable_disconnect().await;

//start new Tx stream loop
// Start a new Tx stream loop.
tokio::spawn({
let p2p = instance.clone();
async move {
Expand Down Expand Up @@ -127,7 +126,7 @@ impl P2P {
instance
}

async fn recv_tx(&self, tx: Transaction<TxCreate>) {
async fn forward_tx(&self, tx: Transaction<Created>) {
tracing::debug!("submitting received tx to tx_handler");
if let Err(err) = self.tx_sender.send_tx(tx) {
tracing::error!("P2P error during received Tx sending:{err}");
Expand Down Expand Up @@ -375,17 +374,17 @@ impl Reading for P2P {
tx.hash,
hex::encode(tx.author.serialize())
);
let tx: Transaction<TxCreate> = Transaction {
let tx: Transaction<Created> = Transaction {
author: tx.author,
hash: tx.hash,
payload: tx.payload,
nonce: tx.nonce,
signature: tx.signature,
propagated: tx.propagated,
executed: tx.executed,
state: TxCreate,
state: Created,
};
self.recv_tx(tx).await;
self.forward_tx(tx).await;
}
protocol::MessageV0::DiagnosticsRequest(kind) => {
tracing::debug!("received diagnostics request");
Expand Down Expand Up @@ -431,7 +430,7 @@ mod tests {
use crate::txvalidation::CallbackSender;
use crate::txvalidation::EventProcessError;
use eyre::Result;
use gevulot_node::types::transaction::TxReceive;
use gevulot_node::types::transaction::Received;
use gevulot_node::types::{transaction::Payload, Transaction};
use libsecp256k1::SecretKey;
use rand::{rngs::StdRng, SeedableRng};
Expand All @@ -447,18 +446,18 @@ mod tests {
name: &str,
) -> (
P2P,
UnboundedSender<Transaction<TxValdiated>>,
UnboundedSender<Transaction<Validated>>,
UnboundedReceiver<(
Transaction<TxReceive>,
Transaction<Received>,
Option<oneshot::Sender<Result<(), EventProcessError>>>,
)>,
) {
let http_peer_list1: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();
let (tx_sender, p2p_recv1) = mpsc::unbounded_channel::<Transaction<TxValdiated>>();
let (tx_sender, p2p_recv1) = mpsc::unbounded_channel::<Transaction<Validated>>();
let p2p_stream1 = UnboundedReceiverStream::new(p2p_recv1);
let (sendtx1, txreceiver1) =
mpsc::unbounded_channel::<(Transaction<TxReceive>, Option<CallbackSender>)>();
mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>();
let txsender1 = txvalidation::TxEventSender::<txvalidation::P2pSender>::build(sendtx1);
let peer = P2P::new(
name,
Expand All @@ -474,7 +473,8 @@ mod tests {
(peer, tx_sender, txreceiver1)
}

fn into_receive(tx: Transaction<TxValdiated>) -> Transaction<TxReceive> {
//TODO change by impl From when module declaration between main and lib are solved.
fn into_receive(tx: Transaction<Validated>) -> Transaction<Received> {
Transaction {
author: tx.author,
hash: tx.hash,
Expand All @@ -483,7 +483,7 @@ mod tests {
signature: tx.signature,
propagated: tx.executed,
executed: tx.executed,
state: TxReceive::P2P,
state: Received::P2P,
}
}

Expand Down Expand Up @@ -632,10 +632,10 @@ mod tests {
assert_eq!(into_receive(tx), recv_tx.0);
}

fn new_tx() -> Transaction<TxValdiated> {
fn new_tx() -> Transaction<Validated> {
let rng = &mut StdRng::from_entropy();

let tx = Transaction::<TxCreate>::new(Payload::Empty, &SecretKey::random(rng));
let tx = Transaction::<Created>::new(Payload::Empty, &SecretKey::random(rng));

Transaction {
author: tx.author,
Expand All @@ -645,7 +645,7 @@ mod tests {
signature: tx.signature,
propagated: tx.executed,
executed: tx.executed,
state: TxValdiated,
state: Validated,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/networking/p2p/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) enum Message {
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) enum MessageV0 {
Transaction(types::Transaction<types::transaction::TxValdiated>),
Transaction(types::Transaction<types::transaction::Validated>),
DiagnosticsRequest(DiagnosticsRequestKind),
DiagnosticsResponse(DiagnosticsResponseV0),
}
Expand Down
8 changes: 4 additions & 4 deletions crates/node/src/rpc_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use jsonrpsee::{

use crate::types::{
rpc::RpcResponse,
transaction::{TransactionTree, TxCreate, TxValdiated},
transaction::{Created, TransactionTree, Validated},
Hash, Transaction,
};

Expand All @@ -26,13 +26,13 @@ impl RpcClient {
pub async fn get_transaction(
&self,
tx_hash: &Hash,
) -> Result<Option<Transaction<TxValdiated>>, Box<dyn Error>> {
) -> Result<Option<Transaction<Validated>>, Box<dyn Error>> {
let mut params = ArrayParams::new();
params.insert(tx_hash).expect("rpc params");

let resp = self
.client
.request::<RpcResponse<Transaction<TxValdiated>>, ArrayParams>("getTransaction", params)
.request::<RpcResponse<Transaction<Validated>>, ArrayParams>("getTransaction", params)
.await
.expect("rpc request");

Expand All @@ -42,7 +42,7 @@ impl RpcClient {
}
}

pub async fn send_transaction(&self, tx: &Transaction<TxCreate>) -> Result<(), Box<dyn Error>> {
pub async fn send_transaction(&self, tx: &Transaction<Created>) -> Result<(), Box<dyn Error>> {
let mut params = ArrayParams::new();
params.insert(tx).expect("rpc params");

Expand Down
Loading

0 comments on commit 3a1f086

Please sign in to comment.