Skip to content

Commit

Permalink
Merge pull request #5001 from systeminit/nick/6eed78c
Browse files Browse the repository at this point in the history
Write to audit logs table from forklift
  • Loading branch information
nickgerace authored Nov 20, 2024
2 parents 14949a3 + 6b961b7 commit f387ef6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 19 deletions.
81 changes: 65 additions & 16 deletions lib/dal/src/audit_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,37 @@ pub(crate) async fn publish_pending(
error!(?err, "audit logs shuttle error");
}

match WsEvent::audit_logs_published(&ctx_clone_for_ws_event).await {
Ok(event) => {
if let Err(err) = event.publish_immediately(&ctx_clone_for_ws_event).await {
error!(?err, "error when publishing ws event for audit logs");
match ChangeSet::find(
&ctx_clone_for_ws_event,
ctx_clone_for_ws_event.change_set_id(),
)
.await
{
Ok(Some(change_set)) => {
match WsEvent::audit_logs_published(
&ctx_clone_for_ws_event,
change_set.id,
change_set.status,
)
.await
{
Ok(event) => {
if let Err(err) =
event.publish_immediately(&ctx_clone_for_ws_event).await
{
error!(?err, "error when publishing ws event for audit logs");
}
}
Err(err) => error!(?err, "error when creating ws event for audit logs"),
}
}
Err(err) => error!(?err, "error when creating ws event for audit logs"),
Ok(None) => {
trace!("skipping ws event creation for audit logs: no change set found")
}
Err(err) => error!(
?err,
"error when attempting to find change set for ws event for audit logs"
),
}
});
} else {
Expand All @@ -151,13 +175,37 @@ pub(crate) async fn publish_pending(
tracker.close();
tracker.wait().await;

match WsEvent::audit_logs_published(&ctx_clone_for_ws_event).await {
Ok(event) => {
if let Err(err) = event.publish_immediately(&ctx_clone_for_ws_event).await {
error!(?err, "error when publishing ws event for audit logs");
match ChangeSet::find(
&ctx_clone_for_ws_event,
ctx_clone_for_ws_event.change_set_id(),
)
.await
{
Ok(Some(change_set)) => {
match WsEvent::audit_logs_published(
&ctx_clone_for_ws_event,
change_set.id,
change_set.status,
)
.await
{
Ok(event) => {
if let Err(err) =
event.publish_immediately(&ctx_clone_for_ws_event).await
{
error!(?err, "error when publishing ws event for audit logs");
}
}
Err(err) => error!(?err, "error when creating ws event for audit logs"),
}
}
Err(err) => error!(?err, "error when creating ws event for audit logs"),
Ok(None) => {
trace!("skipping ws event creation for audit logs: no change set found")
}
Err(err) => error!(
?err,
"error when attempting to find change set for ws event for audit logs"
),
}
});
}
Expand Down Expand Up @@ -462,15 +510,16 @@ pub struct AuditLogsPublishedPayload {
}

impl WsEvent {
pub async fn audit_logs_published(ctx: &DalContext) -> WsEventResult<Self> {
let change_set = ChangeSet::find(ctx, ctx.change_set_id())
.await?
.ok_or(ChangeSetError::ChangeSetNotFound(ctx.change_set_id()))?;
pub async fn audit_logs_published(
ctx: &DalContext,
change_set_id: crate::ChangeSetId,
change_set_status: ChangeSetStatus,
) -> WsEventResult<Self> {
WsEvent::new(
ctx,
WsPayload::AuditLogsPublished(AuditLogsPublishedPayload {
change_set_id: change_set.id,
change_set_status: change_set.status,
change_set_id,
change_set_status,
}),
)
.await
Expand Down
4 changes: 1 addition & 3 deletions lib/dal/src/ws_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ use crate::{
user::CursorPayload, ChangeSetId, DalContext, FuncError, PropId, StandardModelError,
TransactionsError, WorkspacePk,
};
use crate::{ChangeSetError, SchemaVariantError, SecretCreatedPayload, SecretUpdatedPayload};
use crate::{SchemaVariantError, SecretCreatedPayload, SecretUpdatedPayload};

#[remain::sorted]
#[derive(Error, Debug)]
pub enum WsEventError {
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("func error: {0}")]
Func(#[from] Box<FuncError>),
#[error("nats txn error: {0}")]
Expand Down

0 comments on commit f387ef6

Please sign in to comment.