Skip to content

Commit

Permalink
updates for nsqlookupd, nsqadmin objects to pass and view topology info
Browse files Browse the repository at this point in the history
  • Loading branch information
zoemccormick committed Jan 2, 2024
1 parent 911aba3 commit a8000ff
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 28 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/sys v0.10.0 // indirect
)

Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
Expand Down Expand Up @@ -29,13 +27,13 @@ github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 h1:SeSEfdIxyvwGJliREIJhRPPXvW6sDlLT+UQ3B0hD0NA=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
10 changes: 10 additions & 0 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
Hostname string `json:"hostname"`
HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}

type statsRespType struct {
Expand Down Expand Up @@ -409,6 +411,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
HTTPPort: infoResp.HTTPPort,
TCPPort: infoResp.TCPPort,
Topics: producerTopics,
TopologyZone: infoResp.TopologyZone,
TopologyRegion: infoResp.TopologyRegion,
})
}(addr)
}
Expand Down Expand Up @@ -437,6 +441,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
Hostname string `json:"hostname"`
HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}

type statsRespType struct {
Expand Down Expand Up @@ -508,6 +514,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
HTTPPort: infoResp.HTTPPort,
TCPPort: infoResp.TCPPort,
Topics: producerTopics,
TopologyZone: infoResp.TopologyZone,
TopologyRegion: infoResp.TopologyRegion,
})
lock.Unlock()

Expand Down Expand Up @@ -607,6 +615,8 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
}
for _, c := range channel.Clients {
c.Node = addr
c.NodeTopologyRegion = p.TopologyRegion
c.NodeTopologyZone = p.TopologyZone
}
channelStats.Add(channel)
}
Expand Down
77 changes: 58 additions & 19 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Producer struct {
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
VersionObj semver.Version `json:"-"`
Topics ProducerTopics `json:"topics"`
OutOfDate bool `json:"out_of_date"`
Expand All @@ -46,6 +48,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
Version string `json:"version"`
Topics []string `json:"topics"`
Tombstoned []bool `json:"tombstones"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}
if err := json.Unmarshal(b, &r); err != nil {
return err
Expand All @@ -57,6 +61,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
TCPPort: r.TCPPort,
HTTPPort: r.HTTPPort,
Version: r.Version,
TopologyZone: r.TopologyZone,
TopologyRegion: r.TopologyRegion,
}
for i, t := range r.Topics {
p.Topics = append(p.Topics, ProducerTopic{Topic: t, Tombstoned: r.Tombstoned[i]})
Expand Down Expand Up @@ -189,25 +195,29 @@ func (c *ChannelStats) Add(a *ChannelStats) {
}

type ClientStats struct {
Node string `json:"node"`
RemoteAddress string `json:"remote_address"`
Version string `json:"version"`
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
UserAgent string `json:"user_agent"`
ConnectTs int64 `json:"connect_ts"`
ConnectedDuration time.Duration `json:"connected"`
InFlightCount int `json:"in_flight_count"`
ReadyCount int `json:"ready_count"`
FinishCount int64 `json:"finish_count"`
RequeueCount int64 `json:"requeue_count"`
MessageCount int64 `json:"message_count"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
Authed bool `json:"authed"`
AuthIdentity string `json:"auth_identity"`
AuthIdentityURL string `json:"auth_identity_url"`
Node string `json:"node"`
RemoteAddress string `json:"remote_address"`
Version string `json:"version"`
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
UserAgent string `json:"user_agent"`
ConnectTs int64 `json:"connect_ts"`
ConnectedDuration time.Duration `json:"connected"`
InFlightCount int `json:"in_flight_count"`
ReadyCount int `json:"ready_count"`
FinishCount int64 `json:"finish_count"`
RequeueCount int64 `json:"requeue_count"`
MessageCount int64 `json:"message_count"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
Authed bool `json:"authed"`
AuthIdentity string `json:"auth_identity"`
AuthIdentityURL string `json:"auth_identity_url"`
NodeTopologyRegion string `json:"node_topology_region,omitempty"`
NodeTopologyZone string `json:"node_topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
TopologyZone string `json:"topology_zone,omitempty"`

TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
Expand Down Expand Up @@ -262,6 +272,35 @@ func (c ClientsByHost) Less(i, j int) bool {
return c.ClientStatsList[i].Hostname < c.ClientStatsList[j].Hostname
}

type ClientStatsByNodeTopology struct {
ClientStatsList
}

func (c ClientStatsByNodeTopology) Less(i, j int) bool {
// if its the same node, sort by topology
if c.ClientStatsList[i].Node == c.ClientStatsList[j].Node {
region := c.ClientStatsList[i].NodeTopologyRegion
zone := c.ClientStatsList[i].NodeTopologyZone

switch {
case c.ClientStatsList[i].TopologyRegion == region && c.ClientStatsList[i].TopologyZone == zone:
return true
case c.ClientStatsList[j].TopologyRegion == region && c.ClientStatsList[j].TopologyZone == zone:
return false
case c.ClientStatsList[i].TopologyRegion == region:
return true
case c.ClientStatsList[j].TopologyRegion == region:
return false
default:
if c.ClientStatsList[i].TopologyRegion == c.ClientStatsList[j].TopologyRegion {
return c.ClientStatsList[i].TopologyZone < c.ClientStatsList[j].TopologyZone
}
return c.ClientStatsList[i].TopologyRegion < c.ClientStatsList[j].TopologyRegion
}
}
return c.ClientStatsList[i].Node < c.ClientStatsList[j].Node
}

type TopicStatsList []*TopicStats

func (t TopicStatsList) Len() int { return len(t) }
Expand Down
3 changes: 3 additions & 0 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -328,6 +329,8 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
messages = append(messages, pe.Error())
}

