Skip to content

Commit

Permalink
Test with ATTR and SRC combo
Browse files Browse the repository at this point in the history
  • Loading branch information
sahusanket committed Dec 9, 2024
1 parent 16c4f3e commit a620d58
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public MessagingAuditLogWriter(MessagingService messagingService,
@Override
public void publish(@Nullable AuditLogRequest auditLogRequest) throws IOException {

if (auditLogRequest.getUri().contains("/v3/namespaces/default/artifacts/")){
LOG.warn("SANKET_TEST2 : publish for uri : {}", auditLogRequest.getUri());
}
LOG.warn("SANKET_TEST2 : publish for uri : {}", auditLogRequest == null ? null : auditLogRequest.toString());

if (auditLogRequest != null &&
(auditLogRequest.getAuditLogContextQueue().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class AuthenticationChannelHandler extends ChannelDuplexHandler {
private static final String EMPTY_USER_IP = "CDAP-empty-user-ip";
private static final String AUDIT_LOG_QUEUE_ATTR_NAME = "AUDIT_LOG_QUEUE_ATTR";
private static final String AUDIT_LOG_USERIP_ATTR_NAME = "AUDIT_LOG_USERIP_ATTR";
private static final String AUDIT_LOG_REQUEST_ATTR_NAME = "AUDIT_LOG_REQ_ATTR";

private final boolean internalAuthEnabled;
private final boolean auditLoggingEnabled;
Expand All @@ -73,8 +74,15 @@ public AuthenticationChannelHandler(boolean internalAuthEnabled, boolean auditLo
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LOG.warn("SANKET_TEST3 : ChannelRead Start : chanelId : {} , pipeline.channelId : {} ", ctx.channel().id(),
ctx.pipeline().channel().id());
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
LOG.warn("SANKET_TEST ACH 1 : ChannelRead : chanelId : {} , pipeline.channelId : {} and URI : {} and Method {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
request.uri(),
request.method());

}
SecurityRequestContext.reset();

// Only set SecurityRequestContext for the HttpRequest but not for subsequence chunks.
Expand Down Expand Up @@ -135,18 +143,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
SecurityRequestContext.setUserIp(currentUserIp);
}

if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
LOG.warn("SANKET_TEST3 : ChannelRead : chanelId : {} , pipeline.channelId : {} and URI : {} and Method {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
request.uri(),
request.method());

}
try {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
LOG.warn("SANKET_TEST ACH 2 : ChannelRead : chanelId : {} , pipeline.channelId : {} and URI : {} and Method {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
request.uri(),
request.method());

}
ctx.fireChannelRead(msg);
} finally {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
LOG.warn("SANKET_TEST ACH 3 FINALLY : ChannelRead : chanelId : {} , pipeline.channelId : {} and URI : {} and Method {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
request.uri(),
request.method());

}
setAuditLogMetaDataInChanel(ctx);
}
}
Expand All @@ -164,6 +182,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
publishAuditLogRequest(ctx);
super.write(ctx, msg, promise);
} finally {
LOG.warn("SANKET_TEST3 : WRITE : chanelId : {} , pipeline.channelId : {} ",
ctx.channel().id(),
ctx.pipeline().channel().id());
SecurityRequestContext.reset();
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).set(null);
}
Expand All @@ -181,6 +202,9 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
publishAuditLogRequest(ctx);
super.close(ctx, promise);
} finally {
LOG.warn("SANKET_TEST3 : CLOSE : chanelId : {} , pipeline.channelId : {} ",
ctx.channel().id(),
ctx.pipeline().channel().id());
SecurityRequestContext.reset();
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).set(null);
}
Expand All @@ -204,6 +228,14 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
*/
private void publishAuditLogRequest(ChannelHandlerContext ctx) throws IOException {
AuditLogRequest auditLogRequest = getModifiedAuditLogRequest(ctx);
LOG.warn("SANKET_TEST3 : publishAuditLogRequest : chanelId : {} , pipeline.channelId : {} " +
"and modified final AUDIT LOG REQUEST : {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
auditLogRequest == null? null : auditLogRequest.toString());

LOG.warn("SANKET_TEST4 : auditLogWriter.getClass() : " + auditLogWriter.getClass());

if (auditLoggingEnabled && auditLogRequest != null ) {
auditLogWriter.publish(auditLogRequest);
}
Expand All @@ -213,10 +245,18 @@ private AuditLogRequest getModifiedAuditLogRequest(ChannelHandlerContext ctx) {
AuditLogRequest auditLogRequest = SecurityRequestContext.getAuditLogRequest();

if (auditLogRequest == null){
LOG.warn("SANKET_TEST2 : getModifiedAuditLogRequest is NULL");
return null;
// LOG.warn("SANKET_TEST2 : getModifiedAuditLogRequest , Request from SRC is NULL");
Object auditLogRequestObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQUEST_ATTR_NAME)).get();
if (auditLogRequestObj != null) {
auditLogRequest = (AuditLogRequest) auditLogRequestObj;
} else {
// LOG.warn("SANKET_TEST2 : getModifiedAuditLogRequest is NULL");
return null;
}
}

