Skip to content

Commit

Permalink
Fixes for a crash and also for proxy parameters (#25)
Browse files Browse the repository at this point in the history
* introducing command line parameters

* adding statsd calls

* go.mod

* fixing a crash; organizing excessive func params

---------

Co-authored-by: Jeff Saremi <[email protected]>
  • Loading branch information
jeffsaremi and jeffsaremi authored Apr 11, 2023
1 parent d5727cf commit 4867b70
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 25 deletions.
41 changes: 23 additions & 18 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,42 @@ type Proxy struct {
statsdCounters
}

func NewProxy(log *zap.Logger, sd *statsd.Client, config *config.Config, label, upstreamHost string, database int, minPoolSize, maxPoolSize int, readTimeout, writeTimeout time.Duration, readonly bool, maxSub, maxBlk int, idleTimeout time.Duration) (*Proxy, error) {
if label != "" {
log = log.With(zap.String("cluster", label))
func NewProxy(log *zap.Logger, sd *statsd.Client, config *config.Config, upstreamIndex int) (*Proxy, error) {
up := config.Upstreams[upstreamIndex]
if up.Label != "" {
log = log.With(zap.String("cluster", up.Label))

var err error
sd, err = util.StatsdWithTags(sd, []string{fmt.Sprintf("cluster:%s", label)})
sd, err = util.StatsdWithTags(sd, []string{fmt.Sprintf("cluster:%s", up.Label)})
if err != nil {
return nil, err
}
}
return &Proxy{
p := Proxy{
log: log,
statsd: sd,
config: config,

upstreamConfigHost: upstreamHost,
localConfigHost: localSocketPathFromUpstream(upstreamHost, database, readonly, config.LocalSocketPrefix, config.LocalSocketSuffix),
minPoolSize: minPoolSize,
maxPoolSize: maxPoolSize,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
database: database,
readonly: readonly,
idleTimeout: idleTimeout,
upstreamConfigHost: up.UpstreamConfigHost,
localConfigHost: localSocketPathFromUpstream(up.UpstreamConfigHost, up.Database, up.Readonly, config.LocalSocketPrefix, config.LocalSocketSuffix),
minPoolSize: up.MinPoolSize,
maxPoolSize: up.MaxPoolSize,
readTimeout: up.ReadTimeout,
writeTimeout: up.WriteTimeout,
database: up.Database,
readonly: up.Readonly,
idleTimeout: up.IdleTimeout,

quit: make(chan interface{}),
kill: make(chan interface{}),

listeners: make(map[string]*listener.Listener),
reservations: handlers.NewReservations(maxSub, maxBlk, sd),
}, nil
reservations: handlers.NewReservations(up.MaxSubscriptions, up.MaxBlockers, sd),
}
i, d := util.StatsdBackgroundGauge(sd, "proxy.listeners", []string{})
p.statsdCounters.listenerInc = StatsdBackgroundGaugeCallback(i)
p.statsdCounters.listenerDec = StatsdBackgroundGaugeCallback(d)
return &p, nil
}

func (p *Proxy) Run() error {
Expand Down Expand Up @@ -330,7 +335,7 @@ func (p *Proxy) createListener(local, upstream string) (*listener.Listener, erro
if err != nil {
return nil, err
}
p.statsdCounters.listenerInc("listeners.count", []string{fmt.Sprintf("upstream:%s", upstream)})
p.statsdCounters.listenerInc("", []string{fmt.Sprintf("upstream:%s", upstream)})
return listener, nil
}

Expand Down Expand Up @@ -410,7 +415,7 @@ func (p *Proxy) removeListener(key string) {
if ok {
ls.Shutdown()
delete(p.listeners, key)
p.statsdCounters.listenerDec("listeners.count", []string{fmt.Sprintf("upstream:%s", key)})
p.statsdCounters.listenerDec("", []string{fmt.Sprintf("upstream:%s", key)})
}
p.listenerLock.Unlock()
}
Expand Down
15 changes: 14 additions & 1 deletion proxy/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,21 @@ func SetupProxyAdvancedConfig(t *testing.T, upstreamPort string, db int, maxPool
LocalSocketSuffix: ".sock",
Unlink: true,
}
up := config.Upstream{
UpstreamConfigHost: uri,
Label: "test",
Database: db,
MinPoolSize: 1,
MaxPoolSize: maxPoolSize,
Readonly: readonly,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
MaxSubscriptions: 1,
MaxBlockers: 1,
}
cfg.Upstreams = []config.Upstream{up}

proxy, err := NewProxy(zap.L(), sd, cfg, "test", uri, db, 1, maxPoolSize, 1*time.Second, 1*time.Second, readonly, 1, 1, 0*time.Second)
proxy, err := NewProxy(zap.L(), sd, cfg, 0)
assert.NoError(t, err)
go func() {
err := proxy.Run()
Expand Down
8 changes: 2 additions & 6 deletions redisbetween.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ func proxies(c *config.Config, log *zap.Logger) (proxies []*proxy.Proxy, err err
if err != nil {
return nil, err
}
for _, u := range c.Upstreams {
p, err := proxy.NewProxy(
log, s, c, u.Label, u.UpstreamConfigHost, u.Database,
u.MinPoolSize, u.MaxPoolSize, u.ReadTimeout, u.WriteTimeout,
u.Readonly, u.MaxSubscriptions, u.MaxBlockers, u.IdleTimeout,
)
for index, _ := range c.Upstreams {
p, err := proxy.NewProxy(log, s, c, index)

if err != nil {
return nil, err
Expand Down

0 comments on commit 4867b70

Please sign in to comment.