Skip to content

Commit

Permalink
[FIXED] Race condition when adding endpoints from goroutine (#1484)
Browse files Browse the repository at this point in the history
  • Loading branch information
actatum authored Jan 12, 2024
1 parent c8f56b8 commit 51412b7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,15 @@ func addEndpoint(s *service, name, subject string, handler Handler, metadata map
if err != nil {
return err
}
s.m.Lock()
endpoint.subscription = sub
s.endpoints = append(s.endpoints, endpoint)
endpoint.stats = EndpointStats{
Name: name,
Subject: subject,
QueueGroup: queueGroup,
}
s.m.Unlock()
return nil
}

Expand Down Expand Up @@ -697,6 +699,9 @@ func (s *service) serviceIdentity() ServiceIdentity {

// Info returns information about the service
func (s *service) Info() Info {
s.m.Lock()
defer s.m.Unlock()

endpoints := make([]EndpointInfo, 0, len(s.endpoints))
for _, e := range s.endpoints {
endpoints = append(endpoints, EndpointInfo{
Expand Down
42 changes: 41 additions & 1 deletion micro/test/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func TestServiceBasics(t *testing.T) {
if svcs[0].Stats().Endpoints[0].NumRequests != 0 {
t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats())
}

}

func TestAddService(t *testing.T) {
Expand Down Expand Up @@ -1016,7 +1015,48 @@ func TestContextHandler(t *testing.T) {
if resp.Header.Get(micro.ErrorCodeHeader) != "400" {
t.Fatalf("Expected error response after canceling context; got: %q", string(resp.Data))
}
}

func TestAddEndpoint_RaceCondition(t *testing.T) {
// This test will fail with the '-race' flag if the lock/unlock are removed from service.go lines 437 and 445
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()

ctx := context.Background()

handler := func(ctx context.Context, req micro.Request) {
req.RespondJSON(map[string]any{"hello": "world"})
}
config := micro.Config{
Name: "test_service",
Version: "0.1.0",
Endpoint: &micro.EndpointConfig{
Subject: "test.func",
Handler: micro.ContextHandler(ctx, handler),
},
}

srv, err := micro.AddService(nc, config)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer srv.Stop()

errs := make(chan error)
go func(errs chan error) {
errs <- srv.AddEndpoint("test", micro.ContextHandler(ctx, handler))
}(errs)

err = <-errs
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}

func TestServiceStats(t *testing.T) {
Expand Down

0 comments on commit 51412b7

Please sign in to comment.