sort.Sort(clusterinfo.ClientStatsByNodeTopology{channelStats[channelName].Clients})

return struct {
*clusterinfo.ChannelStats
Message string `json:"message"`
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/static/build/main.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nsqadmin/static/build/main.js.map

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion nsqadmin/static/js/views/channel.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
<th>Connected</th>
</tr>
{{#each clients}}
<tr>
<tr style={{#if (eq topology_zone node_topology_zone)}}"background-color:rgb(221,255,221)"{{/if}}{{#if (eq topology_region node_topology_region)}}"background-color:rgb(254,254,194)"{{/if}}>
<td title="{{remote_address}}">{{hostname_port}}{{#if show_client_id}} ({{client_id}}){{/if}}</td>
<td>{{#if user_agent.length}}<small>{{user_agent}}</small>{{/if}}</td>
<td>
Expand All @@ -209,6 +209,19 @@
{{#if auth_identity_url}}</a>{{/if}}
</span>
{{/if}}
{{#if topology_region}}
<span class="label label-default">{{topology_region}}</span>
{{/if}}
{{#if topology_zone}}
<span class="label label-default">{{topology_zone}}</span>
{{/if}}
{{#if (eq topology_zone node_topology_zone)}}
<span class="label label-default">zoneLocal</span>
{{else}}
{{#if (eq topology_region node_topology_region)}}
<span class="label label-default">regionLocal</span>
{{/if}}
{{/if}}
</td>
<td><a class="link" href="{{basePath "/nodes"}}/{{node}}">{{node}}</a></td>
<td>{{commafy in_flight_count}}</td>
Expand Down
4 changes: 4 additions & 0 deletions nsqadmin/static/js/views/nodes.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<th>TCP Port</th>
<th>HTTP Port</th>
<th>Version</th>
<th>Region</th>
<th>Zone</th>
{{#if nsqlookupd.length}}
<th>Lookupd Conns.</th>
{{/if}}
Expand All @@ -28,6 +30,8 @@
<td>{{tcp_port}}</td>
<td>{{http_port}}</td>
<td>{{version}}</td>
<td>{{topology_region}}</td>
<td>{{topology_zone}}</td>
{{#if ../nsqlookupd.length}}
<td>
<a class="conn-count btn btn-default btn-xs {{#unlesseq ../../nsqlookupd.length remote_addresses.length}}btn-warning{{/unlesseq}}">{{remote_addresses.length}}</a>
Expand Down
2 changes: 2 additions & 0 deletions nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
ci["http_port"] = n.getOpts().BroadcastHTTPPort
ci["hostname"] = hostname
ci["broadcast_address"] = n.getOpts().BroadcastAddress
ci["topology_zone"] = n.getOpts().TopologyZone
ci["topology_region"] = n.getOpts().TopologyRegion

cmd, err := nsq.Identify(ci)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ type node struct {
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
ToplogyZone string `json:"topology_zone"`
ToplogyRegion string `json:"topology_region"`
Tombstones []bool `json:"tombstones"`
Topics []string `json:"topics"`
}
Expand Down Expand Up @@ -293,6 +295,8 @@ func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro
TCPPort: p.peerInfo.TCPPort,
HTTPPort: p.peerInfo.HTTPPort,
Version: p.peerInfo.Version,
ToplogyZone: p.peerInfo.TopologyZone,
ToplogyRegion: p.peerInfo.TopologyRegion,
Tombstones: tombstones,
Topics: topics,
}
Expand All @@ -318,6 +322,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
"tcp_port": p.peerInfo.TCPPort,
"http_port": p.peerInfo.HTTPPort,
"version": p.peerInfo.Version,
"topology_zone": p.peerInfo.TopologyZone,
"topology_region": p.peerInfo.TopologyRegion,
"last_update": atomic.LoadInt64(&p.peerInfo.lastUpdate),
"tombstoned": p.tombstoned,
"tombstoned_at": p.tombstonedAt.UnixNano(),
Expand Down
2 changes: 2 additions & 0 deletions nsqlookupd/registration_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type PeerInfo struct {
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
TopologyZone string `json:"topology_zone"`
TopologyRegion string `json:"topology_region"`
}

type Producer struct {
Expand Down

0 comments on commit a8000ff

Please sign in to comment.