Skip to content

Commit

Permalink
nsqd: add #ephemeral topics
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Sep 22, 2014
1 parent d96561d commit 4fc25e0
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 31 deletions.
14 changes: 7 additions & 7 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Channel struct {
exitFlag int32

// state tracking
clients map[int64]Consumer
paused int32
ephemeralChannel bool
deleteCallback func(*Channel)
deleter sync.Once
clients map[int64]Consumer
paused int32
ephemeral bool
deleteCallback func(*Channel)
deleter sync.Once

// Stats tracking
e2eProcessingLatencyStream *util.Quantile
Expand Down Expand Up @@ -100,7 +100,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.ephemeral = true
c.backend = newDummyBackendQueue()
} else {
// backend names, for uniqueness, automatically include the topic...
Expand Down Expand Up @@ -421,7 +421,7 @@ func (c *Channel) RemoveClient(clientID int64) {
}
delete(c.clients, clientID)

if len(c.clients) == 0 && c.ephemeralChannel == true {
if len(c.clients) == 0 && c.ephemeral == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
}
}
Expand Down
7 changes: 5 additions & 2 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (n *NSQD) PersistMetadata() error {
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if !channel.ephemeralChannel {
if !channel.ephemeral {
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
Expand Down Expand Up @@ -384,7 +384,10 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.Unlock()
return t
} else {
t = NewTopic(topicName, &context{n})
deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name)
}
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

n.logf("TOPIC(%s): created", t.name)
Expand Down
13 changes: 8 additions & 5 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,14 @@ func TestStartup(t *testing.T) {
<-doneExitChan
}

func TestEphemeralChannel(t *testing.T) {
// a normal channel sticks around after clients disconnect; an ephemeral channel is
// lazily removed after the last client disconnects
func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
_, _, nsqd := mustStartNSQD(opts)

topicName := "ephemeral_test" + strconv.Itoa(int(time.Now().Unix()))
topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral"
doneExitChan := make(chan int)

exitChan := make(chan int)
Expand All @@ -209,7 +208,6 @@ func TestEphemeralChannel(t *testing.T) {
msg = <-ephemeralChannel.clientMsgChan
equal(t, msg.Body, body)

t.Logf("pulling from channel")
ephemeralChannel.RemoveClient(client.ID)

time.Sleep(50 * time.Millisecond)
Expand All @@ -219,6 +217,11 @@ func TestEphemeralChannel(t *testing.T) {
topic.Unlock()
equal(t, numChannels, 0)

nsqd.Lock()
numTopics := len(nsqd.topicMap)
nsqd.Unlock()
equal(t, numTopics, 0)

exitChan <- 1
<-doneExitChan
}
Expand Down
2 changes: 1 addition & 1 deletion nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestChannelTopicNames(t *testing.T) {
equal(t, util.IsValidChannelName("test#ephemeral"), true)
equal(t, util.IsValidTopicName("test"), true)
equal(t, util.IsValidTopicName("test-with_period."), true)
equal(t, util.IsValidTopicName("test#ephemeral"), false)
equal(t, util.IsValidTopicName("test#ephemeral"), true)
equal(t, util.IsValidTopicName("test:ephemeral"), false)
}

Expand Down
33 changes: 24 additions & 9 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"bytes"
"errors"
"strings"
"sync"
"sync/atomic"

Expand All @@ -24,30 +25,39 @@ type Topic struct {
waitGroup util.WaitGroupWrapper
exitFlag int32

ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once

paused int32
pauseChan chan bool

ctx *context
}

// Topic constructor
func NewTopic(topicName string, ctx *context) *Topic {
diskQueue := newDiskQueue(topicName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
backend: diskQueue,
memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
pauseChan: make(chan bool),
deleteCallback: deleteCallback,
}

if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
} else {
t.backend = newDiskQueue(topicName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)
}

t.waitGroup.Wrap(func() { t.messagePump() })
Expand Down Expand Up @@ -116,6 +126,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
}
delete(t.channelMap, channelName)
// not defered so that we can continue while the channel async closes
numChannels := len(t.channelMap)
t.Unlock()

t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, channel.name)
Expand All @@ -130,6 +141,10 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
case <-t.exitChan:
}

if numChannels == 0 && t.ephemeral == true {
go t.deleter.Do(func() { t.deleteCallback(t) })
}

return nil
}

Expand Down
14 changes: 7 additions & 7 deletions util/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"regexp"
)

var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`)
var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)

// IsValidTopicName checks a topic name for correctness
func IsValidTopicName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validTopicNameRegex.MatchString(name)
return isValidName(name)
}

// IsValidChannelName checks a channel name for correctness
func IsValidChannelName(name string) bool {
return isValidName(name)
}

func isValidName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validChannelNameRegex.MatchString(name)
return validTopicChannelNameRegex.MatchString(name)
}

1 comment on commit 4fc25e0

@clintberry
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent! Now I feel I can do proper Pub Sub. If no Subs, then ignore the Pubs! :-)

Please sign in to comment.