Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/stacks signer block proposal #4130

Merged
merged 27 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
565b424
Seperate stacks node calls from stacker db specific calls
jferrant Dec 5, 2023
f12737d
Add miners stackerdb and update cli
jferrant Dec 5, 2023
6098305
Seperate miner and signer events
jferrant Dec 6, 2023
a10dffa
Add block specific slot and StackerDBMessage type to stackerdb.rs
jferrant Dec 6, 2023
6dee337
Delete stale stacks_client.rs
jferrant Dec 6, 2023
d7e3624
Filter out message types from the signer and miner StackerDBMessages
jferrant Dec 6, 2023
0f680cd
Add serde to StackerDBMessage types
jferrant Dec 6, 2023
57a400a
Add TODO for miner public key verification
jferrant Dec 6, 2023
1f2cdd5
Add copyright to lib.rs and mod.rs files
jferrant Dec 11, 2023
bb06c55
Update put_chunk to take a ref and cleanup clippy
jferrant Dec 11, 2023
868bd3a
Cleanup stacks client to deserialize specific response types isntead …
jferrant Jan 10, 2024
8da428b
is_valid_nakamoto_block should be making a post not a get request
jferrant Jan 10, 2024
17e137a
WIP: add block events to libsigner
jferrant Jan 10, 2024
b921f27
Add block events to libsigner
jferrant Jan 10, 2024
693d876
Add block to BlockValidateReject
jferrant Jan 10, 2024
9bc40c6
Remove use of pox contract and miners contract configs and update test
jferrant Jan 11, 2024
595fb45
Cleanup signer test to easily add another
jferrant Jan 11, 2024
ff49b79
Add test to handle block written to miners stacker db and fix signatu…
jferrant Jan 11, 2024
c45bc09
Add braindumped psuedo code function for extracting block responses f…
jferrant Jan 12, 2024
bef5177
Add braindumped psuedo code function for extracting block responses f…
jferrant Jan 12, 2024
e8d0770
CRC: remove unused EventPrefix and fix copyright year from 2023 to 20…
jferrant Jan 12, 2024
ef116a4
Filter out unknown contract ids from libsigner events
jferrant Jan 12, 2024
6712ac9
Update wsts version to 6.1 to use PartialEq change in Packet
jferrant Jan 12, 2024
01089b6
Add stackerdb_dkg_sign test to CI
jferrant Jan 12, 2024
c870f6e
CRC: add rustdocs to test, cleanup error handling in event.rs, and re…
jferrant Jan 12, 2024
32caa82
Remove unnecessary changes to cli and configs
jferrant Jan 16, 2024
52ccb11
CRC: add copyright to all files, remove commented out code, and move …
jferrant Jan 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/bitcoin-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ jobs:
- tests::nakamoto_integrations::mine_multiple_per_tenure_integration
- tests::nakamoto_integrations::block_proposal_api_endpoint
- tests::nakamoto_integrations::miner_writes_proposed_block_to_stackerdb
- tests::signer::stackerdb_dkg_sign
- tests::signer::stackerdb_block_proposal
steps:
## Setup test environment
- name: Setup Test Environment
Expand Down
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [

# Dependencies we want to keep the same between workspace members
[workspace.dependencies]
wsts = "6.0"
wsts = "6.1"
rand_core = "0.6"
rand = "0.8"

Expand Down
2 changes: 2 additions & 0 deletions libsigner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ slog = { version = "2.5.2", features = [ "max_level_trace" ] }
slog-term = "2.6.0"
slog-json = { version = "2.3.0", optional = true }
stacks-common = { path = "../stacks-common" }
stackslib = { path = "../stackslib"}
thiserror = "1.0"
tiny_http = "0.12"
wsts = { workspace = true }

[dependencies.serde_json]
version = "1.0"
Expand Down
5 changes: 5 additions & 0 deletions libsigner/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

use std::io;

use clarity::vm::types::QualifiedContractIdentifier;

/// Errors originating from doing an RPC request to the Stacks node
#[derive(thiserror::Error, Debug)]
pub enum RPCError {
Expand Down Expand Up @@ -66,4 +68,7 @@ pub enum EventError {
/// Unrecognized event error
#[error("Unrecognized event: {0}")]
UnrecognizedEvent(String),
/// Unrecognized stacker DB contract error
#[error("Unrecognized StackerDB contract: {0}")]
UnrecognizedStackerDBContract(QualifiedContractIdentifier),
}
153 changes: 105 additions & 48 deletions libsigner/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,31 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;

use blockstack_lib::chainstate::nakamoto::NakamotoBlock;
use blockstack_lib::chainstate::stacks::boot::MINERS_NAME;
use blockstack_lib::chainstate::stacks::events::StackerDBChunksEvent;
use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse;
use blockstack_lib::util_lib::boot::boot_code_id;
use clarity::vm::types::QualifiedContractIdentifier;
use libstackerdb::StackerDBChunkData;
use serde::{Deserialize, Serialize};
use stacks_common::codec::{
read_next, read_next_at_most, write_next, Error as CodecError, StacksMessageCodec,
};
use tiny_http::{
Method as HttpMethod, Request as HttpRequest, Response as HttpResponse, Server as HttpServer,
};
use wsts::net::{Message, Packet};

use crate::http::{decode_http_body, decode_http_request};
use crate::EventError;

/// Event structure for newly-arrived StackerDB data
/// Event enum for newly-arrived signer subscribed events
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StackerDBChunksEvent {
/// The contract ID for the StackerDB instance
pub contract_id: QualifiedContractIdentifier,
/// The chunk data for newly-modified slots
pub modified_slots: Vec<StackerDBChunkData>,
pub enum SignerEvent {
/// A new stackerDB chunk was received
StackerDB(StackerDBChunksEvent),
/// A new block proposal was received
BlockProposal(BlockValidateResponse),
}

/// Trait to implement a stop-signaler for the event receiver thread.
Expand All @@ -47,7 +55,7 @@ pub trait EventStopSignaler {
fn send(&mut self);
}

/// Trait to implement to handle StackerDB events sent by the Stacks node
/// Trait to implement to handle StackerDB and BlockProposal events sent by the Stacks node
pub trait EventReceiver {
/// The implementation of ST will ensure that a call to ST::send() will cause
/// the call to `is_stopped()` below to return true.
Expand All @@ -56,11 +64,11 @@ pub trait EventReceiver {
/// Open a server socket to the given socket address.
fn bind(&mut self, listener: SocketAddr) -> Result<SocketAddr, EventError>;
/// Return the next event
fn next_event(&mut self) -> Result<StackerDBChunksEvent, EventError>;
fn next_event(&mut self) -> Result<SignerEvent, EventError>;
/// Add a downstream event consumer
fn add_consumer(&mut self, event_out: Sender<StackerDBChunksEvent>);
fn add_consumer(&mut self, event_out: Sender<SignerEvent>);
/// Forward the event to downstream consumers
fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool;
fn forward_event(&mut self, ev: SignerEvent) -> bool;
/// Determine if the receiver should hang up
fn is_stopped(&self) -> bool;
/// Get a stop signal instance that, when sent, will cause this receiver to stop accepting new
Expand Down Expand Up @@ -100,25 +108,25 @@ pub trait EventReceiver {
}
}

/// Event receiver for StackerDB events
pub struct StackerDBEventReceiver {
/// contracts we're listening for
/// Event receiver for Signer events
pub struct SignerEventReceiver {
/// stacker db contracts we're listening for
pub stackerdb_contract_ids: Vec<QualifiedContractIdentifier>,
/// Address we bind to
local_addr: Option<SocketAddr>,
/// server socket that listens for HTTP POSTs from the node
http_server: Option<HttpServer>,
/// channel into which to write newly-discovered data
out_channels: Vec<Sender<StackerDBChunksEvent>>,
out_channels: Vec<Sender<SignerEvent>>,
/// inter-thread stop variable -- if set to true, then the `main_loop` will exit
stop_signal: Arc<AtomicBool>,
}

impl StackerDBEventReceiver {
/// Make a new StackerDB event receiver, and return both the receiver and the read end of a
impl SignerEventReceiver {
/// Make a new Signer event receiver, and return both the receiver and the read end of a
/// channel into which node-received data can be obtained.
pub fn new(contract_ids: Vec<QualifiedContractIdentifier>) -> StackerDBEventReceiver {
StackerDBEventReceiver {
pub fn new(contract_ids: Vec<QualifiedContractIdentifier>) -> SignerEventReceiver {
SignerEventReceiver {
stackerdb_contract_ids: contract_ids,
http_server: None,
local_addr: None,
Expand All @@ -130,38 +138,38 @@ impl StackerDBEventReceiver {
/// Do something with the socket
pub fn with_server<F, R>(&mut self, todo: F) -> Result<R, EventError>
where
F: FnOnce(&mut StackerDBEventReceiver, &mut HttpServer) -> R,
F: FnOnce(&SignerEventReceiver, &mut HttpServer, &[QualifiedContractIdentifier]) -> R,
{
let mut server = if let Some(s) = self.http_server.take() {
s
} else {
return Err(EventError::NotBound);
};

let res = todo(self, &mut server);
let res = todo(self, &mut server, &self.stackerdb_contract_ids);

self.http_server = Some(server);
Ok(res)
}
}

/// Stop signaler implementation
pub struct StackerDBStopSignaler {
pub struct SignerStopSignaler {
stop_signal: Arc<AtomicBool>,
local_addr: SocketAddr,
}

impl StackerDBStopSignaler {
impl SignerStopSignaler {
/// Make a new stop signaler
pub fn new(sig: Arc<AtomicBool>, local_addr: SocketAddr) -> StackerDBStopSignaler {
StackerDBStopSignaler {
pub fn new(sig: Arc<AtomicBool>, local_addr: SocketAddr) -> SignerStopSignaler {
SignerStopSignaler {
stop_signal: sig,
local_addr,
}
}
}

impl EventStopSignaler for StackerDBStopSignaler {
impl EventStopSignaler for SignerStopSignaler {
fn send(&mut self) {
self.stop_signal.store(true, Ordering::SeqCst);
// wake up the thread so the atomicbool can be checked
Expand All @@ -179,8 +187,8 @@ impl EventStopSignaler for StackerDBStopSignaler {
}
}

impl EventReceiver for StackerDBEventReceiver {
type ST = StackerDBStopSignaler;
impl EventReceiver for SignerEventReceiver {
type ST = SignerStopSignaler;

/// Start listening on the given socket address.
/// Returns the address that was bound.
Expand All @@ -194,8 +202,8 @@ impl EventReceiver for StackerDBEventReceiver {
/// Wait for the node to post something, and then return it.
/// Errors are recoverable -- the caller should call this method again even if it returns an
/// error.
fn next_event(&mut self) -> Result<StackerDBChunksEvent, EventError> {
self.with_server(|event_receiver, http_server| {
fn next_event(&mut self) -> Result<SignerEvent, EventError> {
self.with_server(|event_receiver, http_server, contract_ids| {
let mut request = http_server.recv()?;

// were we asked to terminate?
Expand All @@ -209,27 +217,63 @@ impl EventReceiver for StackerDBEventReceiver {
&request.method(),
)));
}
if request.url() != "/stackerdb_chunks" {
let url = request.url().to_string();
if request.url() == "/stackerdb_chunks" {
debug!("Got stackerdb_chunks event");
let mut body = String::new();
if let Err(e) = request
.as_reader()
.read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);

info!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
request.url()
);
request
.respond(HttpResponse::empty(200u16))
.expect("response failed");
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}

jferrant marked this conversation as resolved.
Show resolved Hide resolved
let event: StackerDBChunksEvent =
serde_json::from_slice(body.as_bytes()).map_err(|e| {
EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e))
})?;

if !contract_ids.contains(&event.contract_id) {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
event.contract_id
);
request
.respond(HttpResponse::empty(200u16))
.expect("response failed");
return Err(EventError::UnrecognizedStackerDBContract(event.contract_id));
}

request
.respond(HttpResponse::empty(200u16))
.expect("response failed");
Err(EventError::UnrecognizedEvent(url))
} else {

Ok(SignerEvent::StackerDB(event))
} else if request.url() == "/proposal_response" {
jferrant marked this conversation as resolved.
Show resolved Hide resolved
debug!("Got proposal_response event");
let mut body = String::new();
request
if let Err(e) = request
.as_reader()
.read_to_string(&mut body)
.expect("failed to read body");
.read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);

let event: StackerDBChunksEvent =
request
.respond(HttpResponse::empty(200u16))
.expect("response failed");
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
)));
}

let event: BlockValidateResponse =
serde_json::from_slice(body.as_bytes()).map_err(|e| {
EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e))
})?;
Expand All @@ -238,7 +282,20 @@ impl EventReceiver for StackerDBEventReceiver {
.respond(HttpResponse::empty(200u16))
.expect("response failed");

Ok(event)
Ok(SignerEvent::BlockProposal(event))
} else {
let url = request.url().to_string();

info!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
request.url()
);

request
.respond(HttpResponse::empty(200u16))
.expect("response failed");
Err(EventError::UnrecognizedEvent(url))
}
})?
}
Expand All @@ -251,7 +308,7 @@ impl EventReceiver for StackerDBEventReceiver {
/// Forward an event
/// Return true on success; false on error.
/// Returning false terminates the event receiver.
fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool {
fn forward_event(&mut self, ev: SignerEvent) -> bool {
if self.out_channels.is_empty() {
// nothing to do
error!("No channels connected to event receiver");
Expand All @@ -275,15 +332,15 @@ impl EventReceiver for StackerDBEventReceiver {
}

/// Add an event consumer. A received event will be forwarded to this Sender.
fn add_consumer(&mut self, out_channel: Sender<StackerDBChunksEvent>) {
fn add_consumer(&mut self, out_channel: Sender<SignerEvent>) {
self.out_channels.push(out_channel);
}

/// Get a stopped signaler. The caller can then use it to terminate the event receiver loop,
/// even if it's in a different thread.
fn get_stop_signaler(&mut self) -> Result<StackerDBStopSignaler, EventError> {
fn get_stop_signaler(&mut self) -> Result<SignerStopSignaler, EventError> {
if let Some(local_addr) = self.local_addr {
Ok(StackerDBStopSignaler::new(
Ok(SignerStopSignaler::new(
self.stop_signal.clone(),
local_addr,
))
Expand Down
3 changes: 1 addition & 2 deletions libsigner/src/libsigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ mod session;

pub use crate::error::{EventError, RPCError};
pub use crate::events::{
EventReceiver, EventStopSignaler, StackerDBChunksEvent, StackerDBEventReceiver,
StackerDBStopSignaler,
EventReceiver, EventStopSignaler, SignerEvent, SignerEventReceiver, SignerStopSignaler,
};
pub use crate::runloop::{RunningSigner, Signer, SignerRunLoop};
pub use crate::session::{SignerSession, StackerDBSession};
Loading