Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

utilities: use nsq.ConfigFlag #640

Merged
merged 1 commit into from
Aug 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/BurntSushi/toml 2dff11163ee667d51dcc066660925a92ce138deb
github.com/bitly/go-hostpool 58b95b10d6ca26723a7f46017b348653b825a8d6
github.com/bitly/go-nsq 4271cd1529a78175e327570894988cc2cb21228f # v1.0.5-alpha
github.com/bitly/go-nsq 0f97a46d801c18d6fd1a12a9040d11cc4e4ef397 # v1.0.5-alpha
github.com/bitly/go-simplejson 18db6e68d8fd9cbf2e8ebe4c81a78b96fd9bf05a
github.com/bmizerany/perks/quantile 6cb9d9d729303ee2628580d9aec5db968da3a607
github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da
Expand Down
15 changes: 5 additions & 10 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@ var (
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)")

consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}
Expand All @@ -59,6 +54,11 @@ func (th *TailHandler) HandleMessage(m *nsq.Message) error {
}

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -90,12 +90,7 @@ func main() {
*maxInFlight = *totalMessages
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_tail/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cfg.MaxInFlight = *maxInFlight

consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
Expand Down
34 changes: 16 additions & 18 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
Expand All @@ -54,10 +53,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
flag.Var(&topics, "topic", "nsq topic (may be given multiple times)")
Expand Down Expand Up @@ -93,13 +88,15 @@ type TopicDiscoverer struct {
termChan chan os.Signal
hupChan chan os.Signal
wg sync.WaitGroup
cfg *nsq.Config
}

func newTopicDiscoverer() *TopicDiscoverer {
func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer {
return &TopicDiscoverer{
topics: make(map[string]*ConsumerFileLogger),
termChan: make(chan os.Signal),
hupChan: make(chan os.Signal),
cfg: cfg,
}
}

Expand Down Expand Up @@ -360,20 +357,12 @@ func hasArg(s string) bool {
return false
}

func newConsumerFileLogger(topic string) (*ConsumerFileLogger, error) {
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
if err != nil {
return nil, err
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
err = app.ParseOpts(cfg, consumerOpts)
if err != nil {
return nil, err
}
cfg.MaxInFlight = *maxInFlight

consumer, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -424,7 +413,7 @@ func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
log.Println("Skipping topic ", topic, "as it didn't match required pattern:", pattern)
continue
}
logger, err := newConsumerFileLogger(topic)
logger, err := newConsumerFileLogger(topic, t.cfg)
if err != nil {
log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
continue
Expand Down Expand Up @@ -466,6 +455,12 @@ func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
}

func main() {
cfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -505,7 +500,10 @@ func main() {
}
}

discoverer := newTopicDiscoverer()
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight

discoverer := newTopicDiscoverer(cfg)

signal.Notify(discoverer.hupChan, syscall.SIGHUP)
signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -528,7 +526,7 @@ func main() {
continue
}

logger, err := newConsumerFileLogger(topic)
logger, err := newConsumerFileLogger(topic, cfg)
if err != nil {
log.Fatalf("ERROR: couldn't create logger for topic %s: %s", topic, err)
}
Expand Down
14 changes: 5 additions & 9 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")

consumerOpts = app.StringArray{}
getAddrs = app.StringArray{}
postAddrs = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
Expand All @@ -60,9 +59,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&postAddrs, "post", "HTTP address to make a POST request to. data will be in the body (may be given multiple times)")
flag.Var(&getAddrs, "get", "HTTP address to make a GET request to. '%s' will be printf replaced with data (may be given multiple times)")
Expand Down Expand Up @@ -171,6 +167,11 @@ func hasArg(s string) bool {
}

func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

var publisher Publisher
var addresses app.StringArray
var selectedMode int
Expand Down Expand Up @@ -259,12 +260,7 @@ func main() {
addresses = getAddrs
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
Expand Down
30 changes: 10 additions & 20 deletions apps/nsq_to_nsq/nsq_to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables")
mode = flag.String("mode", "hostpool", "the upstream request mode options: round-robin, hostpool (default), epsilon-greedy")

consumerOpts = app.StringArray{}
producerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
destNsqdTCPAddrs = app.StringArray{}
Expand All @@ -57,11 +55,6 @@ var (
)

func init() {
// TODO: remove, deprecated
flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
Expand Down Expand Up @@ -274,6 +267,14 @@ func hasArg(s string) bool {
func main() {
var selectedMode int

cCfg := nsq.NewConfig()
pCfg := nsq.NewConfig()

// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cCfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if *showVersion {
Expand Down Expand Up @@ -324,12 +325,7 @@ func main() {

defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

cCfg := nsq.NewConfig()
cCfg.UserAgent = defaultUA
err := app.ParseOpts(cCfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
cCfg.MaxInFlight = *maxInFlight

// TODO: remove, deprecated
Expand All @@ -338,19 +334,13 @@ func main() {
cCfg.MaxBackoffDuration = *maxBackoffDuration
}

pCfg := nsq.NewConfig()
pCfg.UserAgent = defaultUA

err = app.ParseOpts(pCfg, producerOpts)
if err != nil {
log.Fatal(err)
}

consumer, err := nsq.NewConsumer(*topic, *channel, cCfg)
if err != nil {
log.Fatal(err)
}

pCfg.UserAgent = defaultUA

producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
producer, err := nsq.NewProducer(addr, pCfg)
Expand Down
11 changes: 3 additions & 8 deletions apps/to_nsq/to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ var (
delimiter = flag.String("delimiter", "\n", "character to split input from stdin (defaults to '\n')")

destNsqdTCPAddrs = app.StringArray{}
producerOpts = app.StringArray{}
)

func init() {
flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")
flag.Var(&destNsqdTCPAddrs, "nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
}

func main() {
cfg := nsq.NewConfig()
flag.Var(&nsq.ConfigFlag{cfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)")

flag.Parse()

if len(*topic) == 0 {
Expand All @@ -46,14 +47,8 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

err := app.ParseOpts(cfg, producerOpts)
if err != nil {
log.Fatal(err)
}

// make the producers
producers := make(map[string]*nsq.Producer)
for _, addr := range destNsqdTCPAddrs {
Expand Down
29 changes: 0 additions & 29 deletions internal/app/parse_opts.go

This file was deleted.