Skip to content

Commit

Permalink
Merge pull request #29 from wjordan/catalog_node_watch_fix
Browse files Browse the repository at this point in the history
Fix fine-grained blocking queries for Catalog.NodeServices
  • Loading branch information
wjordan authored Feb 22, 2022
2 parents 417eb13 + 3cc8970 commit 78aeb8c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
13 changes: 10 additions & 3 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,9 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
if err := catalogUpdateNodesIndexes(tx, idx, entMeta); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
if err := catalogUpdateNodeIndexes(tx, nodeName, idx, entMeta); err != nil {
return fmt.Errorf("failed updating node index: %s", err)
}

// Invalidate any sessions for this node.
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
Expand Down Expand Up @@ -1494,9 +1497,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}

// Get the table index.
idx := catalogMaxIndex(tx, entMeta, false)

// Query the node by node name
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID, EnterpriseMeta: *entMeta})
if err != nil {
Expand Down Expand Up @@ -1551,6 +1551,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
}
ws.Add(services.WatchCh())

// Get the node index.
idx := catalogNodeMaxIndex(tx, nodeName, entMeta)

return false, idx, node, services, nil
}

Expand Down Expand Up @@ -1702,6 +1705,10 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("failed updating index: %s", err)
}

if err := catalogUpdateNodeIndexes(tx, nodeName, idx, entMeta); err != nil {
return err
}

svc := service.(*structs.ServiceNode)
name := svc.CompoundServiceName()

Expand Down
26 changes: 26 additions & 0 deletions agent/consul/state/catalog_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
return "service_kind." + kind.Normalized()
}

func nodeIndexName(name string, _ *structs.EnterpriseMeta) string {
return fmt.Sprintf("node.%s", name)
}

func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error {
// overall nodes index
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
Expand All @@ -31,6 +35,15 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.Enterpr
return nil
}

func catalogUpdateNodeIndexes(tx WriteTxn, nodeName string, idx uint64, _ *structs.EnterpriseMeta) error {
// per-node index
if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}

return nil
}

func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// overall services index
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
Expand Down Expand Up @@ -78,6 +91,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
return err
}

if err := catalogUpdateNodeIndexes(tx, node.Node, node.ModifyIndex, node.GetEnterpriseMeta()); err != nil {
return err
}

// Update the node's service indexes as the node information is included
// in health queries and we would otherwise miss node updates in some cases
// for those queries.
Expand Down Expand Up @@ -106,13 +123,22 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
return err
}

// Update the node's index as the service information is included in node catalog queries.
if err := catalogUpdateNodeIndexes(tx, svc.Node, svc.ModifyIndex, &svc.EnterpriseMeta); err != nil {
return err
}

return nil
}

func catalogNodesMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableNodes)
}

func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, nodeIndexName(nodeName, nil))
}

func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableServices)
}
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,8 @@ func TestStateStore_EnsureService(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 30 {
// Index matches the requested node's latest service update.
if idx != 20 {
t.Fatalf("bad index: %d", idx)
}

Expand Down

0 comments on commit 78aeb8c

Please sign in to comment.