diff --git a/Cargo.lock b/Cargo.lock index ec6a04651..ca95eab13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,6 +299,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" + [[package]] name = "bincode" version = "1.3.3" @@ -1463,7 +1469,7 @@ dependencies = [ "proc-macro2", "quote", "syn", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -1544,6 +1550,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kaspa-p2p" +version = "0.1.0" +dependencies = [ + "ctrlc", + "futures", + "h2", + "kaspa-core", + "lockfree", + "log", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "uuid 1.2.2", +] + [[package]] name = "kaspa-utils" version = "0.1.0" @@ -1753,6 +1777,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.17" @@ -2013,6 +2046,12 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + [[package]] name = "parking" version = "2.0.0" @@ -2442,6 +2481,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rocksdb" version = "0.19.0" @@ -2531,6 +2585,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -2558,6 +2633,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secp256k1" version = "0.24.1" @@ -2675,6 +2760,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simpa" version = "0.1.0" @@ -2841,6 +2935,12 @@ dependencies = [ "syn", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "strsim" version = "0.10.0" @@ -2988,6 +3088,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "winapi", @@ -3014,6 +3115,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-stream" version = "0.1.11" @@ -3071,7 +3183,9 @@ dependencies = [ "pin-project", "prost", "prost-derive", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tokio-util", "tower", @@ -3227,6 +3341,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "uuid" version = "0.8.2" @@ -3236,6 +3356,16 @@ dependencies = [ "getrandom 0.2.8", ] +[[package]] +name = "uuid" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +dependencies = [ + "getrandom 0.2.8", + "rand 0.8.5", +] + [[package]] name = "value-bag" version = "1.0.0-alpha.9" @@ -3375,6 +3505,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "wepoll-ffi" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 4a815ca27..2c902f178 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "rpc/core", "rpc/grpc", "mining", + "p2p", ] [workspace.package] diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml new file mode 100644 index 000000000..516213744 --- /dev/null +++ b/p2p/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "kaspa-p2p" +version.workspace = true +edition.workspace = true +authors.workspace = true +include.workspace = true +license.workspace = true + +[lib] +name = "kaspa_p2p_lib" +path = "./src/lib.rs" + +[[bin]] +name = "kaspa_p2p_client" +path = "./src/bin/p2p_client.rs" + +[[bin]] +name = "kaspa_p2p_server" +path = "./src/bin/p2p_server.rs" + +[dependencies] +# project internal deps +kaspa-core.workspace = true +log.workspace = true +# project external deps +futures = { version = "0.3", default-features = false, features = ["alloc"] } +prost = "0.11" +ctrlc = "3.2" +tokio = { version = "1.21.2", features = [ "rt-multi-thread", "macros", "signal" ]} +tokio-stream = { version = "0.1.11", features = ["net"] } +tonic = { version = "0.8.2", features = ["tls", "gzip"] } +h2 = "0.3" +lockfree = "0.5.1" +uuid = { version = "1.2.2", features = ["v4","fast-rng"] } + +[build-dependencies] +tonic-build = { version = "0.8.2", features = 
["prost"] } diff --git a/p2p/build.rs b/p2p/build.rs new file mode 100644 index 000000000..3c1051aeb --- /dev/null +++ b/p2p/build.rs @@ -0,0 +1,14 @@ +fn main() { + let iface_files = &["messages.proto", "p2p.proto", "rpc.proto"]; + let dirs = &["./proto"]; + + tonic_build::configure() + .build_server(true) + .build_client(true) + .compile(iface_files, dirs) + .unwrap_or_else(|e| panic!("protobuf compilation failed, error: {e}")); + // recompile protobufs only if any of the proto files changes. + for file in iface_files { + println!("cargo:rerun-if-changed={file}"); + } +} diff --git a/p2p/proto/messages.proto b/p2p/proto/messages.proto new file mode 100644 index 000000000..50be89138 --- /dev/null +++ b/p2p/proto/messages.proto @@ -0,0 +1,151 @@ +syntax = "proto3"; +package protowire; + +option go_package = "github.com/kaspanet/kaspad/protowire"; + +import "p2p.proto"; +import "rpc.proto"; + +message KaspadMessage { + oneof payload { + AddressesMessage addresses = 1; + BlockMessage block = 2; + TransactionMessage transaction = 3; + BlockLocatorMessage blockLocator = 5; + RequestAddressesMessage requestAddresses = 6; + RequestRelayBlocksMessage requestRelayBlocks = 10; + RequestTransactionsMessage requestTransactions = 12; + BlockMessage ibdBlock = 13; + InvRelayBlockMessage invRelayBlock = 14; + InvTransactionsMessage invTransactions = 15; + PingMessage ping = 16; + PongMessage pong = 17; + VerackMessage verack = 19; + VersionMessage version = 20; + TransactionNotFoundMessage transactionNotFound = 21; + RejectMessage reject = 22; + PruningPointUtxoSetChunkMessage pruningPointUtxoSetChunk = 25; + RequestIBDBlocksMessage requestIBDBlocks = 26; + UnexpectedPruningPointMessage unexpectedPruningPoint = 27; + IbdBlockLocatorMessage ibdBlockLocator = 30; + IbdBlockLocatorHighestHashMessage ibdBlockLocatorHighestHash = 31; + RequestNextPruningPointUtxoSetChunkMessage requestNextPruningPointUtxoSetChunk = 33; + DonePruningPointUtxoSetChunksMessage donePruningPointUtxoSetChunks = 34; + IbdBlockLocatorHighestHashNotFoundMessage ibdBlockLocatorHighestHashNotFound = 35; + BlockWithTrustedDataMessage blockWithTrustedData = 36; + DoneBlocksWithTrustedDataMessage doneBlocksWithTrustedData = 37; + RequestPruningPointAndItsAnticoneMessage requestPruningPointAndItsAnticone = 40; + BlockHeadersMessage blockHeaders = 41; + RequestNextHeadersMessage requestNextHeaders = 42; + DoneHeadersMessage DoneHeaders = 43; + RequestPruningPointUTXOSetMessage requestPruningPointUTXOSet = 44; + RequestHeadersMessage requestHeaders = 45; + RequestBlockLocatorMessage requestBlockLocator = 46; + PruningPointsMessage pruningPoints = 47; + RequestPruningPointProofMessage requestPruningPointProof = 48; + PruningPointProofMessage pruningPointProof = 49; + ReadyMessage ready = 50; + BlockWithTrustedDataV4Message blockWithTrustedDataV4 = 51; + TrustedDataMessage trustedData = 52; + RequestIBDChainBlockLocatorMessage requestIBDChainBlockLocator = 53; + IbdChainBlockLocatorMessage ibdChainBlockLocator = 54; + RequestAnticoneMessage requestAnticone = 55; + RequestNextPruningPointAndItsAnticoneBlocksMessage requestNextPruningPointAndItsAnticoneBlocks = 56; + + GetCurrentNetworkRequestMessage getCurrentNetworkRequest = 1001; + GetCurrentNetworkResponseMessage getCurrentNetworkResponse = 1002; + SubmitBlockRequestMessage submitBlockRequest = 1003; + SubmitBlockResponseMessage submitBlockResponse = 1004; + GetBlockTemplateRequestMessage getBlockTemplateRequest = 1005; + GetBlockTemplateResponseMessage getBlockTemplateResponse = 1006; + 
NotifyBlockAddedRequestMessage notifyBlockAddedRequest = 1007; + NotifyBlockAddedResponseMessage notifyBlockAddedResponse = 1008; + BlockAddedNotificationMessage blockAddedNotification = 1009; + GetPeerAddressesRequestMessage getPeerAddressesRequest = 1010; + GetPeerAddressesResponseMessage getPeerAddressesResponse = 1011; + GetSelectedTipHashRequestMessage getSelectedTipHashRequest = 1012; + GetSelectedTipHashResponseMessage getSelectedTipHashResponse = 1013; + GetMempoolEntryRequestMessage getMempoolEntryRequest = 1014; + GetMempoolEntryResponseMessage getMempoolEntryResponse = 1015; + GetConnectedPeerInfoRequestMessage getConnectedPeerInfoRequest = 1016; + GetConnectedPeerInfoResponseMessage getConnectedPeerInfoResponse = 1017; + AddPeerRequestMessage addPeerRequest = 1018; + AddPeerResponseMessage addPeerResponse = 1019; + SubmitTransactionRequestMessage submitTransactionRequest = 1020; + SubmitTransactionResponseMessage submitTransactionResponse = 1021; + NotifyVirtualSelectedParentChainChangedRequestMessage notifyVirtualSelectedParentChainChangedRequest = 1022; + NotifyVirtualSelectedParentChainChangedResponseMessage notifyVirtualSelectedParentChainChangedResponse = 1023; + VirtualSelectedParentChainChangedNotificationMessage virtualSelectedParentChainChangedNotification = 1024; + GetBlockRequestMessage getBlockRequest = 1025; + GetBlockResponseMessage getBlockResponse = 1026; + GetSubnetworkRequestMessage getSubnetworkRequest = 1027; + GetSubnetworkResponseMessage getSubnetworkResponse = 1028; + GetVirtualSelectedParentChainFromBlockRequestMessage getVirtualSelectedParentChainFromBlockRequest = 1029; + GetVirtualSelectedParentChainFromBlockResponseMessage getVirtualSelectedParentChainFromBlockResponse = 1030; + GetBlocksRequestMessage getBlocksRequest = 1031; + GetBlocksResponseMessage getBlocksResponse = 1032; + GetBlockCountRequestMessage getBlockCountRequest = 1033; + GetBlockCountResponseMessage getBlockCountResponse = 1034; + GetBlockDagInfoRequestMessage getBlockDagInfoRequest = 1035; + GetBlockDagInfoResponseMessage getBlockDagInfoResponse = 1036; + ResolveFinalityConflictRequestMessage resolveFinalityConflictRequest = 1037; + ResolveFinalityConflictResponseMessage resolveFinalityConflictResponse = 1038; + NotifyFinalityConflictsRequestMessage notifyFinalityConflictsRequest = 1039; + NotifyFinalityConflictsResponseMessage notifyFinalityConflictsResponse = 1040; + FinalityConflictNotificationMessage finalityConflictNotification = 1041; + FinalityConflictResolvedNotificationMessage finalityConflictResolvedNotification = 1042; + GetMempoolEntriesRequestMessage getMempoolEntriesRequest = 1043; + GetMempoolEntriesResponseMessage getMempoolEntriesResponse = 1044; + ShutDownRequestMessage shutDownRequest = 1045; + ShutDownResponseMessage shutDownResponse = 1046; + GetHeadersRequestMessage getHeadersRequest = 1047; + GetHeadersResponseMessage getHeadersResponse = 1048; + NotifyUtxosChangedRequestMessage notifyUtxosChangedRequest = 1049; + NotifyUtxosChangedResponseMessage notifyUtxosChangedResponse = 1050; + UtxosChangedNotificationMessage utxosChangedNotification = 1051; + GetUtxosByAddressesRequestMessage getUtxosByAddressesRequest = 1052; + GetUtxosByAddressesResponseMessage getUtxosByAddressesResponse = 1053; + GetVirtualSelectedParentBlueScoreRequestMessage getVirtualSelectedParentBlueScoreRequest = 1054; + GetVirtualSelectedParentBlueScoreResponseMessage getVirtualSelectedParentBlueScoreResponse = 1055; + NotifyVirtualSelectedParentBlueScoreChangedRequestMessage 
notifyVirtualSelectedParentBlueScoreChangedRequest = 1056; + NotifyVirtualSelectedParentBlueScoreChangedResponseMessage notifyVirtualSelectedParentBlueScoreChangedResponse = 1057; + VirtualSelectedParentBlueScoreChangedNotificationMessage virtualSelectedParentBlueScoreChangedNotification = 1058; + BanRequestMessage banRequest = 1059; + BanResponseMessage banResponse = 1060; + UnbanRequestMessage unbanRequest = 1061; + UnbanResponseMessage unbanResponse = 1062; + GetInfoRequestMessage getInfoRequest = 1063; + GetInfoResponseMessage getInfoResponse = 1064; + StopNotifyingUtxosChangedRequestMessage stopNotifyingUtxosChangedRequest = 1065; + StopNotifyingUtxosChangedResponseMessage stopNotifyingUtxosChangedResponse = 1066; + NotifyPruningPointUTXOSetOverrideRequestMessage notifyPruningPointUTXOSetOverrideRequest = 1067; + NotifyPruningPointUTXOSetOverrideResponseMessage notifyPruningPointUTXOSetOverrideResponse = 1068; + PruningPointUTXOSetOverrideNotificationMessage pruningPointUTXOSetOverrideNotification = 1069; + StopNotifyingPruningPointUTXOSetOverrideRequestMessage stopNotifyingPruningPointUTXOSetOverrideRequest = 1070; + StopNotifyingPruningPointUTXOSetOverrideResponseMessage stopNotifyingPruningPointUTXOSetOverrideResponse = 1071; + EstimateNetworkHashesPerSecondRequestMessage estimateNetworkHashesPerSecondRequest = 1072; + EstimateNetworkHashesPerSecondResponseMessage estimateNetworkHashesPerSecondResponse = 1073; + NotifyVirtualDaaScoreChangedRequestMessage notifyVirtualDaaScoreChangedRequest = 1074; + NotifyVirtualDaaScoreChangedResponseMessage notifyVirtualDaaScoreChangedResponse = 1075; + VirtualDaaScoreChangedNotificationMessage virtualDaaScoreChangedNotification = 1076; + GetBalanceByAddressRequestMessage getBalanceByAddressRequest = 1077; + GetBalanceByAddressResponseMessage getBalanceByAddressResponse = 1078; + GetBalancesByAddressesRequestMessage getBalancesByAddressesRequest = 1079; + GetBalancesByAddressesResponseMessage getBalancesByAddressesResponse = 1080; + NotifyNewBlockTemplateRequestMessage notifyNewBlockTemplateRequest = 1081; + NotifyNewBlockTemplateResponseMessage notifyNewBlockTemplateResponse = 1082; + NewBlockTemplateNotificationMessage newBlockTemplateNotification = 1083; + GetMempoolEntriesByAddressesRequestMessage getMempoolEntriesByAddressesRequest = 1084; + GetMempoolEntriesByAddressesResponseMessage getMempoolEntriesByAddressesResponse = 1085; + GetCoinSupplyRequestMessage getCoinSupplyRequest = 1086; + GetCoinSupplyResponseMessage getCoinSupplyResponse= 1087; + } +} + +service P2P { + rpc MessageStream (stream KaspadMessage) returns (stream KaspadMessage) {} +} + +service RPC { + rpc MessageStream (stream KaspadMessage) returns (stream KaspadMessage) {} +} diff --git a/p2p/proto/p2p.proto b/p2p/proto/p2p.proto new file mode 100644 index 000000000..51b0351db --- /dev/null +++ b/p2p/proto/p2p.proto @@ -0,0 +1,290 @@ +syntax = "proto3"; +package protowire; + +option go_package = "github.com/kaspanet/kaspad/protowire"; + +message RequestAddressesMessage{ + bool includeAllSubnetworks = 1; + SubnetworkId subnetworkId = 2; +} + +message AddressesMessage{ + repeated NetAddress addressList = 1; +} + +message NetAddress{ + int64 timestamp = 1; + bytes ip = 3; + uint32 port = 4; +} + +message SubnetworkId{ + bytes bytes = 1; +} + +message TransactionMessage{ + uint32 version = 1; + repeated TransactionInput inputs = 2; + repeated TransactionOutput outputs = 3; + uint64 lockTime = 4; + SubnetworkId subnetworkId = 5; + uint64 gas = 6; + bytes payload = 8; +} + 
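
(An aside on how these wire definitions surface in Rust: tonic-build compiles each proto message into a plain prost struct inside the crate's generated `pb` module, and the `KaspadMessage.payload` oneof becomes a Rust enum, `pb::kaspad_message::Payload`. A minimal sketch follows — the `Verack` construction is exactly what the crate's binaries do; the `Ping` variant and its `nonce` field follow prost's standard naming and are shown here as an assumption:)

```rust
use kaspa_p2p_lib::pb;

// Wrap a payload-less handshake message, as done in p2p_client.rs.
fn verack() -> pb::KaspadMessage {
    pb::KaspadMessage { payload: Some(pb::kaspad_message::Payload::Verack(pb::VerackMessage {})) }
}

// Scalar proto fields (`uint64 nonce = 1;`) become plain Rust fields on the generated struct.
fn ping(nonce: u64) -> pb::KaspadMessage {
    pb::KaspadMessage { payload: Some(pb::kaspad_message::Payload::Ping(pb::PingMessage { nonce })) }
}
```

Note that the P2P and RPC services declared in messages.proto expose the same bidirectional `MessageStream` rpc, so a single `KaspadMessage` type flows in both directions.
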
+message TransactionInput{ + Outpoint previousOutpoint = 1; + bytes signatureScript = 2; + uint64 sequence = 3; + uint32 sigOpCount = 4; +} + +message Outpoint{ + TransactionId transactionId = 1; + uint32 index = 2; +} + +message TransactionId{ + bytes bytes = 1; +} +message ScriptPublicKey { + bytes script = 1; + uint32 version = 2; +} + +message TransactionOutput{ + uint64 value = 1; + ScriptPublicKey scriptPublicKey = 2; +} + +message BlockMessage{ + BlockHeader header = 1; + repeated TransactionMessage transactions = 2; +} + +message BlockHeader{ + uint32 version = 1; + repeated BlockLevelParents parents = 12; + Hash hashMerkleRoot = 3; + Hash acceptedIdMerkleRoot = 4; + Hash utxoCommitment = 5; + int64 timestamp = 6; + uint32 bits = 7; + uint64 nonce = 8; + uint64 daaScore = 9; + bytes blueWork = 10; + Hash pruningPoint = 14; + uint64 blueScore = 13; +} + +message BlockLevelParents { + repeated Hash parentHashes = 1; +} + +message Hash{ + bytes bytes = 1; +} + +message RequestBlockLocatorMessage{ + Hash highHash = 1; + uint32 limit = 2; +} + +message BlockLocatorMessage{ + repeated Hash hashes = 1; +} + +message RequestHeadersMessage{ + Hash lowHash = 1; + Hash highHash = 2; +} + +message RequestNextHeadersMessage{ +} + +message DoneHeadersMessage{ +} + +message RequestRelayBlocksMessage{ + repeated Hash hashes = 1; +} + +message RequestTransactionsMessage { + repeated TransactionId ids = 1; +} + +message TransactionNotFoundMessage{ + TransactionId id = 1; +} + +message InvRelayBlockMessage{ + Hash hash = 1; +} + +message InvTransactionsMessage{ + repeated TransactionId ids = 1; +} + +message PingMessage{ + uint64 nonce = 1; +} + +message PongMessage{ + uint64 nonce = 1; +} + +message VerackMessage{ +} + +message VersionMessage{ + uint32 protocolVersion = 1; + uint64 services = 2; + int64 timestamp = 3; + NetAddress address = 4; + bytes id = 5; + string userAgent = 6; + bool disableRelayTx = 8; + SubnetworkId subnetworkId = 9; + string network = 10; +} + +message RejectMessage{ + string reason = 1; +} + +message RequestPruningPointUTXOSetMessage{ + Hash pruningPointHash = 1; +} + +message PruningPointUtxoSetChunkMessage{ + repeated OutpointAndUtxoEntryPair outpointAndUtxoEntryPairs = 1; +} + +message OutpointAndUtxoEntryPair{ + Outpoint outpoint = 1; + UtxoEntry utxoEntry = 2; +} + +message UtxoEntry { + uint64 amount = 1; + ScriptPublicKey scriptPublicKey = 2; + uint64 blockDaaScore = 3; + bool isCoinbase = 4; +} + +message RequestNextPruningPointUtxoSetChunkMessage { +} + +message DonePruningPointUtxoSetChunksMessage { +} + +message RequestIBDBlocksMessage{ + repeated Hash hashes = 1; +} + +message UnexpectedPruningPointMessage{ +} + +message IbdBlockLocatorMessage { + Hash targetHash = 1; + repeated Hash blockLocatorHashes = 2; +} + +message RequestIBDChainBlockLocatorMessage{ + Hash lowHash = 1; + Hash highHash = 2; +} + +message IbdChainBlockLocatorMessage { + repeated Hash blockLocatorHashes = 1; +} + +message RequestAnticoneMessage{ + Hash blockHash = 1; + Hash contextHash = 2; +} + +message IbdBlockLocatorHighestHashMessage { + Hash highestHash = 1; +} + +message IbdBlockLocatorHighestHashNotFoundMessage { +} + +message BlockHeadersMessage { + repeated BlockHeader blockHeaders = 1; +} + +message RequestPruningPointAndItsAnticoneMessage { +} + +message RequestNextPruningPointAndItsAnticoneBlocksMessage{ +} + +message BlockWithTrustedDataMessage { + BlockMessage block = 1; + uint64 daaScore = 2; + repeated DaaBlock daaWindow = 3; + repeated BlockGhostdagDataHashPair ghostdagData 
= 4; +} + +message DaaBlock { + BlockMessage block = 3; + GhostdagData ghostdagData = 2; +} + +message DaaBlockV4 { + BlockHeader header = 1; + GhostdagData ghostdagData = 2; +} + +message BlockGhostdagDataHashPair { + Hash hash = 1; + GhostdagData ghostdagData = 2; +} + +message GhostdagData { + uint64 blueScore = 1; + bytes blueWork = 2; + Hash selectedParent = 3; + repeated Hash mergeSetBlues = 4; + repeated Hash mergeSetReds = 5; + repeated BluesAnticoneSizes bluesAnticoneSizes = 6; +} + +message BluesAnticoneSizes { + Hash blueHash = 1; + uint32 anticoneSize = 2; +} + +message DoneBlocksWithTrustedDataMessage { +} + +message PruningPointsMessage { + repeated BlockHeader headers = 1; +} + +message RequestPruningPointProofMessage { +} + +message PruningPointProofMessage { + repeated PruningPointProofHeaderArray headers = 1; +} + +message PruningPointProofHeaderArray { + repeated BlockHeader headers = 1; +} + +message ReadyMessage { +} + +message BlockWithTrustedDataV4Message { + BlockMessage block = 1; + repeated uint64 daaWindowIndices = 2; + repeated uint64 ghostdagDataIndices = 3; +} + +message TrustedDataMessage { + repeated DaaBlockV4 daaWindow = 1; + repeated BlockGhostdagDataHashPair ghostdagData = 2; +} diff --git a/p2p/proto/rpc.proto b/p2p/proto/rpc.proto new file mode 100644 index 000000000..81919c954 --- /dev/null +++ b/p2p/proto/rpc.proto @@ -0,0 +1,723 @@ +// RPC-related types. Request messages, response messages, and dependant types. +// +// Clients are expected to build RequestMessages and wrap them in KaspadMessage. (see messages.proto) +// +// Having received a RequestMessage, (wrapped in a KaspadMessage) the RPC server will respond with a +// ResponseMessage (likewise wrapped in a KaspadMessage) respective to the original RequestMessage. +// +// **IMPORTANT:** This API is a work in progress and is subject to break between versions. +// +syntax = "proto3"; +package protowire; + +option go_package = "github.com/kaspanet/kaspad/protowire"; + +// RPCError represents a generic non-internal error. +// +// Receivers of any ResponseMessage are expected to check whether its error field is not null. 
+message RPCError{ + string message = 1; +} + +message RpcBlock { + RpcBlockHeader header = 1; + repeated RpcTransaction transactions = 2; + RpcBlockVerboseData verboseData = 3; +} + +message RpcBlockHeader { + uint32 version = 1; + repeated RpcBlockLevelParents parents = 12; + string hashMerkleRoot = 3; + string acceptedIdMerkleRoot = 4; + string utxoCommitment = 5; + int64 timestamp = 6; + uint32 bits = 7; + uint64 nonce = 8; + uint64 daaScore = 9; + string blueWork = 10; + string pruningPoint = 14; + uint64 blueScore = 13; +} + +message RpcBlockLevelParents { + repeated string parentHashes = 1; +} + +message RpcBlockVerboseData{ + string hash = 1; + double difficulty = 11; + string selectedParentHash = 13; + repeated string transactionIds = 14; + bool isHeaderOnly = 15; + uint64 blueScore = 16; + repeated string childrenHashes = 17; + repeated string mergeSetBluesHashes = 18; + repeated string mergeSetRedsHashes = 19; + bool isChainBlock = 20; +} + +message RpcTransaction { + uint32 version = 1; + repeated RpcTransactionInput inputs = 2; + repeated RpcTransactionOutput outputs = 3; + uint64 lockTime = 4; + string subnetworkId = 5; + uint64 gas = 6; + string payload = 8; + RpcTransactionVerboseData verboseData = 9; +} + +message RpcTransactionInput { + RpcOutpoint previousOutpoint = 1; + string signatureScript = 2; + uint64 sequence = 3; + uint32 sigOpCount = 5; + RpcTransactionInputVerboseData verboseData = 4; +} + +message RpcScriptPublicKey { + uint32 version = 1; + string scriptPublicKey = 2; +} + +message RpcTransactionOutput { + uint64 amount = 1; + RpcScriptPublicKey scriptPublicKey = 2; + RpcTransactionOutputVerboseData verboseData = 3; +} + +message RpcOutpoint { + string transactionId = 1; + uint32 index = 2; +} + +message RpcUtxoEntry { + uint64 amount = 1; + RpcScriptPublicKey scriptPublicKey = 2; + uint64 blockDaaScore = 3; + bool isCoinbase = 4; +} + +message RpcTransactionVerboseData{ + string transactionId = 1; + string hash = 2; + uint64 mass = 4; + string blockHash = 12; + uint64 blockTime = 14; +} + +message RpcTransactionInputVerboseData{ +} + +message RpcTransactionOutputVerboseData{ + string scriptPublicKeyType = 5; + string scriptPublicKeyAddress = 6; +} + +// GetCurrentNetworkRequestMessage requests the network kaspad is currently running against. +// +// Possible networks are: Mainnet, Testnet, Simnet, Devnet +message GetCurrentNetworkRequestMessage{ +} + +message GetCurrentNetworkResponseMessage{ + string currentNetwork = 1; + RPCError error = 1000; +} + +// SubmitBlockRequestMessage requests to submit a block into the DAG. +// Blocks are generally expected to have been generated using the getBlockTemplate call. +// +// See: GetBlockTemplateRequestMessage +message SubmitBlockRequestMessage{ + RpcBlock block = 2; + bool allowNonDAABlocks = 3; +} + +message SubmitBlockResponseMessage{ + enum RejectReason { + NONE = 0; + BLOCK_INVALID = 1; + IS_IN_IBD = 2; + } + RejectReason rejectReason = 1; + RPCError error = 1000; +} + +// GetBlockTemplateRequestMessage requests a current block template. +// Callers are expected to solve the block template and submit it using the submitBlock call +// +// See: SubmitBlockRequestMessage +message GetBlockTemplateRequestMessage{ + // Which kaspa address should the coinbase block reward transaction pay into + string payAddress = 1; + string extraData = 2; +} + +message GetBlockTemplateResponseMessage{ + RpcBlock block = 3; + + // Whether kaspad thinks that it's synced. 
+ // Callers are discouraged (but not forbidden) from solving blocks when kaspad is not synced. + // That is because when kaspad isn't in sync with the rest of the network there's a high + // chance the block will never be accepted, thus the solving effort would have been wasted. + bool isSynced = 2; + + RPCError error = 1000; +} + +// NotifyBlockAddedRequestMessage registers this connection for blockAdded notifications. +// +// See: BlockAddedNotificationMessage +message NotifyBlockAddedRequestMessage{ +} + +message NotifyBlockAddedResponseMessage{ + RPCError error = 1000; +} + +// BlockAddedNotificationMessage is sent whenever a blocks has been added (NOT accepted) +// into the DAG. +// +// See: NotifyBlockAddedRequestMessage +message BlockAddedNotificationMessage{ + RpcBlock block = 3; +} + +// GetPeerAddressesRequestMessage requests the list of known kaspad addresses in the +// current network. (mainnet, testnet, etc.) +message GetPeerAddressesRequestMessage{ +} + +message GetPeerAddressesResponseMessage{ + repeated GetPeerAddressesKnownAddressMessage addresses = 1; + repeated GetPeerAddressesKnownAddressMessage bannedAddresses = 2; + RPCError error = 1000; +} + +message GetPeerAddressesKnownAddressMessage { + string Addr = 1; +} + +// GetSelectedTipHashRequestMessage requests the hash of the current virtual's +// selected parent. +message GetSelectedTipHashRequestMessage{ +} + +message GetSelectedTipHashResponseMessage{ + string selectedTipHash = 1; + RPCError error = 1000; +} + +// GetMempoolEntryRequestMessage requests information about a specific transaction +// in the mempool. +message GetMempoolEntryRequestMessage{ + // The transaction's TransactionID. + string txId = 1; + bool includeOrphanPool = 2; + bool filterTransactionPool = 3; +} + +message GetMempoolEntryResponseMessage{ + MempoolEntry entry = 1; + + RPCError error = 1000; +} + +// GetMempoolEntriesRequestMessage requests information about all the transactions +// currently in the mempool. +message GetMempoolEntriesRequestMessage{ + bool includeOrphanPool = 1; + bool filterTransactionPool = 2; +} + +message GetMempoolEntriesResponseMessage{ + repeated MempoolEntry entries = 1; + + RPCError error = 1000; +} + +message MempoolEntry{ + uint64 fee = 1; + RpcTransaction transaction = 3; + bool isOrphan = 4; +} + +// GetConnectedPeerInfoRequestMessage requests information about all the p2p peers +// currently connected to this kaspad. +message GetConnectedPeerInfoRequestMessage{ +} + +message GetConnectedPeerInfoResponseMessage{ + repeated GetConnectedPeerInfoMessage infos = 1; + RPCError error = 1000; +} + +message GetConnectedPeerInfoMessage{ + string id = 1; + string address = 2; + + // How long did the last ping/pong exchange take + int64 lastPingDuration = 3; + + // Whether this kaspad initiated the connection + bool isOutbound = 6; + int64 timeOffset = 7; + string userAgent = 8; + + // The protocol version that this peer claims to support + uint32 advertisedProtocolVersion = 9; + + // The timestamp of when this peer connected to this kaspad + int64 timeConnected = 10; + + // Whether this peer is the IBD peer (if IBD is running) + bool isIbdPeer = 11; +} + +// AddPeerRequestMessage adds a peer to kaspad's outgoing connection list. +// This will, in most cases, result in kaspad connecting to said peer. 
+message AddPeerRequestMessage{ + string address = 1; + + // Whether to keep attempting to connect to this peer after disconnection + bool isPermanent = 2; +} + +message AddPeerResponseMessage{ + RPCError error = 1000; +} + +// SubmitTransactionRequestMessage submits a transaction to the mempool +message SubmitTransactionRequestMessage{ + RpcTransaction transaction = 1; + bool allowOrphan = 2; +} + +message SubmitTransactionResponseMessage{ + // The transaction ID of the submitted transaction + string transactionId = 1; + + RPCError error = 1000; +} + +// NotifyVirtualSelectedParentChainChangedRequestMessage registers this connection for virtualSelectedParentChainChanged notifications. +// +// See: VirtualSelectedParentChainChangedNotificationMessage +message NotifyVirtualSelectedParentChainChangedRequestMessage{ + bool includeAcceptedTransactionIds = 1; +} + +message NotifyVirtualSelectedParentChainChangedResponseMessage{ + RPCError error = 1000; +} + +// VirtualSelectedParentChainChangedNotificationMessage is sent whenever the DAG's selected parent +// chain had changed. +// +// See: NotifyVirtualSelectedParentChainChangedRequestMessage +message VirtualSelectedParentChainChangedNotificationMessage{ + // The chain blocks that were removed, in high-to-low order + repeated string removedChainBlockHashes = 1; + + // The chain blocks that were added, in low-to-high order + repeated string addedChainBlockHashes = 3; + + // Will be filled only if `includeAcceptedTransactionIds = true` in the notify request. + repeated AcceptedTransactionIds acceptedTransactionIds = 2; +} + +// GetBlockRequestMessage requests information about a specific block +message GetBlockRequestMessage{ + // The hash of the requested block + string hash = 1; + + // Whether to include transaction data in the response + bool includeTransactions = 3; +} + +message GetBlockResponseMessage{ + RpcBlock block = 3; + RPCError error = 1000; +} + +// GetSubnetworkRequestMessage requests information about a specific subnetwork +// +// Currently unimplemented +message GetSubnetworkRequestMessage{ + string subnetworkId = 1; +} + +message GetSubnetworkResponseMessage{ + uint64 gasLimit = 1; + RPCError error = 1000; +} + +// GetVirtualSelectedParentChainFromBlockRequestMessage requests the virtual selected +// parent chain from some startHash to this kaspad's current virtual +message GetVirtualSelectedParentChainFromBlockRequestMessage{ + string startHash = 1; + bool includeAcceptedTransactionIds = 2; +} + +message AcceptedTransactionIds{ + string acceptingBlockHash = 1; + repeated string acceptedTransactionIds = 2; +} + +message GetVirtualSelectedParentChainFromBlockResponseMessage{ + // The chain blocks that were removed, in high-to-low order + repeated string removedChainBlockHashes = 1; + + // The chain blocks that were added, in low-to-high order + repeated string addedChainBlockHashes = 3; + + // The transactions accepted by each block in addedChainBlockHashes. + // Will be filled only if `includeAcceptedTransactionIds = true` in the request. + repeated AcceptedTransactionIds acceptedTransactionIds = 2; + + RPCError error = 1000; +} + +// GetBlocksRequestMessage requests blocks between a certain block lowHash up to this +// kaspad's current virtual. 
+message GetBlocksRequestMessage{ + string lowHash = 1; + bool includeBlocks = 2; + bool includeTransactions = 3; +} + +message GetBlocksResponseMessage{ + repeated string blockHashes = 4; + repeated RpcBlock blocks = 3; + RPCError error = 1000; +} + +// GetBlockCountRequestMessage requests the current number of blocks in this kaspad. +// Note that this number may decrease as pruning occurs. +message GetBlockCountRequestMessage{ +} + +message GetBlockCountResponseMessage{ + uint64 blockCount = 1; + uint64 headerCount = 2; + RPCError error = 1000; +} + +// GetBlockDagInfoRequestMessage requests general information about the current state +// of this kaspad's DAG. +message GetBlockDagInfoRequestMessage{ +} + +message GetBlockDagInfoResponseMessage{ + string networkName = 1; + uint64 blockCount = 2; + uint64 headerCount = 3; + repeated string tipHashes = 4; + double difficulty = 5; + int64 pastMedianTime = 6; + repeated string virtualParentHashes = 7; + string pruningPointHash = 8; + uint64 virtualDaaScore = 9; + RPCError error = 1000; +} + +message ResolveFinalityConflictRequestMessage{ + string finalityBlockHash = 1; +} + +message ResolveFinalityConflictResponseMessage{ + RPCError error = 1000; +} + +message NotifyFinalityConflictsRequestMessage{ +} + +message NotifyFinalityConflictsResponseMessage{ + RPCError error = 1000; +} + +message FinalityConflictNotificationMessage{ + string violatingBlockHash = 1; +} + +message FinalityConflictResolvedNotificationMessage{ + string finalityBlockHash = 1; +} + +// ShutDownRequestMessage shuts down this kaspad. +message ShutDownRequestMessage{ +} + +message ShutDownResponseMessage{ + RPCError error = 1000; +} + +// GetHeadersRequestMessage requests headers between the given startHash and the +// current virtual, up to the given limit. +message GetHeadersRequestMessage{ + string startHash = 1; + uint64 limit = 2; + bool isAscending = 3; +} + +message GetHeadersResponseMessage{ + repeated string headers = 1; + RPCError error = 1000; +} + +// NotifyUtxosChangedRequestMessage registers this connection for utxoChanged notifications +// for the given addresses. +// +// This call is only available when this kaspad was started with `--utxoindex` +// +// See: UtxosChangedNotificationMessage +message NotifyUtxosChangedRequestMessage { + repeated string addresses = 1; // Leave empty to get all updates +} + +message NotifyUtxosChangedResponseMessage { + RPCError error = 1000; +} + +// UtxosChangedNotificationMessage is sent whenever the UTXO index had been updated. +// +// See: NotifyUtxosChangedRequestMessage +message UtxosChangedNotificationMessage { + repeated UtxosByAddressesEntry added = 1; + repeated UtxosByAddressesEntry removed = 2; +} + +message UtxosByAddressesEntry { + string address = 1; + RpcOutpoint outpoint = 2; + RpcUtxoEntry utxoEntry = 3; +} + +// StopNotifyingUtxosChangedRequestMessage unregisters this connection for utxoChanged notifications +// for the given addresses. 
+// +// This call is only available when this kaspad was started with `--utxoindex` +// +// See: UtxosChangedNotificationMessage +message StopNotifyingUtxosChangedRequestMessage { + repeated string addresses = 1; +} + +message StopNotifyingUtxosChangedResponseMessage { + RPCError error = 1000; +} + +// GetUtxosByAddressesRequestMessage requests all current UTXOs for the given kaspad addresses +// +// This call is only available when this kaspad was started with `--utxoindex` +message GetUtxosByAddressesRequestMessage { + repeated string addresses = 1; +} + +message GetUtxosByAddressesResponseMessage { + repeated UtxosByAddressesEntry entries = 1; + + RPCError error = 1000; +} + +// GetBalanceByAddressRequest returns the total balance in unspent transactions towards a given address +// +// This call is only available when this kaspad was started with `--utxoindex` +message GetBalanceByAddressRequestMessage { + string address = 1; +} + +message GetBalanceByAddressResponseMessage { + uint64 balance = 1; + + RPCError error = 1000; +} + +message GetBalancesByAddressesRequestMessage { + repeated string addresses = 1; +} + +message BalancesByAddressEntry{ + string address = 1; + uint64 balance = 2; + + RPCError error = 1000; +} + +message GetBalancesByAddressesResponseMessage { + repeated BalancesByAddressEntry entries = 1; + + RPCError error = 1000; +} + +// GetVirtualSelectedParentBlueScoreRequestMessage requests the blue score of the current selected parent +// of the virtual block. +message GetVirtualSelectedParentBlueScoreRequestMessage { +} + +message GetVirtualSelectedParentBlueScoreResponseMessage { + uint64 blueScore = 1; + + RPCError error = 1000; +} + +// NotifyVirtualSelectedParentBlueScoreChangedRequestMessage registers this connection for +// virtualSelectedParentBlueScoreChanged notifications. +// +// See: VirtualSelectedParentBlueScoreChangedNotificationMessage +message NotifyVirtualSelectedParentBlueScoreChangedRequestMessage { +} + +message NotifyVirtualSelectedParentBlueScoreChangedResponseMessage { + RPCError error = 1000; +} + +// VirtualSelectedParentBlueScoreChangedNotificationMessage is sent whenever the blue score +// of the virtual's selected parent changes. +// +// See NotifyVirtualSelectedParentBlueScoreChangedRequestMessage +message VirtualSelectedParentBlueScoreChangedNotificationMessage { + uint64 virtualSelectedParentBlueScore = 1; +} + +// NotifyVirtualDaaScoreChangedRequestMessage registers this connection for +// virtualDaaScoreChanged notifications. +// +// See: VirtualDaaScoreChangedNotificationMessage +message NotifyVirtualDaaScoreChangedRequestMessage { +} + +message NotifyVirtualDaaScoreChangedResponseMessage { + RPCError error = 1000; +} + +// VirtualDaaScoreChangedNotificationMessage is sent whenever the DAA score +// of the virtual changes. +// +// See NotifyVirtualDaaScoreChangedRequestMessage +message VirtualDaaScoreChangedNotificationMessage { + uint64 virtualDaaScore = 1; +} + +// NotifyPruningPointUTXOSetOverrideRequestMessage registers this connection for +// pruning point UTXO set override notifications. +// +// This call is only available when this kaspad was started with `--utxoindex` +// +// See: NotifyPruningPointUTXOSetOverrideResponseMessage +message NotifyPruningPointUTXOSetOverrideRequestMessage { +} + + +message NotifyPruningPointUTXOSetOverrideResponseMessage { + RPCError error = 1000; +} + +// PruningPointUTXOSetOverrideNotificationMessage is sent whenever the UTXO index +// resets due to pruning point change via IBD. 
+// +// See NotifyPruningPointUTXOSetOverrideRequestMessage +message PruningPointUTXOSetOverrideNotificationMessage { +} + +// StopNotifyingPruningPointUTXOSetOverrideRequestMessage unregisters this connection for +// pruning point UTXO set override notifications. +// +// This call is only available when this kaspad was started with `--utxoindex` +// +// See: PruningPointUTXOSetOverrideNotificationMessage +message StopNotifyingPruningPointUTXOSetOverrideRequestMessage { +} + +message StopNotifyingPruningPointUTXOSetOverrideResponseMessage { + RPCError error = 1000; +} + +// BanRequestMessage bans the given ip. +message BanRequestMessage{ + string ip = 1; +} + +message BanResponseMessage{ + RPCError error = 1000; +} + +// UnbanRequestMessage unbans the given ip. +message UnbanRequestMessage{ + string ip = 1; +} + +message UnbanResponseMessage{ + RPCError error = 1000; +} + +// GetInfoRequestMessage returns info about the node. +message GetInfoRequestMessage{ +} + +message GetInfoResponseMessage{ + string p2pId = 1; + uint64 mempoolSize = 2; + string serverVersion = 3; + bool isUtxoIndexed = 4; + bool isSynced = 5; + RPCError error = 1000; +} + +message EstimateNetworkHashesPerSecondRequestMessage{ + uint32 windowSize = 1; + string startHash = 2; +} + +message EstimateNetworkHashesPerSecondResponseMessage{ + uint64 networkHashesPerSecond = 1; + RPCError error = 1000; +} + +// NotifyNewBlockTemplateRequestMessage registers this connection for +// NewBlockTemplate notifications. +// +// See: NewBlockTemplateNotificationMessage +message NotifyNewBlockTemplateRequestMessage { +} + +message NotifyNewBlockTemplateResponseMessage { + RPCError error = 1000; +} + +// NewBlockTemplateNotificationMessage is sent whenever a new updated block template is +// available for miners. +// +// See NotifyNewBlockTemplateRequestMessage +message NewBlockTemplateNotificationMessage { +} + +message MempoolEntryByAddress{ + string address = 1; + repeated MempoolEntry sending = 2; + repeated MempoolEntry receiving = 3; +} + +message GetMempoolEntriesByAddressesRequestMessage{ + repeated string addresses = 1; + bool includeOrphanPool = 2; + bool filterTransactionPool = 3; +} + +message GetMempoolEntriesByAddressesResponseMessage{ + repeated MempoolEntryByAddress entries = 1; + + RPCError error = 1000; +} + +message GetCoinSupplyRequestMessage{ +} + +message GetCoinSupplyResponseMessage{ + uint64 maxSompi = 1; // note: this is a hard coded maxSupply, actual maxSupply is expected to deviate by upto -5%, but cannot be measured exactly. 
+    uint64 circulatingSompi = 2;
+
+    RPCError error = 1000;
+}
diff --git a/p2p/scripts/wget_kaspad_protobufs.sh b/p2p/scripts/wget_kaspad_protobufs.sh
new file mode 100755
index 000000000..140f0999d
--- /dev/null
+++ b/p2p/scripts/wget_kaspad_protobufs.sh
@@ -0,0 +1,3 @@
+wget https://raw.githubusercontent.com/kaspanet/kaspad/master/infrastructure/network/netadapter/server/grpcserver/protowire/messages.proto -P ../proto
+wget https://raw.githubusercontent.com/kaspanet/kaspad/master/infrastructure/network/netadapter/server/grpcserver/protowire/p2p.proto -P ../proto
+wget https://raw.githubusercontent.com/kaspanet/kaspad/master/infrastructure/network/netadapter/server/grpcserver/protowire/rpc.proto -P ../proto
diff --git a/p2p/src/bin/p2p_client.rs b/p2p/src/bin/p2p_client.rs
new file mode 100644
index 000000000..363ea4625
--- /dev/null
+++ b/p2p/src/bin/p2p_client.rs
@@ -0,0 +1,87 @@
+use kaspa_core::debug;
+use kaspa_p2p_lib::kaspa_flows::Flow;
+use kaspa_p2p_lib::kaspa_grpc;
+use kaspa_p2p_lib::kaspa_grpc::RouterApi;
+use kaspa_p2p_lib::kaspa_p2p::P2pAdaptorApi;
+use kaspa_p2p_lib::{kaspa_flows, pb};
+
+#[tokio::main]
+async fn main() {
+    // [-] - init logger
+    kaspa_core::log::init_logger("trace");
+    // [0] - init p2p-adaptor
+    let p2p_adaptor = kaspa_p2p_lib::kaspa_p2p::P2pAdaptor::init_only_client_side().await.unwrap();
+    // [1] - connect 1024 peers + flows
+    let ip_port = String::from("http://[::1]:50051");
+    for i in 0..1024 {
+        debug!("P2P, p2p_client::main - starting peer:{}", i);
+        let peer_id = p2p_adaptor.connect_peer(ip_port.clone()).await;
+        let msg = pb::KaspadMessage { payload: Some(pb::kaspad_message::Payload::Verack(pb::VerackMessage {})) };
+        p2p_adaptor.send(peer_id.unwrap(), msg).await;
+    }
+    // [2] - wait for 128 sec and terminate
+    std::thread::sleep(std::time::Duration::from_secs(128));
+    debug!("P2P, p2p_client::main - TERMINATE");
+    p2p_adaptor.terminate_all_peers_and_flows().await;
+    debug!("P2P, p2p_client::main - FINISH");
+}
+#[allow(dead_code)]
+async fn old_main_with_impl_details() {
+    // [-] - init logger
+    kaspa_core::log::init_logger("trace");
+    // [0] - register the first router instance & a channel for receiving new routers when a new connection is established
+    let (router, mut upper_layer_rx) = kaspa_grpc::Router::new().await;
+    // [1] - Start the service layer that listens for new incoming connections (server side)
+    tokio::spawn(async move {
+        // the loop exits when all sender channels are dropped,
+        // --> i.e. when all routers are dropped & the grpc service is stopped
+        while let Some(new_router) = upper_layer_rx.recv().await {
+            // as an example, subscribe to all message types; in reality different flows will subscribe to different message types
+            let flow_terminate = kaspa_flows::EchoFlow::new(new_router).await;
+            // sleep for 30 sec
+            std::thread::sleep(std::time::Duration::from_secs(30));
+            // terminate when needed
+            flow_terminate.send(()).unwrap();
+        }
+    });
+    // [2] - Start client + re-connect loop
+    let client = kaspa_grpc::P2pClient::connect_with_retry(String::from("http://[::1]:50051"), router.clone(), false, 16).await;
+    match client {
+        Some(connected_client) => {
+            // [2.*] - send message
+            let msg = pb::KaspadMessage { payload: Some(pb::kaspad_message::Payload::Verack(pb::VerackMessage {})) };
+            let result = connected_client.router.route_to_network(msg).await;
+            if !result {
+                panic!("Can't send message!!!");
+            }
+            // sleep for 30 sec
+            std::thread::sleep(std::time::Duration::from_secs(30));
+            // [2.*] - close connection
+            connected_client.router.as_ref().close().await;
+        }
+        None => {
+            debug!("P2P, Client connection failed - 16 retries ...");
+        }
+    }
+    /*
+    let cloned_router_arc = router.clone();
+    let mut cnt = 0;
+    loop {
+        let client = kaspa_grpc::P2pClient::connect(String::from("http://[::1]:50051"), cloned_router_arc.clone(), false).await;
+        if client.is_ok() {
+            println!("Client connected ... we can terminate ...");
+            client.unwrap().router.as_ref().close().await;
+        } else {
+            println!("{:?}", client.err());
+            cnt = cnt + 1;
+            if cnt > 320 {
+                println!("Client connection failed - 320 retries ...");
+                break;
+            } else {
+                std::thread::sleep(std::time::Duration::from_secs(2));
+            }
+        }
+    }
+    */
+}
diff --git a/p2p/src/bin/p2p_server.rs b/p2p/src/bin/p2p_server.rs
new file mode 100644
index 000000000..d50b9aa9f
--- /dev/null
+++ b/p2p/src/bin/p2p_server.rs
@@ -0,0 +1,76 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use kaspa_core::{debug, error};
+use kaspa_p2p_lib::kaspa_flows;
+use kaspa_p2p_lib::kaspa_flows::Flow;
+use kaspa_p2p_lib::kaspa_grpc;
+use kaspa_p2p_lib::kaspa_grpc::RouterApi;
+use kaspa_p2p_lib::kaspa_p2p::P2pAdaptorApi;
+
+#[tokio::main]
+async fn main() {
+    // [-] - init logger
+    kaspa_core::log::init_logger("trace");
+    // [0] - init p2p-adaptor - server side
+    let ip_port = String::from("[::1]:50051");
+    let p2p_adaptor = kaspa_p2p_lib::kaspa_p2p::P2pAdaptor::listen(ip_port.clone()).await.unwrap();
+    // [1] - wait for 128 sec & terminate
+    std::thread::sleep(std::time::Duration::from_secs(128));
+    debug!("P2P, p2p_server::main - TERMINATE");
+    p2p_adaptor.terminate_all_peers_and_flows().await;
+    // [2] - drop & sleep 5 sec
+    drop(p2p_adaptor);
+    std::thread::sleep(std::time::Duration::from_secs(5));
+    debug!("P2P, p2p_server::main - FINISH");
+}
+#[allow(dead_code)]
+async fn old_main_with_impl_details() {
+    // [-] - init logger
+    kaspa_core::log::init_logger("trace");
+    // [0] - Create new router - first instance
+    // upper_layer_rx will be used to dispatch notifications about new connections, both for client & server
+    let (router, mut upper_layer_rx) = kaspa_grpc::Router::new().await;
+    // [1] - Start the service layer that listens for new incoming connections (server side)
+    tokio::spawn(async move {
+        // the loop exits when all sender channels are dropped,
+        // --> i.e. when all routers are dropped & the grpc service is stopped
+        while let Some(new_router) = upper_layer_rx.recv().await {
+            // as an example, subscribe to all message types; in reality different flows will subscribe to different message types
+            let flow_terminate = kaspa_flows::EchoFlow::new(new_router).await;
+            // sleep for 30 sec
+            std::thread::sleep(std::time::Duration::from_secs(30));
+            // terminate when needed (as an example); in general the handle should be saved somewhere to allow graceful shutdown
+            flow_terminate.send(()).unwrap();
+        }
+    });
+    // [2] - Start listener (de-facto server side)
+    let terminate_server = kaspa_grpc::P2pServer::listen(String::from("[::1]:50051"), router, true).await;
+    let terminate_signal = Arc::new(AtomicBool::new(false));
+
+    // [3] - Check that the server is ok & register a termination signal (as an example)
+    match terminate_server {
+        Ok(sender) => {
+            debug!("P2P, Server is running ... & we can terminate it with CTRL-C");
+            let terminate_clone = terminate_signal.clone();
+            ctrlc::set_handler(move || {
+                terminate_clone.store(true, Ordering::SeqCst);
+            })
+            .unwrap();
+            // [4] - sleep - just to avoid exiting the main function
+            debug!("P2P, Server-side, endless sleep....");
+            loop {
+                if terminate_signal.load(Ordering::SeqCst) {
+                    debug!("P2P, Received termination signal");
+                    // terminate grpc service
+                    sender.send(()).unwrap();
+                    break;
+                }
+                std::thread::sleep(std::time::Duration::from_secs(1));
+            }
+        }
+        Err(err) => {
+            error!("P2P, Server can't start, {:?}", err);
+        }
+    }
+}
diff --git a/p2p/src/kaspa_flows.rs b/p2p/src/kaspa_flows.rs
new file mode 100644
index 000000000..6adabeeba
--- /dev/null
+++ b/p2p/src/kaspa_flows.rs
@@ -0,0 +1,136 @@
+use crate::kaspa_grpc;
+use crate::kaspa_grpc::{KaspadMessagePayloadEnumU8, Router, RouterApi};
+use crate::pb;
+use kaspa_core::warn;
+use log::{debug, trace};
+use std::sync::Arc;
+use tonic::async_trait;
+
+pub type FlowTxTerminateChannelType = tokio::sync::oneshot::Sender<()>;
+pub type FlowRxTerminateChannelType = tokio::sync::oneshot::Receiver<()>;
+
+#[async_trait]
+pub trait Flow {
+    #[allow(clippy::new_ret_no_self)]
+    async fn new(router: std::sync::Arc<Router>) -> FlowTxTerminateChannelType;
+    async fn call(&self, msg: pb::KaspadMessage) -> bool;
+}
+#[allow(dead_code)]
+pub struct EchoFlow {
+    receiver: kaspa_grpc::RouterRxChannelType,
+    router: std::sync::Arc<Router>,
+    terminate: FlowRxTerminateChannelType,
+}
+
+#[async_trait]
+impl Flow for EchoFlow {
+    async fn new(router: std::sync::Arc<Router>) -> FlowTxTerminateChannelType {
+        // [0] - subscribe to messages
+        trace!("EchoFlow, subscribe to all p2p messages");
+        let receiver = router.subscribe_to(vec![
+            KaspadMessagePayloadEnumU8::Addresses,
+            KaspadMessagePayloadEnumU8::Block,
+            KaspadMessagePayloadEnumU8::Transaction,
+            KaspadMessagePayloadEnumU8::BlockLocator,
+            KaspadMessagePayloadEnumU8::RequestAddresses,
+            KaspadMessagePayloadEnumU8::RequestRelayBlocks,
+            KaspadMessagePayloadEnumU8::RequestTransactions,
+            KaspadMessagePayloadEnumU8::IbdBlock,
+            KaspadMessagePayloadEnumU8::InvRelayBlock,
+            KaspadMessagePayloadEnumU8::InvTransactions,
+            KaspadMessagePayloadEnumU8::Ping,
+            KaspadMessagePayloadEnumU8::Pong,
+            KaspadMessagePayloadEnumU8::Verack,
+            KaspadMessagePayloadEnumU8::Version,
+            KaspadMessagePayloadEnumU8::TransactionNotFound,
+            KaspadMessagePayloadEnumU8::Reject,
+            KaspadMessagePayloadEnumU8::PruningPointUtxoSetChunk,
+            KaspadMessagePayloadEnumU8::RequestIbdBlocks,
+            KaspadMessagePayloadEnumU8::UnexpectedPruningPoint,
+            KaspadMessagePayloadEnumU8::IbdBlockLocator,
+            KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHash,
+            KaspadMessagePayloadEnumU8::RequestNextPruningPointUtxoSetChunk,
+            KaspadMessagePayloadEnumU8::DonePruningPointUtxoSetChunks,
+            KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHashNotFound,
+            KaspadMessagePayloadEnumU8::BlockWithTrustedData,
+            KaspadMessagePayloadEnumU8::DoneBlocksWithTrustedData,
+            KaspadMessagePayloadEnumU8::RequestPruningPointAndItsAnticone,
+            KaspadMessagePayloadEnumU8::BlockHeaders,
+            KaspadMessagePayloadEnumU8::RequestNextHeaders,
+            KaspadMessagePayloadEnumU8::DoneHeaders,
+            KaspadMessagePayloadEnumU8::RequestPruningPointUtxoSet,
+            KaspadMessagePayloadEnumU8::RequestHeaders,
+            KaspadMessagePayloadEnumU8::RequestBlockLocator,
+            KaspadMessagePayloadEnumU8::PruningPoints,
+            KaspadMessagePayloadEnumU8::RequestPruningPointProof,
+            KaspadMessagePayloadEnumU8::PruningPointProof,
+            KaspadMessagePayloadEnumU8::Ready,
+            KaspadMessagePayloadEnumU8::BlockWithTrustedDataV4,
+            KaspadMessagePayloadEnumU8::TrustedData,
+            KaspadMessagePayloadEnumU8::RequestIbdChainBlockLocator,
+            KaspadMessagePayloadEnumU8::IbdChainBlockLocator,
+            KaspadMessagePayloadEnumU8::RequestAnticone,
+            KaspadMessagePayloadEnumU8::RequestNextPruningPointAndItsAnticoneBlocks,
+        ]);
+        // reroute....()
+        // [1] - close the default channel & reroute
+        // while not all flows are registered yet, reroute_to_flow() should be used;
+        // only after all flows are registered may reroute_to_flow_and_close_default_route() be used
+        trace!("EchoFlow, finalize subscription");
+        router.finalize().await;
+        // [2] - terminate channel
+        let (term_tx, term_rx) = tokio::sync::oneshot::channel();
+        // [3] - create object
+        let mut echo_flow = EchoFlow { router, receiver, terminate: term_rx };
+        // [4] - spawn on echo_flow object
+        trace!("EchoFlow, start app-layer receiving loop");
+        tokio::spawn(async move {
+            debug!("EchoFlow, start message dispatching loop");
+            loop {
+                tokio::select! {
+                    // [4.0] - receive
+                    Some(msg) = echo_flow.receiver.recv() => {
+                        if !(echo_flow.call(msg).await) {
+                            warn!("EchoFlow, receive loop - call failed");
+                            break;
+                        }
+                    }
+                    // [4.1] - terminate
+                    _ = &mut echo_flow.terminate => {
+                        debug!("EchoFlow, terminate was requested");
+                        break;
+                    }
+                    // [4.2] - fallback, e.g. when recv() returns an error
+                    else => {
+                        debug!("EchoFlow - unexpected select case");
+                        break
+                    }
+                };
+            }
+        });
+        // [5] - return the management channel, used to terminate this flow via term_tx.send(...)
+        debug!("EchoFlow, returning terminate control to the caller");
+        term_tx
+    }
+    // this example `call` demonstrates that only inside this call does the code stop being generic
+    async fn call(&self, msg: pb::KaspadMessage) -> bool {
+        // echo
+        trace!("EchoFlow, got message:{:?}", msg);
+        self.router.route_to_network(msg).await
+    }
+}
+
+#[async_trait]
+pub trait FlowRegistryApi {
+    async fn initialize_flow(router: std::sync::Arc<Router>) -> FlowTxTerminateChannelType;
+}
+
+pub struct FlowRegistry {}
+
+#[async_trait]
+impl FlowRegistryApi for FlowRegistry {
+    async fn initialize_flow(router: Arc<Router>) -> FlowTxTerminateChannelType {
+        EchoFlow::new(router).await
+    }
+}
diff --git a/p2p/src/kaspa_grpc.rs b/p2p/src/kaspa_grpc.rs
new file mode 100644
index 000000000..6343b9800
--- /dev/null
+++ b/p2p/src/kaspa_grpc.rs
@@ -0,0 +1,726 @@
+use crate::pb;
+use crate::pb::KaspadMessage;
+use fmt::Debug;
+use futures::FutureExt;
+use kaspa_core::{debug, error, info, trace, warn};
+use lockfree;
+use std::fmt;
+use std::{error::Error, net::ToSocketAddrs, pin::Pin, result::Result};
+use tokio::sync::mpsc;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tonic::codec::CompressionEncoding;
+use tonic::{async_trait, transport::Server, Status};
+use uuid;
+
+#[allow(dead_code)]
+fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
+    let mut err: &(dyn Error + 'static) = err_status;
+
+    loop {
+        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
+            return Some(io_err);
+        }
+
+        // h2::Error does not expose std::io::Error with `source()`
+        // https://github.com/hyperium/h2/pull/462
+        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
+            if let Some(io_err) = h2_err.get_io() {
+                return Some(io_err);
+            }
+        }
+
+        err = match err.source() {
+            Some(err) => err,
+            None => return None,
+        };
+    }
+}
+
+#[async_trait]
+pub trait RouterApi: Send + Sync + 'static {
+    // expected to be called once
+    async fn new() -> (std::sync::Arc<Self>, mpsc::Receiver<std::sync::Arc<Self>>);
+    // expected to be called from the grpc service (server & client)
+    async fn clone(
+        &self,
+        server_sender: Option<mpsc::Sender<Result<KaspadMessage, Status>>>,
+        client_sender: Option<mpsc::Sender<KaspadMessage>>,
+    ) -> std::sync::Arc<Self>;
+
+    async fn route_to_flow(&self, msg: pb::KaspadMessage) -> bool;
+    async fn route_to_network(&self, msg: pb::KaspadMessage) -> bool;
+    async fn broadcast(&self, msg: pb::KaspadMessage) -> bool;
+    async fn reroute_to_flow(&self);
+    async fn finalize(&self);
+    #[allow(clippy::wrong_self_convention)]
+    fn from_network_buffer_size(&self) -> usize;
+    fn to_network_buffer_size(&self) -> usize;
+    // mapping function
+    fn grpc_payload_to_internal_u8_enum(payload: &pb::kaspad_message::Payload) -> u8;
+    // expected to be called by the upper layer to register msg-types of interest
+    fn subscribe_to(&self, msgs: std::vec::Vec<KaspadMessagePayloadEnumU8>) -> mpsc::Receiver<KaspadMessage>;
+    // Management
+    async fn close(&self);
+    // Identity
+    fn identity(&self) -> uuid::Uuid;
+}
+
+#[derive(Debug)]
+pub struct GrpcConnection {
+    router: std::sync::Arc<Router>,
+}
+
+#[tonic::async_trait]
+impl pb::p2p_server::P2p for GrpcConnection {
+    type MessageStreamStream = Pin<Box<dyn futures::Stream<Item = Result<KaspadMessage, Status>> + Send + 'static>>;
+    async fn message_stream(
+        &self,
+        request: tonic::Request<tonic::Streaming<KaspadMessage>>,
+    ) -> Result<tonic::Response<Self::MessageStreamStream>, tonic::Status> {
+        trace!("P2P, Server - at new connection");
+        // [0] - Create a channel for messages sent to the network by the upper layer; the p2p layer will use its pop-end
+        let (push_msg_send_by_us_to_network, pop_msg_send_by_us_to_network) =
+            mpsc::channel::<Result<KaspadMessage, Status>>(self.router.to_network_buffer_size());
+        // [1] - Register & notify ...
+        let router = self.router.as_ref().clone(Some(push_msg_send_by_us_to_network), None).await;
+        // [2] - dispatch loop - exits when no route exists or the channel is closed
+        tokio::spawn(async move {
+            trace!("P2P, Server at receiving loop start");
+            let mut input_from_network_grpc_stream = request.into_inner();
+            while let Some(result) = input_from_network_grpc_stream.next().await {
+                match result {
+                    Ok(msg) => {
+                        trace!("P2P, Server - got message: {:?}, router-id: {}", msg, router.identity());
+                        // if it is false -> no route for the message exists or the channel is closed / dropped
+                        if !(router.route_to_flow(msg).await) {
+                            trace!(
+                                "P2P, Server - no route exists for this message, going to close connection, router-id: {}",
+                                router.identity()
+                            );
+                            router.close().await;
+                            break;
+                        }
+                    }
+                    Err(err) => {
+                        warn!("P2P, server side: got error: {:?}", err);
+                        router.close().await;
+                        break;
+                    }
+                }
+            }
+        });
+        // [3] - map the stream to be sent to the network; dispatch will be handled by grpc
+        trace!("P2P, Server at final stage of grpc connection registration");
+        Ok(tonic::Response::new(Box::pin(ReceiverStream::new(pop_msg_send_by_us_to_network)) as Self::MessageStreamStream))
+    }
+}
+
+pub struct P2pServer;
+
+impl P2pServer {
+    pub async fn listen(
+        ip_port: String,
+        router: std::sync::Arc<Router>,
+        gzip: bool,
+    ) -> Result<tokio::sync::oneshot::Sender<()>, tonic::transport::Error> {
+        info!("P2P, Start Listener, ip & port: {:?}", ip_port);
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        tokio::spawn(async move {
+            debug!("P2P, Listener starting, ip & port: {:?} [blocking call]....", ip_port);
+            let grpc_server = match gzip {
+                true => pb::p2p_server::P2pServer::new(GrpcConnection { router })
+                    .accept_compressed(CompressionEncoding::Gzip)
+                    .send_compressed(CompressionEncoding::Gzip),
+                false => pb::p2p_server::P2pServer::new(GrpcConnection { router }),
+            };
+
+            Server::builder()
+                //.add_service(pb::p2p_server::P2pServer::new(GrpcConnection { router }))
+                .add_service(grpc_server)
+                .serve_with_shutdown(
+                    ip_port.to_socket_addrs().unwrap().next().unwrap(),
rx.map(drop),
+                    //async {
+                    //let _ = tokio::spawn(tokio::signal::ctrl_c()).await.unwrap();
+                    //}
+                )
+                .await
+                .unwrap();
+            debug!("P2P, Listener stopped, ip & port: {:?}", ip_port);
+        });
+        Ok(tx)
+    }
+}
+
+pub struct P2pClient {
+    grpc_client: Option<pb::p2p_client::P2pClient<tonic::transport::Channel>>,
+    pub router: std::sync::Arc<Router>,
+}
+
+impl P2pClient {
+    #[allow(dead_code)]
+    pub async fn connect(
+        address: String,
+        router: std::sync::Arc<Router>,
+        gzip: bool,
+    ) -> std::result::Result<P2pClient, tonic::transport::Error> {
+        // [0] - Connection
+        debug!("P2P, P2pClient::connect - ip & port: {}", address.clone());
+        let channel = tonic::transport::Endpoint::new(address.clone())?
+            .timeout(tokio::time::Duration::from_millis(P2pClient::communication_timeout()))
+            .connect_timeout(tokio::time::Duration::from_millis(P2pClient::connect_timeout()))
+            .tcp_keepalive(Some(tokio::time::Duration::from_millis(P2pClient::keep_alive())))
+            .connect()
+            .await?;
+
+        // [1] - channels
+        let (tx, rx) = mpsc::channel(router.from_network_buffer_size());
+        // [2] - Wrapped client
+        trace!("P2P, P2pClient::connect - grpc client creation ...");
+        let mut p2p_client = match gzip {
+            true => P2pClient {
+                grpc_client: Some(
+                    // TODO: what if new() fails?
+                    pb::p2p_client::P2pClient::new(channel)
+                        .send_compressed(tonic::codec::CompressionEncoding::Gzip)
+                        .accept_compressed(tonic::codec::CompressionEncoding::Gzip),
+                ),
+                router: router.as_ref().clone(None, Some(tx)).await,
+            },
+            false => P2pClient {
+                grpc_client: Some(pb::p2p_client::P2pClient::new(channel)),
+                router: router.as_ref().clone(None, Some(tx)).await,
+            },
+        };
+        // [3] - Read messages from server & route to flows
+        let mut input_from_network_grpc_stream =
+            p2p_client.grpc_client.as_mut().unwrap().message_stream(ReceiverStream::new(rx)).await.unwrap().into_inner();
+        let router_to_move = p2p_client.router.clone();
+        tokio::spawn(async move {
+            trace!("P2P, P2pClient::connect - receiving loop started");
+            // [0] - endless loop
+            // TODO: if the peer connection is dropped, will this await still return?
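+            // NOTE: assuming standard `tokio_stream::StreamExt` semantics, `next().await`
+            // returns `None` once the server side closes or drops the stream, so this
+            // loop falls through to the `close()` call below rather than hanging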
+ while let Some(result) = input_from_network_grpc_stream.next().await { + match result { + Ok(msg) => { + trace!( + "P2P, P2pClient::connect - client side: got message: {:?}, router-id: {}", + msg, + router_to_move.identity() + ); + // if it is false -> no route for message exists + if !(router_to_move.route_to_flow(msg).await) { + // 1) - no route for this message type exist + // 2) - maybe channel is dropped + debug!("P2P, P2pClient::connect - receiving loop will be stopped, got message that can't be routed, router-id: {}", router_to_move.identity()); + break; + } + } + Err(err) => { + warn!("P2P, P2pClient::connect - receiving loop got error: {:?}", err); + break; + } + } + } + // [1] - close + trace!("P2P, P2pClient::connect - before close"); + router_to_move.close().await; + }); + // [4] - return client handle (wraps grpc-client and router instance) + + Ok(p2p_client) + } + + pub async fn connect_with_retry(address: String, router: std::sync::Arc, gzip: bool, retry: u8) -> Option> { + let mut cnt = 0; + loop { + let client = P2pClient::connect(address.clone(), router.clone(), gzip).await; + if client.is_ok() { + debug!("P2P, Client connected, ip & port: {:?}", address); + return Some(client.unwrap()); + } else { + warn!("{:?}", client.err()); + if cnt > retry { + warn!("P2P, Client connection re-try #{} - all failed", cnt); + return None; + } else { + std::thread::sleep(std::time::Duration::from_secs(2)); + } + cnt += 1; + } + } + } + + #[inline] + fn communication_timeout() -> u64 { + 10_000 + } + #[inline] + fn keep_alive() -> u64 { + 10_000 + } + #[inline] + fn connect_timeout() -> u64 { + 10_000 + } +} + +#[derive(Debug)] + +pub struct Router { + // its lock free in order to accept upper layer to update routing on the fly + routing_map: lockfree::map::Map>, // TODO: wrap in `Option` since master router does not need it + // this is main channel used to send message to the infra layer + server_sender: Option>>, + client_sender: Option>, + // upper layer notification channel + upper_layer_notification: Option>>, + // default routing channels till registration of routes is finished + default_route: (Option>, std::sync::atomic::AtomicBool), + // identity - used for debug + identity: uuid::Uuid, + // broadcast rx + // broadcast_receiver: tokio::sync::broadcast::Receiver, + // broadcast tx + broadcast_sender: tokio::sync::broadcast::Sender, +} + +#[async_trait] +impl RouterApi for Router { + async fn new() -> (std::sync::Arc, mpsc::Receiver>) { + // [0] - ctor + channels + debug!("P2P, Router::new - master router creation"); + // [1] - broadcast tx,rx - new rx created only once for master router + let (b_tx, mut b_rx) = tokio::sync::broadcast::channel(1024); + let (tx, rx) = mpsc::channel(128); + let ret_val = ( + std::sync::Arc::new(Router { + routing_map: lockfree::map::Map::new(), + server_sender: None, + client_sender: None, + upper_layer_notification: Some(tx), + default_route: (None, std::sync::atomic::AtomicBool::new(false)), + identity: uuid::Uuid::new_v4(), + // broadcast_receiver: b_rx, + broadcast_sender: b_tx, + }), + rx, + ); + // [2] - endless loop + // NOTE: this loop does not hold router-arc since if it will hold it, it will never exit ! 
+        // Since this loop only holds the Receiver channel, it will exit once all Senders are dropped.
+        // This loop does not route messages to the network, since the master router is not connected to anyone,
+        // but we still need to receive messages: with a broadcast channel every Receiver
+        // MUST consume its copy of the message, otherwise messages pile up inside the channel
+        // and special error handling would need to be implemented
+        let master_router_id = ret_val.0.identity();
+        tokio::spawn(async move {
+            trace!("P2P, master router broadcast loop starting, master-router-id: {:?}", master_router_id);
+            loop {
+                match b_rx.recv().await {
+                    Ok(msg) => {
+                        trace!("P2P, master router broadcast loop, message: {:?}", msg);
+                        // result is ignored since the master is not connected to anyone
+                    }
+                    Err(_err) => {
+                        trace!("P2P, master router broadcast loop shutting down");
+                        break;
+                    }
+                }
+            }
+        });
+        // [3] - result
+        ret_val
+    }
+
+    async fn clone(
+        &self,
+        server_sender: Option<mpsc::Sender<Result<pb::KaspadMessage, Status>>>,
+        client_sender: Option<mpsc::Sender<pb::KaspadMessage>>,
+    ) -> std::sync::Arc<Router> {
+        // [0] - create, TODO: maybe refactor ugly static + unsafe
+        let router = std::sync::Arc::new(Router {
+            routing_map: lockfree::map::Map::new(),
+            server_sender,
+            client_sender,
+            upper_layer_notification: Some(self.upper_layer_notification.as_ref().unwrap().clone()),
+            default_route: (Some(lockfree::queue::Queue::new()), std::sync::atomic::AtomicBool::new(true)),
+            identity: uuid::Uuid::new_v4(),
+            // broadcast_receiver: self.broadcast_sender.subscribe(),
+            broadcast_sender: self.broadcast_sender.clone(),
+        });
+        // [1] - start broadcast loop
+        // NOTE: this loop will exit either during whole sub-system shutdown or when route_to_network fails
+        // route_to_network can fail because of:
+        // 1) disconnection
+        // 2) router.close(), since the tx-channel will be downgraded
+        let same_new_router = router.clone();
+        let mut b_rx = self.broadcast_sender.subscribe();
+        tokio::spawn(async move {
+            trace!("P2P, router broadcast loop starting, router-id: {}", same_new_router.identity());
+            loop {
+                let result = b_rx.recv().await;
+                match result {
+                    Ok(msg) => {
+                        if !(same_new_router.route_to_network(msg).await) {
+                            // it is ok not to warn/error here
+                            trace!("P2P, router broadcast to network loop, unable to route message to network, router-id: {}, will exit broadcast loop", same_new_router.identity());
+                            break;
+                        }
+                    }
+                    Err(_err) => {
+                        trace!("P2P, router broadcast loop shutting down");
+                        break;
+                    }
+                }
+            }
+        });
+        // [2] - notify upper layer about new connection (TODO: what if the upper layer drops all TXs ??)
+        router.upper_layer_notification.as_ref().unwrap().send(router.clone()).await.unwrap();
+        // [3] - return shared_ptr
+        router
+    }
+
+    async fn route_to_flow(&self, msg: pb::KaspadMessage) -> bool {
+        // [0] - try to route
+        let key = Router::grpc_payload_to_internal_u8_enum(msg.payload.as_ref().unwrap());
+        match self.routing_map.get(&key) {
+            // [1] - regular route
+            Some(send_channel) => send_channel.val().send(msg).await.is_ok(),
+            None => {
+                // [2] - try the default route, if it is not closed yet
+                if self.default_route.1.load(std::sync::atomic::Ordering::Relaxed) {
+                    self.default_route.0.as_ref().unwrap().push(msg);
+                    true
+                } else {
+                    warn!("P2P, Router::route_to_flow - no route for message-type: {:?} exists", key);
+                    false
+                }
+            }
+        }
+    }
+
+    async fn route_to_network(&self, msg: pb::KaspadMessage) -> bool {
+        // [0] - first try server-like routing
+        match &self.server_sender {
+            Some(sender) => match sender.send(Result::Ok(msg)).await {
+                Ok(_r) => true,
+                Err(_e) => false,
+            },
+            // [1] - since server_sender is None -> router is used for client-like routing
+            None => match &self.client_sender {
+                Some(sender) => match sender.send(msg).await {
+                    Ok(_r) => true,
+                    Err(_e) => false,
+                },
+                None => {
+                    // can't route since no sender is registered yet
+                    false
+                }
+            },
+        }
+    }
+
+    async fn broadcast(&self, msg: KaspadMessage) -> bool {
+        match self.broadcast_sender.send(msg) {
+            Ok(_res) => true,
+            Err(_err) => {
+                trace!(
+                    "P2P, Router::broadcast - broadcast failed, it can happen during shutdown/initialization, router-id: {}",
+                    self.identity()
+                );
+                false
+            }
+        }
+    }
+
+    async fn reroute_to_flow(&self) {
+        while let Some(msg) = self.default_route.0.as_ref().unwrap().pop() {
+            // this should never fail
+            let _res = self.route_to_flow(msg).await;
+        }
+    }
+
+    async fn finalize(&self) {
+        self.default_route.1.store(false, std::sync::atomic::Ordering::Relaxed);
+        self.reroute_to_flow().await;
+        debug!("P2P, Router::finalize - done, router-id: {:?}", self.identity);
+    }
+
+    fn from_network_buffer_size(&self) -> usize {
+        128
+    }
+
+    fn to_network_buffer_size(&self) -> usize {
+        128
+    }
+
+    #[allow(clippy::let_and_return)]
+    fn grpc_payload_to_internal_u8_enum(payload: &pb::kaspad_message::Payload) -> u8 {
+        let result = match payload {
+            pb::kaspad_message::Payload::Addresses(_) => KaspadMessagePayloadEnumU8::Addresses,
+            pb::kaspad_message::Payload::Block(_) => KaspadMessagePayloadEnumU8::Block,
+            pb::kaspad_message::Payload::Transaction(_) => KaspadMessagePayloadEnumU8::Transaction,
+            pb::kaspad_message::Payload::BlockLocator(_) => KaspadMessagePayloadEnumU8::BlockLocator,
+            pb::kaspad_message::Payload::RequestAddresses(_) => KaspadMessagePayloadEnumU8::RequestAddresses,
+            pb::kaspad_message::Payload::RequestRelayBlocks(_) => KaspadMessagePayloadEnumU8::RequestRelayBlocks,
+            pb::kaspad_message::Payload::RequestTransactions(_) => KaspadMessagePayloadEnumU8::RequestTransactions,
+            pb::kaspad_message::Payload::IbdBlock(_) => KaspadMessagePayloadEnumU8::IbdBlock,
+            pb::kaspad_message::Payload::InvRelayBlock(_) => KaspadMessagePayloadEnumU8::InvRelayBlock,
+            pb::kaspad_message::Payload::InvTransactions(_) => KaspadMessagePayloadEnumU8::InvTransactions,
+            pb::kaspad_message::Payload::Ping(_) => KaspadMessagePayloadEnumU8::Ping,
+            pb::kaspad_message::Payload::Pong(_) => KaspadMessagePayloadEnumU8::Pong,
+            pb::kaspad_message::Payload::Verack(_) => KaspadMessagePayloadEnumU8::Verack,
+            pb::kaspad_message::Payload::Version(_) => KaspadMessagePayloadEnumU8::Version,
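+            // NOTE: the arms below simply continue the mechanical 1:1 mapping from each
+            // `pb::kaspad_message::Payload` variant to its `KaspadMessagePayloadEnumU8`
+            // discriminant; no other logic is involved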
pb::kaspad_message::Payload::TransactionNotFound(_) => KaspadMessagePayloadEnumU8::TransactionNotFound, + pb::kaspad_message::Payload::Reject(_) => KaspadMessagePayloadEnumU8::Reject, + pb::kaspad_message::Payload::PruningPointUtxoSetChunk(_) => KaspadMessagePayloadEnumU8::PruningPointUtxoSetChunk, + pb::kaspad_message::Payload::RequestIbdBlocks(_) => KaspadMessagePayloadEnumU8::RequestIbdBlocks, + pb::kaspad_message::Payload::UnexpectedPruningPoint(_) => KaspadMessagePayloadEnumU8::UnexpectedPruningPoint, + pb::kaspad_message::Payload::IbdBlockLocator(_) => KaspadMessagePayloadEnumU8::IbdBlockLocator, + pb::kaspad_message::Payload::IbdBlockLocatorHighestHash(_) => KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHash, + pb::kaspad_message::Payload::RequestNextPruningPointUtxoSetChunk(_) => { + KaspadMessagePayloadEnumU8::RequestNextPruningPointUtxoSetChunk + } + pb::kaspad_message::Payload::DonePruningPointUtxoSetChunks(_) => KaspadMessagePayloadEnumU8::DonePruningPointUtxoSetChunks, + pb::kaspad_message::Payload::IbdBlockLocatorHighestHashNotFound(_) => { + KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHashNotFound + } + pb::kaspad_message::Payload::BlockWithTrustedData(_) => KaspadMessagePayloadEnumU8::BlockWithTrustedData, + pb::kaspad_message::Payload::DoneBlocksWithTrustedData(_) => KaspadMessagePayloadEnumU8::DoneBlocksWithTrustedData, + pb::kaspad_message::Payload::RequestPruningPointAndItsAnticone(_) => { + KaspadMessagePayloadEnumU8::RequestPruningPointAndItsAnticone + } + pb::kaspad_message::Payload::BlockHeaders(_) => KaspadMessagePayloadEnumU8::BlockHeaders, + pb::kaspad_message::Payload::RequestNextHeaders(_) => KaspadMessagePayloadEnumU8::RequestNextHeaders, + pb::kaspad_message::Payload::DoneHeaders(_) => KaspadMessagePayloadEnumU8::DoneHeaders, + pb::kaspad_message::Payload::RequestPruningPointUtxoSet(_) => KaspadMessagePayloadEnumU8::RequestPruningPointUtxoSet, + pb::kaspad_message::Payload::RequestHeaders(_) => KaspadMessagePayloadEnumU8::RequestHeaders, + pb::kaspad_message::Payload::RequestBlockLocator(_) => KaspadMessagePayloadEnumU8::RequestBlockLocator, + pb::kaspad_message::Payload::PruningPoints(_) => KaspadMessagePayloadEnumU8::PruningPoints, + pb::kaspad_message::Payload::RequestPruningPointProof(_) => KaspadMessagePayloadEnumU8::RequestPruningPointProof, + pb::kaspad_message::Payload::PruningPointProof(_) => KaspadMessagePayloadEnumU8::PruningPointProof, + pb::kaspad_message::Payload::Ready(_) => KaspadMessagePayloadEnumU8::Ready, + pb::kaspad_message::Payload::BlockWithTrustedDataV4(_) => KaspadMessagePayloadEnumU8::BlockWithTrustedDataV4, + pb::kaspad_message::Payload::TrustedData(_) => KaspadMessagePayloadEnumU8::TrustedData, + pb::kaspad_message::Payload::RequestIbdChainBlockLocator(_) => KaspadMessagePayloadEnumU8::RequestIbdChainBlockLocator, + pb::kaspad_message::Payload::IbdChainBlockLocator(_) => KaspadMessagePayloadEnumU8::IbdChainBlockLocator, + pb::kaspad_message::Payload::RequestAnticone(_) => KaspadMessagePayloadEnumU8::RequestAnticone, + pb::kaspad_message::Payload::RequestNextPruningPointAndItsAnticoneBlocks(_) => { + KaspadMessagePayloadEnumU8::RequestNextPruningPointAndItsAnticoneBlocks + } + // Default Mapping + _ => KaspadMessagePayloadEnumU8::DefaultMaxValue, + } as u8; + result + } + + fn subscribe_to(&self, msgs: std::vec::Vec) -> mpsc::Receiver { + // [0] - create channels that will be use for new route + let (tx, rx) = mpsc::channel(self.from_network_buffer_size()); + // [1] - update routes + //while let Some(msg_type) = 
msgs.iter().next() { + for msg_type in msgs { + let msg_id = msg_type as u8; + match self.routing_map.insert(msg_id, tx.clone()) { + Some(prev) => { + // not ok, override existing route + let _msg_id = prev.key(); + error!( + "P2P, Router::subscribe_to override already existing value:{:?}, {}, router-id: {}", + msg_type, _msg_id, self.identity + ); + panic!("Try to subscribe to existing route"); + } + None => { + // everything ok, new route registered + trace!("Router::subscribe_to - msg_id: {} route is registered, router-id:{:?}", msg_id, self.identity()); + } + } + } + // [2] - channel that will be used by upper layer to get messages + rx + } + + async fn close(&self) { + // [0] - remove -> drop + while let Some(tx) = self.routing_map.iter().next() { + self.routing_map.remove(tx.key()); + } + // [1] - how to close ? lets downgrade to weak-ref + match &self.server_sender { + Some(sender) => { + sender.downgrade(); + } + None => { + if let Some(sender) = &self.client_sender { + sender.downgrade(); + } + } + } + // [2] - should we notify upper-layer about closing ? TODO: what if we call `close` twice + self.upper_layer_notification.as_ref().unwrap().downgrade(); + // [3] - TODO: how to close broadcast + // self.broadcast_sender.drop(); + // [3] - debug log + debug!("P2P, Router::close - connection finished, router-id: {:?}", self.identity); + } + + fn identity(&self) -> uuid::Uuid { + self.identity + } +} + +#[repr(u8)] +#[derive(Debug, Copy, Clone)] +pub enum KaspadMessagePayloadEnumU8 { + Addresses = 0, + Block, + Transaction, + BlockLocator, + RequestAddresses, + RequestRelayBlocks, + RequestTransactions, + IbdBlock, + InvRelayBlock, + InvTransactions, + Ping, + Pong, + Verack, + Version, + TransactionNotFound, + Reject, + PruningPointUtxoSetChunk, + RequestIbdBlocks, + UnexpectedPruningPoint, + IbdBlockLocator, + IbdBlockLocatorHighestHash, + RequestNextPruningPointUtxoSetChunk, + DonePruningPointUtxoSetChunks, + IbdBlockLocatorHighestHashNotFound, + BlockWithTrustedData, + DoneBlocksWithTrustedData, + RequestPruningPointAndItsAnticone, + BlockHeaders, + RequestNextHeaders, + DoneHeaders, + RequestPruningPointUtxoSet, + RequestHeaders, + RequestBlockLocator, + PruningPoints, + RequestPruningPointProof, + PruningPointProof, + Ready, + BlockWithTrustedDataV4, + TrustedData, + RequestIbdChainBlockLocator, + IbdChainBlockLocator, + RequestAnticone, + RequestNextPruningPointAndItsAnticoneBlocks, + // Default Max Value + DefaultMaxValue = 0xFF, +} + +pub type RouterRxChannelType = tokio::sync::mpsc::Receiver; + +#[ignore = "not working"] +#[tokio::test] +// this test doesn't work because client does not connect when run from test-exe (to be investigated) +async fn run_p2p_server_and_client_test() -> Result<(), Box> { + // [0] - Create new router - first instance + // upper_layer_rx will be used to dispatch notifications about new-connections, both for client & server + let (router, mut upper_layer_rx) = Router::new().await; + let cloned_router_arc = router.clone(); + // [1] - Start service layer to listen when new connection is coming ( Server side ) + tokio::spawn(async move { + // loop will exit when all sender channels will be dropped + // --> when all routers will be dropped & grpc-service will be stopped + while let Some(new_router) = upper_layer_rx.recv().await { + // as en example subscribe to all message-types, in reality different flows will subscribe to different message-types + // this channel will be owned by specific flow + let _rx_channel = new_router.subscribe_to(vec![ + 
KaspadMessagePayloadEnumU8::Addresses, + KaspadMessagePayloadEnumU8::Block, + KaspadMessagePayloadEnumU8::Transaction, + KaspadMessagePayloadEnumU8::BlockLocator, + KaspadMessagePayloadEnumU8::RequestAddresses, + KaspadMessagePayloadEnumU8::RequestRelayBlocks, + KaspadMessagePayloadEnumU8::RequestTransactions, + KaspadMessagePayloadEnumU8::IbdBlock, + KaspadMessagePayloadEnumU8::InvRelayBlock, + KaspadMessagePayloadEnumU8::InvTransactions, + KaspadMessagePayloadEnumU8::Ping, + KaspadMessagePayloadEnumU8::Pong, + KaspadMessagePayloadEnumU8::Verack, + KaspadMessagePayloadEnumU8::Version, + KaspadMessagePayloadEnumU8::TransactionNotFound, + KaspadMessagePayloadEnumU8::Reject, + KaspadMessagePayloadEnumU8::PruningPointUtxoSetChunk, + KaspadMessagePayloadEnumU8::RequestIbdBlocks, + KaspadMessagePayloadEnumU8::UnexpectedPruningPoint, + KaspadMessagePayloadEnumU8::IbdBlockLocator, + KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHash, + KaspadMessagePayloadEnumU8::RequestNextPruningPointUtxoSetChunk, + KaspadMessagePayloadEnumU8::DonePruningPointUtxoSetChunks, + KaspadMessagePayloadEnumU8::IbdBlockLocatorHighestHashNotFound, + KaspadMessagePayloadEnumU8::BlockWithTrustedData, + KaspadMessagePayloadEnumU8::DoneBlocksWithTrustedData, + KaspadMessagePayloadEnumU8::RequestPruningPointAndItsAnticone, + KaspadMessagePayloadEnumU8::BlockHeaders, + KaspadMessagePayloadEnumU8::RequestNextHeaders, + KaspadMessagePayloadEnumU8::DoneHeaders, + KaspadMessagePayloadEnumU8::RequestPruningPointUtxoSet, + KaspadMessagePayloadEnumU8::RequestHeaders, + KaspadMessagePayloadEnumU8::RequestBlockLocator, + KaspadMessagePayloadEnumU8::PruningPoints, + KaspadMessagePayloadEnumU8::RequestPruningPointProof, + KaspadMessagePayloadEnumU8::PruningPointProof, + KaspadMessagePayloadEnumU8::Ready, + KaspadMessagePayloadEnumU8::BlockWithTrustedDataV4, + KaspadMessagePayloadEnumU8::TrustedData, + KaspadMessagePayloadEnumU8::RequestIbdChainBlockLocator, + KaspadMessagePayloadEnumU8::IbdChainBlockLocator, + KaspadMessagePayloadEnumU8::RequestAnticone, + KaspadMessagePayloadEnumU8::RequestNextPruningPointAndItsAnticoneBlocks, + ]); + } + }); + // [2] - Start listener (de-facto Server side ) + let terminate_server = P2pServer::listen(String::from("[::1]:50051"), router, false).await; + + std::thread::sleep(std::time::Duration::from_secs(2)); + + // [3] - Start client + let mut cnt = 0; + loop { + let client = P2pClient::connect(String::from("http://[::1]:50051"), cloned_router_arc.clone(), false).await; + if client.is_ok() { + // client is running, we can register flows + // router.subscribe_to(...) , but in this example spawn @ [1] will do it for every new router + + // terminate client + println!("Client connected ... we can terminate ..."); + client.unwrap().router.as_ref().close().await; + } else { + println!("{:?}", client.err()); + cnt += 1; + if cnt > 16 { + println!("Client connected failed - 16 retries ..."); + break; + } else { + std::thread::sleep(std::time::Duration::from_secs(2)); + } + } + } + std::thread::sleep(std::time::Duration::from_secs(2)); + + // [4] - Check that server is ok + if let Ok(t) = terminate_server { + println!("Server is running ... 
& we can terminate it"); + t.send(()).unwrap(); + } else { + println!("{:?}", terminate_server.err()); + } + + Ok(()) +} diff --git a/p2p/src/kaspa_p2p.rs b/p2p/src/kaspa_p2p.rs new file mode 100644 index 000000000..98281f962 --- /dev/null +++ b/p2p/src/kaspa_p2p.rs @@ -0,0 +1,254 @@ +use crate::kaspa_flows; +use crate::kaspa_flows::FlowRegistryApi; +use crate::kaspa_grpc; +use crate::kaspa_grpc::RouterApi; +use crate::pb; +use kaspa_core::{debug, error}; +use log::warn; +use std::sync::Arc; +use tonic::async_trait; + +#[allow(dead_code)] +type P2pClientType = kaspa_grpc::P2pClient; + +#[async_trait] +pub trait P2pAdaptorApi { + // will be used only for client side connections (regular kaspa node will NOT use it) + async fn init_only_client_side() -> Option>; + // will start new grpc listener + all infra needed + // 1) start listener + grpc + // 2) start new flows registration loop + // 3) register flows terminate channels + async fn listen(ip_port: String) -> Option>; + // will start new client connection + async fn connect_peer(&self, ip_port: String) -> Option; + // send message to peer - used for tests (regular kaspa node will NOT use it) + async fn send(&self, id: uuid::Uuid, msg: pb::KaspadMessage); + // async fn send2(&self, id: uuid::Uuid, msg: T); + // will terminate everything, but p2p layer + // p2p layer will be terminated during drop(...) + async fn terminate(&self, id: uuid::Uuid); + async fn terminate_all_peers_and_flows(&self); + // helper functions + fn get_all_peer_ids(&self) -> std::vec::Vec; + fn get_all_flow_ids(&self) -> std::vec::Vec; +} + +#[allow(dead_code)] +pub struct P2pAdaptor { + master_router: std::sync::Arc, + flow_termination: lockfree::map::Map, + p2p_termination: Option>, + peers: lockfree::map::Map>, +} + +/* +pub trait ToPayload { + fn to_payload(self) -> pb::kaspad_message::Payload; +} +#[macro_export] +macro_rules! to_payload { + ($message:ident, $payload:ident) => { + impl ToPayload for $message { + fn to_payload(self) -> pb::kaspad_message::Payload { + pb::kaspad_message::Payload::$payload(pb::$message) + } + } + }; +} + +to_payload! 
{ VerackMessage, Verack } +*/ +#[async_trait] +impl P2pAdaptorApi for P2pAdaptor { + async fn init_only_client_side() -> Option> { + // [0] - Create new router - first instance + // upper_layer_rx will be used to dispatch notifications about new-connections, both for client & server + let (master_router, mut upper_layer_rx) = kaspa_grpc::Router::new().await; + // [1] - Create adaptor + let p2p_adaptor = std::sync::Arc::new(P2pAdaptor { + master_router, + flow_termination: lockfree::map::Map::new(), + p2p_termination: None, + peers: lockfree::map::Map::new(), + }); + let p2p_adaptor_clone = p2p_adaptor.clone(); + // [2] - Start service layer to listen when new connection is coming ( Server & Client side ) + tokio::spawn(async move { + // loop will exit when all sender channels will be dropped + // --> when all routers will be dropped & grpc-service will be stopped + while let Some(new_router) = upper_layer_rx.recv().await { + // as en example subscribe to all message-types, in reality different flows will subscribe to different message-types + let new_router_id = new_router.identity(); + let flow_terminate = kaspa_flows::FlowRegistry::initialize_flow(new_router).await; + let result = p2p_adaptor.flow_termination.insert(new_router_id, flow_terminate); + if result.is_some() { + panic!( + "At flow initialization, insertion into the map - got existing value, flow-key = router-id: {:?}", + result.unwrap().key() + ); + } + } + }); + Some(p2p_adaptor_clone) + } + // regular kaspa node will use this call to have both server & client connections + async fn listen(ip_port: String) -> Option> { + // [0] - Create new router - first instance + // upper_layer_rx will be used to dispatch notifications about new-connections, both for client & server + let (master_router, mut upper_layer_rx) = kaspa_grpc::Router::new().await; + // [1] - Start listener (de-facto Server side ) + let terminate_server = kaspa_grpc::P2pServer::listen(ip_port, master_router.clone(), true).await; + // [2] - Check that server is ok & register termination signal ( as an example ) + if let Ok(t) = terminate_server { + debug!("P2P, Server is running ..."); + let p2p_adaptor = std::sync::Arc::new(P2pAdaptor { + master_router, + flow_termination: lockfree::map::Map::new(), + p2p_termination: Some(t), + peers: lockfree::map::Map::new(), + }); + let p2p_adaptor_clone = p2p_adaptor.clone(); + // [3] - Start service layer to listen when new connection is coming ( Server & Client side ) + tokio::spawn(async move { + // loop will exit when all sender channels will be dropped + // --> when all routers will be dropped & grpc-service will be stopped + while let Some(new_router) = upper_layer_rx.recv().await { + // as en example subscribe to all message-types, in reality different flows will subscribe to different message-types + let new_router_id = new_router.identity(); + let flow_terminate = kaspa_flows::FlowRegistry::initialize_flow(new_router).await; + let result = p2p_adaptor.flow_termination.insert(new_router_id, flow_terminate); + if result.is_some() { + panic!( + "At flow initialization, insertion into the map - got existing value, flow-key = router-id: {:?}", + result.unwrap().key() + ); + } + } + }); + Some(p2p_adaptor_clone) + } else { + error!("P2P, Server can't start, {:?}", terminate_server.err()); + None + } + } + + async fn connect_peer(&self, ip_port: String) -> Option { + // [0] - Start client + re-connect loop + let client = kaspa_grpc::P2pClient::connect_with_retry(ip_port, self.master_router.clone(), false, 16).await; + 
        match client {
+            Some(connected_client) => {
+                let peer_id = connected_client.router.identity();
+                self.peers.insert(peer_id, connected_client);
+                Some(peer_id)
+            }
+            None => {
+                debug!("P2P, Client connection failed - 16 retries ...");
+                None
+            }
+        }
+    }
+
+    async fn send(&self, id: uuid::Uuid, msg: pb::KaspadMessage) {
+        match self.peers.get(&id) {
+            Some(p2p_client) => {
+                let result = p2p_client.val().router.route_to_network(msg).await;
+                if !result {
+                    warn!("P2P, P2pAdaptor::send - can't route message to peer-id: {:?}", id);
+                }
+            }
+            None => {
+                warn!("P2P, P2pAdaptor::send - tried to send a message to a peer that does not exist, peer-id: {:?}", id);
+            }
+        }
+    }
+
+    /*
+    async fn send2<T: ToPayload>(&self, id: uuid::Uuid, msg: T) {
+        self.send(id, KaspadMessage { payload: Some(msg.to_payload()) }).await;
+    }
+    */
+
+    async fn terminate(&self, id: uuid::Uuid) {
+        match self.peers.remove(&id) {
+            Some(peer) => {
+                peer.val().router.close().await;
+                debug!("P2P, P2pAdaptor::terminate - peer-id: {:?}, is terminated", id);
+            }
+            None => {
+                warn!("P2P, P2pAdaptor::terminate - tried to remove unknown peer-id: {:?}", id);
+            }
+        }
+        match self.flow_termination.remove(&id) {
+            Some(_flow_terminate_channel) => {
+                //let _ = flow_terminate_channel.val().send(());
+                debug!("P2P, P2pAdaptor::terminate - flow-id: {:?}, is terminated", id);
+            }
+            None => {
+                warn!("P2P, P2pAdaptor::terminate - tried to remove unknown flow-id: {:?}", id);
+            }
+        }
+    }
+
+    async fn terminate_all_peers_and_flows(&self) {
+        let peer_ids = self.get_all_peer_ids();
+        for peer_id in peer_ids.iter() {
+            match self.peers.remove(peer_id) {
+                Some(peer) => {
+                    peer.val().router.close().await;
+                    debug!("P2P, P2pAdaptor::terminate_all_peers_and_flows - peer-id: {:?}, is terminated", peer_id);
+                }
+                None => {
+                    warn!("P2P, P2pAdaptor::terminate_all_peers_and_flows - tried to remove unknown peer-id: {:?}", peer_id);
+                }
+            }
+        }
+        let flow_ids = self.get_all_flow_ids();
+        for flow_id in flow_ids.iter() {
+            match self.flow_termination.remove(flow_id) {
+                Some(_flow_terminate_channel) => {
+                    debug!("P2P, P2pAdaptor::terminate_all_peers_and_flows - flow-id: {:?}, is terminated", flow_id);
+                }
+                None => {
+                    warn!("P2P, P2pAdaptor::terminate_all_peers_and_flows - tried to remove unknown flow-id: {:?}", flow_id);
+                }
+            }
+        }
+        // commented but maybe used later
+        /*
+        if false == peer_ids.eq(&flow_ids) {
+            warn!("P2P, P2pAdaptor::terminate_all_peers_and_flows - peer-ids are not equal to flow-ids");
+            trace!("P2P, P2pAdaptor::terminate_all_peers_and_flows - peer-ids: {:?}", peer_ids);
+            trace!("P2P, P2pAdaptor::terminate_all_peers_and_flows - flow-ids: {:?}", flow_ids);
+        }
+        */
+    }
+
+    fn get_all_peer_ids(&self) -> Vec<uuid::Uuid> {
+        let mut ids = std::vec::Vec::<uuid::Uuid>::new();
+        for peer in self.peers.iter() {
+            ids.push(*peer.key());
+        }
+        /*
+        let mut it = self.peers.iter();
+        loop {
+            match it.next() {
+                Some(v) => {
+                    ids.push(v.key().clone());
+                }
+                None => break,
+            }
+        }
+        */
+        ids
+    }
+    fn get_all_flow_ids(&self) -> Vec<uuid::Uuid> {
+        let mut ids = std::vec::Vec::<uuid::Uuid>::new();
+        for flow in self.flow_termination.iter() {
+            ids.push(*flow.key());
+        }
+        ids
+    }
+}
diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs
new file mode 100644
index 000000000..d7c29b878
--- /dev/null
+++ b/p2p/src/lib.rs
@@ -0,0 +1,7 @@
+pub mod pb {
+    // this one includes messages.proto + p2p.proto + rpc.proto
+    tonic::include_proto!("protowire");
+}
+pub mod kaspa_flows;
+pub mod kaspa_grpc;
+pub mod kaspa_p2p;
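+
+// A minimal usage sketch of the adaptor API above (illustrative only - it mirrors
+// the ignored test in kaspa_grpc.rs; the addresses are placeholders and listener
+// shutdown is elided):
+//
+//     use kaspa_p2p::kaspa_p2p::{P2pAdaptor, P2pAdaptorApi};
+//     use kaspa_p2p::pb;
+//
+//     #[tokio::main]
+//     async fn main() {
+//         // server side: every inbound connection gets an EchoFlow registered for it
+//         let server = P2pAdaptor::listen("[::1]:50051".to_string()).await.unwrap();
+//         // client side: an adaptor that only dials out
+//         let client = P2pAdaptor::init_only_client_side().await.unwrap();
+//         let peer_id = client.connect_peer("http://[::1]:50051".to_string()).await.unwrap();
+//         // route a message to that peer; the payload must be Some(..) since routing
+//         // unwraps it on the receiving side
+//         let msg = pb::KaspadMessage { payload: Some(pb::kaspad_message::Payload::Verack(pb::VerackMessage {})) };
+//         client.send(peer_id, msg).await;
+//         client.terminate_all_peers_and_flows().await;
+//         let _ = server; // keep the listening adaptor alive until here
+//     }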