-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(server): Updating data store test to use queues #3166
Conversation
@@ -170,6 +176,32 @@ message TraceIdResponse { | |||
string id = 1; | |||
} | |||
|
|||
message DataStoreConnectionTestRequest { | |||
string requestID = 1; | |||
DataStore datastore = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataStore datastore = 5; | |
DataStore datastore = 2; |
??
server/http/controller.go
Outdated
@@ -78,6 +83,13 @@ type transactionRunner interface { | |||
Run(context.Context, testsuite.TestSuite, test.RunMetadata, variableset.VariableSet, *[]testrunner.RequiredGate) testsuite.TestSuiteRun | |||
} | |||
|
|||
type dataStoreTestRunner interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name suggestion
type dataStoreTestRunner interface { | |
type dataStoreTester interface { |
server/testconnection/pipeline.go
Outdated
} | ||
|
||
func (p *DataStoreTestPipeline) Run(ctx context.Context, job Job) { | ||
spew.Dump(job) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spew.Dump(job) |
server/testconnection/pipeline.go
Outdated
job := Job{ | ||
Headers: make(map[string]string), | ||
} | ||
job.ID = uuid.New().String() | ||
job.DataStore = datastore | ||
job.Headers[string(middleware.TenantIDKey)] = middleware.TenantIDFromContext(ctx) | ||
|
||
return job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
job := Job{ | |
Headers: make(map[string]string), | |
} | |
job.ID = uuid.New().String() | |
job.DataStore = datastore | |
job.Headers[string(middleware.TenantIDKey)] = middleware.TenantIDFromContext(ctx) | |
return job | |
return Job{ | |
ID: uuid.New().String() | |
DataStore: datastore | |
Headers:= map[string]string{ | |
string(middleware.TenantIDKey): middleware.TenantIDFromContext(ctx) | |
} | |
} |
server/testconnection/pipeline.go
Outdated
) | ||
|
||
type Job struct { | ||
Headers map[string]string `json:"headers"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are the headers used for anything other than passing the tenant ID? if so, it might be simpler to just have a TenantID string
field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to keep using the same approach we have for the rest, but yeah we can just change it for a new field, let me check..
ctx, pollingSpan := w.tracer.Start(ctx, "dsTestConnectionRequest.ProcessItem") | ||
defer pollingSpan.End() | ||
|
||
traceDB, err := getTraceDB(job.DataStore, w.newTraceDBFn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If i understand this correctly, this function's only effect is to wrap the error with a message. I'd recommend to just have this behavior inlined here so it's easier to follow
server/testconnection/listener.go
Outdated
func (m *Listener) Notify(job Job) { | ||
for _, sub := range m.subscriptions[job.ID] { | ||
sub(job) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not using locks for reading there can still be race conditions
func (m *Listener) Notify(job Job) { | |
for _, sub := range m.subscriptions[job.ID] { | |
sub(job) | |
} | |
} | |
func (m *Listener) Notify(job Job) { | |
m.mutex.Lock() | |
defer m.mutex.Unlock() | |
for _, sub := range m.subscriptions[job.ID] { | |
sub(job) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comments are mostly optiona, except for the one about the race conditoin. that needs to be fixed or it might cause panics on prod
This PR updates the data test connection logic to use queues
Changes
Checklist