// LOG.warn("SANKET_TEST2 : getModifiedAuditLogRequest , Request from SRC is NULL");

//If SecurityRequestContext has a Non Empty Queue, then auditLogRequest should have it from PostCall in
// AuditLogSetterHook.
if (!auditLogRequest.getAuditLogContextQueue().isEmpty()){
Expand All @@ -228,11 +268,11 @@ private AuditLogRequest getModifiedAuditLogRequest(ChannelHandlerContext ctx) {
Object auditLogContextsQueueAttr = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).get();

if (auditLogContextsQueueAttr == null) {
LOG.warn("SANKET_TEST2 : GETTING ATTR with NULL ");
// LOG.warn("SANKET_TEST2 : GETTING ATTR with NULL ");
return auditLogRequest;
}

LOG.warn("SANKET_TEST2 : GETTING ATTR with size : {}", ((Queue<AuditLogContext>) auditLogContextsQueueAttr).size());
// LOG.warn("SANKET_TEST2 : GETTING ATTR with size : {}", ((Queue<AuditLogContext>) auditLogContextsQueueAttr).size());

//Get other Meta Data that might have been reset in SRC.
Object userIpObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USERIP_ATTR_NAME)).get();
Expand All @@ -243,11 +283,9 @@ private AuditLogRequest getModifiedAuditLogRequest(ChannelHandlerContext ctx) {
}

private void setAuditLogMetaDataInChanel(ChannelHandlerContext ctx){

Object auditLogContextsQueueAttr = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).get();
Queue<AuditLogContext> auditLogContextQueue = SecurityRequestContext.getAuditLogQueue();

LOG.warn("SANKET_TEST3 : setAuditLogMetaDataInChanel : chanelId : {} , pipeline.channelId : {} " +
LOG.warn("SANKET_TEST ACH 4 (from Finally) : setAuditLogMetaDataInChanel : chanelId : {} , pipeline.channelId : {} " +
"and Q size from SRC : {}",
ctx.channel().id(),
ctx.pipeline().channel().id(),
Expand All @@ -257,14 +295,51 @@ private void setAuditLogMetaDataInChanel(ChannelHandlerContext ctx){
return;
}

Object auditLogContextsQueueAttr = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).get();

if (auditLogContextsQueueAttr == null) {
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).set(auditLogContextQueue);
} else {
Queue<AuditLogContext> auditLogContextQueueAttr = (Queue<AuditLogContext>) auditLogContextsQueueAttr;
//TODO check for dupes...
auditLogContextQueueAttr.addAll(auditLogContextQueue);
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).set(auditLogContextQueueAttr);
}
// Set Properties from SRC which might be used in AuditLogRequest.

// Set Properties from SRC which might be used in future.
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USERIP_ATTR_NAME)).set(SecurityRequestContext.getUserIp());

//If this is called after post call hook.
AuditLogRequest auditLogRequest = SecurityRequestContext.getAuditLogRequest();
if (auditLogRequest != null) {
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQUEST_ATTR_NAME)).set(auditLogRequest);
}
printAttributes(ctx);
}

//TESTING FUNCTIONS
private void printAttributes(ChannelHandlerContext ctx) {
Object logReqObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQUEST_ATTR_NAME)).get();
AuditLogRequest auditLogRequest =
logReqObj == null? null : (AuditLogRequest) logReqObj ;

Object queObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_QUEUE_ATTR_NAME)).get();

Queue<AuditLogContext> auditLogContextQueue =
queObj == null? null : (Queue<AuditLogContext>) queObj ;

int sizeFromAttr = auditLogContextQueue == null? 0 : auditLogContextQueue.size();

Object userIpObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USERIP_ATTR_NAME)).get();

String userIp = userIpObj == null? null : (String) userIpObj;

int auditLogContextQueueFromReq = auditLogRequest == null ? 0 : auditLogRequest.getAuditLogContextQueue().size();

LOG.warn(" SANKET_TEST ATTRIBUTES : channel id = {}, AuditLogRequst = {}, AuditLogReq's Q Size = {}, " +
"Queue size from Attr = {}, userIp = {}",
ctx.channel().id(), auditLogRequest, auditLogContextQueueFromReq, sizeFromAttr, userIp);

}

}

0 comments on commit a620d58

Please sign in to comment.