-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(SIP-39): Websocket sidecar app #11498
Conversation
Codecov Report
@@ Coverage Diff @@
## master #11498 +/- ##
==========================================
+ Coverage 78.35% 78.47% +0.12%
==========================================
Files 934 937 +3
Lines 47350 47439 +89
Branches 5941 5976 +35
==========================================
+ Hits 37102 37229 +127
+ Misses 10104 10067 -37
+ Partials 144 143 -1
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
superset-websocket/client-ws-app/public/javascripts/js.cookie.min.js
Outdated
Show resolved
Hide resolved
Did you consider Typescript here? |
@craig-rueda yes, the plan is to migrate this to TypeScript (added as a TODO item) |
If you are going to use TypeScript, I'd recommend look into integrating esbuild in the workflow. It's much faster than Babel and TSC and seems good enough for node.js environment. |
14b4b2c
to
cf6e3a7
Compare
9ab49d4
to
7f0dafc
Compare
7f0dafc
to
dc968bc
Compare
9518ebf
to
f5087ed
Compare
superset-websocket/src/index.ts
Outdated
} | ||
|
||
const startServer = process.argv[2] === 'start'; | ||
const configFile = environment === 'test' ? '../config.test.json' : '../config.json'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a CLI argument for the config file?
* Module dependencies. | ||
*/ | ||
|
||
var app = require('../app'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this do? Is it the main entrypoint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is all boilerplate Express code. Not used in production app.
app.set('views', path.join(__dirname, 'views')); | ||
app.set('view engine', 'jade'); | ||
|
||
app.use(logger('dev')); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's dev
in this context? Is that the logger level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boilerplate Express app code. Not part of the production app.
app.use(function(err, req, res) { | ||
// set locals, only providing error in development | ||
res.locals.message = err.message; | ||
res.locals.error = req.app.get('env') === 'development' ? err : {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we leverage settings from the config
file referenced above? I'd advise using one or the other (env vars of config file)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test utility app only, all boilerplate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, looks good. I'm pretty sure it works, as you've been working on this for some time now... I just think a few spots could use some more code comments to guide others when digging in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the core logic is looking good, but had a few minor comments.
Also:
- Nitpick but looks like the indentation is inconsistent across files with both two and four spaces being used. Not a huge deal but might be nice to add prettier and forget about these types of things.
- Do we normally check in auxiliary code into the core repository? In this case, I'm referring the the load testing application. Would it make sense to have that as a separate repo outside of the main Superset repo?
@@ -0,0 +1,18 @@ | |||
# |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this ignore node_modules
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
node_modules
is ignored in the top-level .gitignore
, but I'll make it explicit.
GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://<host>:<port>/" | ||
``` | ||
|
||
Note that the WebSocket server must be run on the same hostname (different port) for cookies to be shared between the Flask app and the WebSocket server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the JWT the only thing needed from the cookies? If so, have we considered passing that value explicitly? Seems like if we could do that, we could remove this assumption/constraint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could pass it explicitly, though we'd have to make the cookie readable by JS (currently it's httponly), so there is a bit of a tradeoff here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping the cookie httponly is a good move!
"uuid": "^8.3.2", | ||
"ws": "^7.4.2" | ||
}, | ||
"devDependencies": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not important now, but I think it would be good to eventually add prettier
to this application.
superset-websocket/src/index.ts
Outdated
try { | ||
socketInstance.ws.send(strData); | ||
} catch(err) { | ||
console.debug('Error sending to socket', err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Is this error indicative of a real problem? If so, should this be console.error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can happen for a number of reasons, including a disconnected socket. I believe this was a console.error
and was downgraded to a debug due to noise.
superset-websocket/src/index.ts
Outdated
|
||
export const sendToChannel = (channel: string, value: EventValue): void => { | ||
const strData = JSON.stringify(value); | ||
if(!channels[channel]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the behavior when we pull an event off the redis stream but for one reason or another we have no channel to send it to? Is that event lost? Or does the event stay in the redis stream and we expect the client to have the right state to connect to the event server and pull it in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Events are persisted in the Redis streams until the (configurable) maximum number of items is reached. For the global stream, yes, the event is dropped if there is no available channel/socket connected. The reconnection logic then handles replaying any events that were lost while disconnected.
|
||
export const subscribeToGlobalStream = async (stream: string, listener: ListenerFunction) => { | ||
/*eslint no-constant-condition: ["error", { "checkLoops": false }]*/ | ||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably premature to worry about performance here and I think this is likely not an issue, but I'm curious if there are any potential concerns you foresee with an infinite loop calling redis? It looks like the BLOCK option should prevent too many requests in too short a time since if no events are available it'll wait, but not sure what would happen if one or two events regularly trickle in within short timeframes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When events are received, it returns immediately. More info here: https://redis.io/commands/xread#blocking-for-data
When the BLOCK command is passed, but there is data to return at least in one of the streams passed, the command is executed synchronously exactly like if the BLOCK option would be missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Block" could be a confusing term here. In typical Node parlance, "block" means synchronous code, which this is not. It might be helpful to add a comment clarifying what "blocking" means in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}; | ||
|
||
type ListenerFunction = (results: StreamResult[]) => void; | ||
interface EventValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading this correctly, it looks like under the polling implementation, the response is in the format:
type PollingResponse = {
result: EventValue;
}
Whereas the response here is simply:
type WebSocketResponse = EventValue;
I'm not opinionated either way, but I think we should consider having a standard structure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it looks like the initial call to /api/v1/chart/data
returns EventValue
without the { result: ... }
wrapper, so maybe we should change the polling response as that seems to be the outlier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, nevermind, I missed that the polling response returns a list given there could be multiple events
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few minor-medium suggestions. Looks great, excited for this feature (and to get a node server in Superset! 😁)
GLOBAL_ASYNC_QUERIES_REDIS_CONFIG | ||
GLOBAL_ASYNC_QUERIES_REDIS_STREAM_PREFIX | ||
GLOBAL_ASYNC_QUERIES_JWT_COOKIE_NAME | ||
GLOBAL_ASYNC_QUERIES_JWT_SECRET |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idea, non-blocking: If these are not configured correctly, can we have the service detect it and provide hints to help devs/admins get it working?
superset-websocket/config.test.json
Outdated
"db": 10, | ||
"ssl": false | ||
}, | ||
"streamPrefix": "test-async-events-", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change request: Can we make this more specific? Not clear what a streamPrefix
is used for.
"lint": "eslint . --ext .js,.jsx,.ts,.tsx && npm run type", | ||
"dev-server": "ts-node src/index.ts start", | ||
"build": "tsc", | ||
"prettier-check": "prettier --check .", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change request: Could this be named format-check
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was named prettier-check
for consistency with superset-frontend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to rename, but curious what people think about keeping these kinds of things consistent.
@@ -1124,6 +1125,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods | |||
GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me" | |||
GLOBAL_ASYNC_QUERIES_TRANSPORT = "polling" | |||
GLOBAL_ASYNC_QUERIES_POLLING_DELAY = 500 | |||
GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://127.0.0.1:8080/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there potential for the websocket server to be used for purposes other than async queries in the future? If so, this config should be given a more general name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly, yes, but no immediate use cases have been defined.
reset_token = True | ||
elif user_id != session["async_user_id"]: | ||
reset_token = True | ||
reset_token = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small change but I like how much easier to read this is 🎉
}; | ||
|
||
type ListenerFunction = (results: StreamResult[]) => void; | ||
interface EventValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this server might eventually deliver multiple kinds of events, it would be desirable to separate the id
and channel_id
, which are used by the server, from the schema for the other fields, which are specific to the async query event type.
So, an interface like:
interface EventValue<T extends object> {
id: string;
channel_id: string;
data: T; // everything else
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unclear what future use cases might be, but the existing properties are fairly generic, IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Future use cases could include a wide variety of things, some that I can think of:
- real-time updates to entities such as dashboards
- comments or other user activity
- notifications
- streaming query results
- any other real-time state change
The schema of different types of messages seems quite likely to vary from that of async queries, imo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that there are many potential use cases, but my point is that these should not hold up this PR. The interface in question is already in use, introduced in 1.0, and would require some additional planning and semver gymnastics to refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed that this and the config comment seem somewhat out of scope for this PR, just wanted to raise it as it seems like the kind of thing that might be significantly more difficult to change later.
|
||
export const subscribeToGlobalStream = async (stream: string, listener: ListenerFunction) => { | ||
/*eslint no-constant-condition: ["error", { "checkLoops": false }]*/ | ||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Block" could be a confusing term here. In typical Node parlance, "block" means synchronous code, which this is not. It might be helpful to add a comment clarifying what "blocking" means in this context.
* Reads a range of events from a channel-specific Redis event stream. | ||
* Invoked in the client re-connection flow. | ||
*/ | ||
export const fetchRangeFromStream = async ({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this return the reply in a promise instead of using a listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, good call. I'll look at refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this more, there's an argument to be made for keeping this consistent with the interface for subscribeToGlobalStream
.
Co-authored-by: David Aaron Suddjian <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing to add to the already extensive comments from others, so I focused on testing this and I must say this is really exciting stuff; chart loading times seem to be way down and it almost feels like one of the main bottlenecks right now for pre-cached data is the slow rendering times of the NVD3 charts 🎉 Tested the following:
- install based on documentation - very straight forward, had it up and running in a few mins ✅
- regular dashboard loading, comparing non-DAQ, polling DAQ and WS DAQ - WS DAQ loaded - WS DAQ by far the quickest to load ✅
- tested rendering of native filters (both filter tab and filter config modal) ✅
- triggering chart queries with native filters and cross filters ✅
const express = require('express'); | ||
const router = express.Router(); | ||
const jwt = require('jsonwebtoken'); | ||
const config = require('../../../config.json'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
const config = require('../../../config.json'); | |
const config = require('/config.json'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this will actually work, node might try to get config.json
from the root dir.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was unsure, too, but it did seem to work when I tested it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem to work for me:
Error: Cannot find module '/config.json'
* WIP node.js websocket app * Load testing * Multi-stream publish with blocking reads * Use JWT for auth and channel ID * Update ws jwt cookie name * Typescript * Frontend WebSocket transport support * ws server ping/pong and GC logic * ws server unit tests * GC interval config, debug logging * Cleanup JWT cookie logic * Refactor asyncEvents.ts to support non-Redux use cases * Update tests for refactored asyncEvents * Add eslint, write READMEs, reorg files * CI workflow * Moar Apache license headers * pylint found something * adjust GH actions workflow * Improve documentation & comments * Prettier * Add configurable logging via Winston * Add SSL support for Redis connections * Fix incompatible logger statements * Apply suggestions from code review Co-authored-by: David Aaron Suddjian <[email protected]> * rename streamPrefix config Co-authored-by: David Aaron Suddjian <[email protected]>
* WIP node.js websocket app * Load testing * Multi-stream publish with blocking reads * Use JWT for auth and channel ID * Update ws jwt cookie name * Typescript * Frontend WebSocket transport support * ws server ping/pong and GC logic * ws server unit tests * GC interval config, debug logging * Cleanup JWT cookie logic * Refactor asyncEvents.ts to support non-Redux use cases * Update tests for refactored asyncEvents * Add eslint, write READMEs, reorg files * CI workflow * Moar Apache license headers * pylint found something * adjust GH actions workflow * Improve documentation & comments * Prettier * Add configurable logging via Winston * Add SSL support for Redis connections * Fix incompatible logger statements * Apply suggestions from code review Co-authored-by: David Aaron Suddjian <[email protected]> * rename streamPrefix config Co-authored-by: David Aaron Suddjian <[email protected]>
SUMMARY
Node.js WebSocket server application for SIP-39 pub/sub architecture. This PR contains:
superset-websocket/src/index.ts
)ping
/pong
for connection managementGLOBAL_ASYNC_QUERIES_WEBSOCKET_URL
superset-websocket/utils/client-ws-app/
)Corresponding Superset web app changes are in the following PRs: #11499, #13696
To be added in future PRs:
TEST PLAN
ADDITIONAL INFORMATION