Skip to content

Commit

Permalink
utilities: for consuemr flags use nsq.ConfigFlag
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Aug 25, 2015
1 parent af26505 commit 7db7687
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/BurntSushi/toml 2dff11163ee667d51dcc066660925a92ce138deb
github.com/bitly/go-hostpool 58b95b10d6ca26723a7f46017b348653b825a8d6
github.com/bitly/go-nsq 4271cd1529a78175e327570894988cc2cb21228f # v1.0.5-alpha
github.com/bitly/go-nsq 0f97a46d801c18d6fd1a12a9040d11cc4e4ef397 # v1.0.5-alpha
github.com/bitly/go-simplejson 18db6e68d8fd9cbf2e8ebe4c81a78b96fd9bf05a
github.com/bmizerany/perks/quantile 6cb9d9d729303ee2628580d9aec5db968da3a607
github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da
Expand Down
15 changes: 5 additions & 10 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@ var (
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)")

consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}
Expand All @@ -59,6 +54,11 @@ func (th *TailHandler) HandleMessage(m *nsq.Message) error {
}

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -90,12 +90,7 @@ func main() {
*maxInFlight = *totalMessages
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_tail/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cfg.MaxInFlight = *maxInFlight

consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
Expand Down
34 changes: 16 additions & 18 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
Expand All @@ -54,10 +53,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
flag.Var(&topics, "topic", "nsq topic (may be given multiple times)")
Expand Down Expand Up @@ -93,13 +88,15 @@ type TopicDiscoverer struct {
termChan chan os.Signal
hupChan chan os.Signal
wg sync.WaitGroup
cfg *nsq.Config
}

func newTopicDiscoverer() *TopicDiscoverer {
func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer {
return &TopicDiscoverer{
topics: make(map[string]*ConsumerFileLogger),
termChan: make(chan os.Signal),
hupChan: make(chan os.Signal),
cfg: cfg,
}
}

Expand Down Expand Up @@ -360,20 +357,12 @@ func hasArg(s string) bool {
return false
}

func newConsumerFileLogger(topic string) (*ConsumerFileLogger, error) {
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
if err != nil {
return nil, err
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
err = app.ParseOpts(cfg, consumerOpts)
if err != nil {
return nil, err
}
cfg.MaxInFlight = *maxInFlight

consumer, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -424,7 +413,7 @@ func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
log.Println("Skipping topic ", topic, "as it didn't match required pattern:", pattern)
continue
}
logger, err := newConsumerFileLogger(topic)
logger, err := newConsumerFileLogger(topic, t.cfg)
if err != nil {
log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
continue
Expand Down Expand Up @@ -466,6 +455,12 @@ func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
}

func main() {
cfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -505,7 +500,10 @@ func main() {
}
}

discoverer := newTopicDiscoverer()
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight

discoverer := newTopicDiscoverer(cfg)

signal.Notify(discoverer.hupChan, syscall.SIGHUP)
signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -528,7 +526,7 @@ func main() {
continue
}

logger, err := newConsumerFileLogger(topic)
logger, err := newConsumerFileLogger(topic, cfg)
if err != nil {
log.Fatalf("ERROR: couldn't create logger for topic %s: %s", topic, err)
}
Expand Down
14 changes: 5 additions & 9 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")

consumerOpts = app.StringArray{}
getAddrs = app.StringArray{}
postAddrs = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
Expand All @@ -60,9 +59,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&postAddrs, "post", "HTTP address to make a POST request to. data will be in the body (may be given multiple times)")
flag.Var(&getAddrs, "get", "HTTP address to make a GET request to. '%s' will be printf replaced with data (may be given multiple times)")
Expand Down Expand Up @@ -171,6 +167,11 @@ func hasArg(s string) bool {
}

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

var publisher Publisher
var addresses app.StringArray
var selectedMode int
Expand Down Expand Up @@ -259,12 +260,7 @@ func main() {
addresses = getAddrs
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
Expand Down
30 changes: 10 additions & 20 deletions apps/nsq_to_nsq/nsq_to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables")
mode = flag.String("mode", "hostpool", "the upstream request mode options: round-robin, hostpool (default), epsilon-greedy")

consumerOpts = app.StringArray{}
producerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
destNsqdTCPAddrs = app.StringArray{}
Expand All @@ -57,11 +55,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
Expand Down Expand Up @@ -274,6 +267,14 @@ func hasArg(s string) bool {
func main() {
var selectedMode int

cCfg := nsq.NewConfig()
pCfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cCfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -324,12 +325,7 @@ func main() {

defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

cCfg := nsq.NewConfig()
cCfg.UserAgent = defaultUA
err := app.ParseOpts(cCfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cCfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
Expand All @@ -338,19 +334,13 @@ func main() {
cCfg.MaxBackoffDuration = *maxBackoffDuration
}

pCfg := nsq.NewConfig()
pCfg.UserAgent = defaultUA

err = app.ParseOpts(pCfg, producerOpts)
if err != nil {
log.Fatal(err)
}

consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
if err != nil {
log.Fatal(err)
}

pCfg.UserAgent = defaultUA

producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
producer, err := nsq.NewProducer(addr, pCfg)
Expand Down
11 changes: 3 additions & 8 deletions apps/to_nsq/to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ var (
delimiter = flag.String("delimiter", "\n", "character to split input from stdin (defaults to '\n')")

destNsqdTCPAddrs = app.StringArray{}
producerOpts = app.StringArray{}
)

func init() {
flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&destNsqdTCPAddrs, "nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
}

func main() {
cfg := nsq.NewConfig()
flag.Var(&nsq.ConfigFlag{cfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if len(*topic) == 0 {
Expand All @@ -46,14 +47,8 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

err := app.ParseOpts(cfg, producerOpts)
if err != nil {
log.Fatal(err)
}

// make the producers
producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
Expand Down
29 changes: 0 additions & 29 deletions internal/app/parse_opts.go

This file was deleted.

0 comments on commit 7db7687

Please sign in to comment.