diff --git a/nsqd/channel.go b/nsqd/channel.go index ef0864bd8..6e979c73b 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -55,11 +55,11 @@ type Channel struct { exitFlag int32 // state tracking - clients map[int64]Consumer - paused int32 - ephemeralChannel bool - deleteCallback func(*Channel) - deleter sync.Once + clients map[int64]Consumer + paused int32 + ephemeral bool + deleteCallback func(*Channel) + deleter sync.Once // Stats tracking e2eProcessingLatencyStream *util.Quantile @@ -100,7 +100,7 @@ func NewChannel(topicName string, channelName string, ctx *context, c.initPQ() if strings.HasSuffix(channelName, "#ephemeral") { - c.ephemeralChannel = true + c.ephemeral = true c.backend = newDummyBackendQueue() } else { // backend names, for uniqueness, automatically include the topic... @@ -421,7 +421,7 @@ func (c *Channel) RemoveClient(clientID int64) { } delete(c.clients, clientID) - if len(c.clients) == 0 && c.ephemeralChannel == true { + if len(c.clients) == 0 && c.ephemeral == true { go c.deleter.Do(func() { c.deleteCallback(c) }) } } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 099e23c58..c2babe0a2 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -303,7 +303,7 @@ func (n *NSQD) PersistMetadata() error { topic.Lock() for _, channel := range topic.channelMap { channel.Lock() - if !channel.ephemeralChannel { + if !channel.ephemeral { channelData := make(map[string]interface{}) channelData["name"] = channel.name channelData["paused"] = channel.IsPaused() @@ -384,7 +384,10 @@ func (n *NSQD) GetTopic(topicName string) *Topic { n.Unlock() return t } else { - t = NewTopic(topicName, &context{n}) + deleteCallback := func(t *Topic) { + n.DeleteExistingTopic(t.name) + } + t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.logf("TOPIC(%s): created", t.name) diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index c7de6473a..1515b2128 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -180,15 +180,14 @@ func TestStartup(t *testing.T) { <-doneExitChan } -func TestEphemeralChannel(t *testing.T) { - // a normal channel sticks around after clients disconnect; an ephemeral channel is - // lazily removed after the last client disconnects +func TestEphemeralTopicsAndChannels(t *testing.T) { + // ephemeral topics/channels are lazily removed after the last channel/client is removed opts := NewNSQDOptions() opts.Logger = newTestLogger(t) opts.MemQueueSize = 100 _, _, nsqd := mustStartNSQD(opts) - topicName := "ephemeral_test" + strconv.Itoa(int(time.Now().Unix())) + topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral" doneExitChan := make(chan int) exitChan := make(chan int) @@ -209,7 +208,6 @@ func TestEphemeralChannel(t *testing.T) { msg = <-ephemeralChannel.clientMsgChan equal(t, msg.Body, body) - t.Logf("pulling from channel") ephemeralChannel.RemoveClient(client.ID) time.Sleep(50 * time.Millisecond) @@ -219,6 +217,11 @@ func TestEphemeralChannel(t *testing.T) { topic.Unlock() equal(t, numChannels, 0) + nsqd.Lock() + numTopics := len(nsqd.topicMap) + nsqd.Unlock() + equal(t, numTopics, 0) + exitChan <- 1 <-doneExitChan } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 1e1e3298d..e08095a87 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -110,7 +110,7 @@ func TestChannelTopicNames(t *testing.T) { equal(t, util.IsValidChannelName("test#ephemeral"), true) equal(t, util.IsValidTopicName("test"), true) equal(t, util.IsValidTopicName("test-with_period."), true) - equal(t, util.IsValidTopicName("test#ephemeral"), false) + equal(t, util.IsValidTopicName("test#ephemeral"), true) equal(t, util.IsValidTopicName("test:ephemeral"), false) } diff --git a/nsqd/topic.go b/nsqd/topic.go index 35c7ce4b0..690a96993 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -3,6 +3,7 @@ package nsqd import ( "bytes" "errors" + "strings" "sync" "sync/atomic" @@ -24,6 +25,10 @@ type Topic struct { waitGroup util.WaitGroupWrapper exitFlag int32 + ephemeral bool + deleteCallback func(*Topic) + deleter sync.Once + paused int32 pauseChan chan bool @@ -31,23 +36,28 @@ type Topic struct { } // Topic constructor -func NewTopic(topicName string, ctx *context) *Topic { - diskQueue := newDiskQueue(topicName, - ctx.nsqd.opts.DataPath, - ctx.nsqd.opts.MaxBytesPerFile, - ctx.nsqd.opts.SyncEvery, - ctx.nsqd.opts.SyncTimeout, - ctx.nsqd.opts.Logger) - +func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), - backend: diskQueue, memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize), exitChan: make(chan int), channelUpdateChan: make(chan int), ctx: ctx, pauseChan: make(chan bool), + deleteCallback: deleteCallback, + } + + if strings.HasSuffix(topicName, "#ephemeral") { + t.ephemeral = true + t.backend = newDummyBackendQueue() + } else { + t.backend = newDiskQueue(topicName, + ctx.nsqd.opts.DataPath, + ctx.nsqd.opts.MaxBytesPerFile, + ctx.nsqd.opts.SyncEvery, + ctx.nsqd.opts.SyncTimeout, + ctx.nsqd.opts.Logger) } t.waitGroup.Wrap(func() { t.messagePump() }) @@ -116,6 +126,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error { } delete(t.channelMap, channelName) // not defered so that we can continue while the channel async closes + numChannels := len(t.channelMap) t.Unlock() t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, channel.name) @@ -130,6 +141,10 @@ func (t *Topic) DeleteExistingChannel(channelName string) error { case <-t.exitChan: } + if numChannels == 0 && t.ephemeral == true { + go t.deleter.Do(func() { t.deleteCallback(t) }) + } + return nil } diff --git a/util/names.go b/util/names.go index c04b675c6..e5bd830fd 100644 --- a/util/names.go +++ b/util/names.go @@ -4,21 +4,21 @@ import ( "regexp" ) -var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`) -var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) +var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) // IsValidTopicName checks a topic name for correctness func IsValidTopicName(name string) bool { - if len(name) > 64 || len(name) < 1 { - return false - } - return validTopicNameRegex.MatchString(name) + return isValidName(name) } // IsValidChannelName checks a channel name for correctness func IsValidChannelName(name string) bool { + return isValidName(name) +} + +func isValidName(name string) bool { if len(name) > 64 || len(name) < 1 { return false } - return validChannelNameRegex.MatchString(name) + return validTopicChannelNameRegex.MatchString(name) }