Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: ephemeral topics #305

Merged
merged 1 commit into from
Sep 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Channel struct {
exitFlag int32

// state tracking
clients map[int64]Consumer
paused int32
ephemeralChannel bool
deleteCallback func(*Channel)
deleter sync.Once
clients map[int64]Consumer
paused int32
ephemeral bool
deleteCallback func(*Channel)
deleter sync.Once

// Stats tracking
e2eProcessingLatencyStream *util.Quantile
Expand Down Expand Up @@ -100,7 +100,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.ephemeral = true
c.backend = newDummyBackendQueue()
} else {
// backend names, for uniqueness, automatically include the topic...
Expand Down Expand Up @@ -421,7 +421,7 @@ func (c *Channel) RemoveClient(clientID int64) {
}
delete(c.clients, clientID)

if len(c.clients) == 0 && c.ephemeralChannel == true {
if len(c.clients) == 0 && c.ephemeral == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
}
}
Expand Down
7 changes: 5 additions & 2 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (n *NSQD) PersistMetadata() error {
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if !channel.ephemeralChannel {
if !channel.ephemeral {
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
Expand Down Expand Up @@ -384,7 +384,10 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
n.Unlock()
return t
} else {
t = NewTopic(topicName, &context{n})
deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name)
}
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t

n.logf("TOPIC(%s): created", t.name)
Expand Down
13 changes: 8 additions & 5 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,14 @@ func TestStartup(t *testing.T) {
<-doneExitChan
}

func TestEphemeralChannel(t *testing.T) {
// a normal channel sticks around after clients disconnect; an ephemeral channel is
// lazily removed after the last client disconnects
func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
_, _, nsqd := mustStartNSQD(opts)

topicName := "ephemeral_test" + strconv.Itoa(int(time.Now().Unix()))
topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral"
doneExitChan := make(chan int)

exitChan := make(chan int)
Expand All @@ -209,7 +208,6 @@ func TestEphemeralChannel(t *testing.T) {
msg = <-ephemeralChannel.clientMsgChan
equal(t, msg.Body, body)

t.Logf("pulling from channel")
ephemeralChannel.RemoveClient(client.ID)

time.Sleep(50 * time.Millisecond)
Expand All @@ -219,6 +217,11 @@ func TestEphemeralChannel(t *testing.T) {
topic.Unlock()
equal(t, numChannels, 0)

nsqd.Lock()
numTopics := len(nsqd.topicMap)
nsqd.Unlock()
equal(t, numTopics, 0)

exitChan <- 1
<-doneExitChan
}
Expand Down
2 changes: 1 addition & 1 deletion nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestChannelTopicNames(t *testing.T) {
equal(t, util.IsValidChannelName("test#ephemeral"), true)
equal(t, util.IsValidTopicName("test"), true)
equal(t, util.IsValidTopicName("test-with_period."), true)
equal(t, util.IsValidTopicName("test#ephemeral"), false)
equal(t, util.IsValidTopicName("test#ephemeral"), true)
equal(t, util.IsValidTopicName("test:ephemeral"), false)
}

Expand Down
33 changes: 24 additions & 9 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"bytes"
"errors"
"strings"
"sync"
"sync/atomic"

Expand All @@ -24,30 +25,39 @@ type Topic struct {
waitGroup util.WaitGroupWrapper
exitFlag int32

ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once

paused int32
pauseChan chan bool

ctx *context
}

// Topic constructor
func NewTopic(topicName string, ctx *context) *Topic {
diskQueue := newDiskQueue(topicName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
backend: diskQueue,
memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
pauseChan: make(chan bool),
deleteCallback: deleteCallback,
}

if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
} else {
t.backend = newDiskQueue(topicName,
ctx.nsqd.opts.DataPath,
ctx.nsqd.opts.MaxBytesPerFile,
ctx.nsqd.opts.SyncEvery,
ctx.nsqd.opts.SyncTimeout,
ctx.nsqd.opts.Logger)
}

t.waitGroup.Wrap(func() { t.messagePump() })
Expand Down Expand Up @@ -116,6 +126,7 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
}
delete(t.channelMap, channelName)
// not defered so that we can continue while the channel async closes
numChannels := len(t.channelMap)
t.Unlock()

t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, channel.name)
Expand All @@ -130,6 +141,10 @@ func (t *Topic) DeleteExistingChannel(channelName string) error {
case <-t.exitChan:
}

if numChannels == 0 && t.ephemeral == true {
go t.deleter.Do(func() { t.deleteCallback(t) })
}

return nil
}

Expand Down
14 changes: 7 additions & 7 deletions util/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"regexp"
)

var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`)
var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)
var validTopicChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`)

// IsValidTopicName checks a topic name for correctness
func IsValidTopicName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validTopicNameRegex.MatchString(name)
return isValidName(name)
}

// IsValidChannelName checks a channel name for correctness
func IsValidChannelName(name string) bool {
return isValidName(name)
}

func isValidName(name string) bool {
if len(name) > 64 || len(name) < 1 {
return false
}
return validChannelNameRegex.MatchString(name)
return validTopicChannelNameRegex.MatchString(name)
}