Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: per-topic WAL #625

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open

nsqd: per-topic WAL #625

wants to merge 36 commits into from

Conversation

mreiferson
Copy link
Member

@mreiferson mreiferson commented Aug 9, 2015

This introduces a write-ahead log for nsqd topics using https://github.com/mreiferson/wal. This is the first part of #510.

At a high level, the WAL is a filesystem backed FIFO queue with monotonically increasing IDs, optional periodic flushing semantics (off by default), and the ability to open a "cursor" at any index within its retention window. Many cursors can be open simultaneously at different indices.

Each topic has its own WAL and each channel has its own cursor into it. The filesystem replaces the per-topic goroutine messagePump() that previously copied messages to channels. Instead, we use Go's low-level sync.Condition to "wake up" any open cursors so they can advance when events are appended.

There are quite a few remaining items to figure out here, namely:

  • proper depth accounting
    • persistence of state to survive restarts
  • what happens to BackendDepth?
  • DPUB
  • channel emptying
  • topic pausing
  • ephemeral topics/channels
  • msg attempts accounting
  • proper retention threshold for persisted messages

@mreiferson
Copy link
Member Author

This is a good sign so far:

screen shot 2015-08-09 at 2 21 02 pm

@mreiferson
Copy link
Member Author

Updated description, let me know if any of this sounds insane @jehiah.

/cc @stephensearles @judwhite

@mreiferson
Copy link
Member Author

depth, FIN accounting, retention windows

The current plan is for each topic to maintain a RangeSet (thanks @rolyatmax). A RangeSet maintains a list of contiguous ranges of message IDs. For the topic, it will store the ranges of IDs of messages that have been published. For each channel, it will store the ranges of IDs of messages that have been FINd (for that channel). Thus, the difference in "count" between the two sets should be the current depth of a given channel. Topics will no longer have non-zero depth unless there are no channels present, in which case it is the count of its RangeSet.

I don't really know what to do about BackendDepth - what do you think @jehiah?

This is also the mechanism we can use to determine what the retention window for a topic should be. By scanning each channel's RangeSets, we can determine the lowest non-contiguous index. The lowest index across channels is the retention point for the topic. Since we'll probably provide some configurable options around how much data we keep around, the largest of these values will be chosen as the actual retention point.

@Dieterbe
Copy link
Contributor

any thoughts on how the performance of this compares for single channel, 100% of msg going into diskqueue vs into WAL, and 100% of reads going out of diskqueue vs WAL ?

(i presume once multiple channels come into play, the shared WAL vs individual diskqueues makes the WAL significicantly better)

also: will there be (optional) sync-ack semantics? so that the producer only gets an ack when the data has been synced to disk. needless to say this comes with a performance tradeoff but can be worth it if message size is big enough.

@mreiferson
Copy link
Member Author

@Dieterbe working on correctness right now 😁. Performance should be better.

@mreiferson mreiferson changed the title nsqd: per-topic WAL [dev] nsqd: per-topic WAL Aug 10, 2015
@Dieterbe
Copy link
Contributor

@mreiferson is ack-on-sync something you think should make it into nsqd as well?
i think it would be great if my producers could know a message is safely stored in the diskqueue/WAL and synced to disk. (i'm looking into nsqd for a new project and kinda need this feature. if you think it sounds good i would happily work on this and PR it)

@mreiferson
Copy link
Member Author

@Dieterbe that already is the case in both the current production implementation (IF the message overflowed to disk) and this PR. If it were not the case there would be no back-pressure!

@Dieterbe
Copy link
Contributor

@mreiferson I doesn't look like it.. or there is a misunderstanding.
(d *diskQueue) Put(data []byte) writes to d.writeChan and returns when it reads from d.writeResponseChan. and those ops only have a count++ and a writeOne() (which only syncs when we open a new file) in between, the sync is only enforced at the next iteration of the loop. so Put() returns before the sync to disk happens (unless a new file is opened), so message loss can still happen. which is what i'm trying to avoid. am i missing something? (also, sorry to distract slightly from your WAL work)

@mreiferson
Copy link
Member Author

Sorry, I misunderstood what you meant by "sync" - no, there is no mechanism to deliver fsync acks to producers.

The complexity doesn't feel like something that would be worth it. The edge cases where you would lose messages, although they certainly exist, are probably better handled via replication.

You could also run with --sync-every=1 but that would also be quite aggressive.

@mreiferson
Copy link
Member Author

NOTE: there is still back-pressure from the periodic fsync because DiskQueue cannot make progress during a sync and individual writes are synchronized with the PUB.

@stephensearles
Copy link

Nice work! I'll dig into it a bit more later this week

@Dieterbe
Copy link
Contributor

@mreiferson: does this mean that

  1. if this is enabled, we have guaranteed FIFO message ordering semantics?
  2. will consumers be able to seek to arbitrary positions? for example if i have a log of the last 24h, and my consumer hasn't consumed anything in the last 4 hours, but it always prefers data 5 minutes old and more recent, and i know that i have a message per second will by consumer be able to say "seek to 300 messages before the end of the queue , and consume from there until the end" ? (and then later that consumer, or a different one, will make sure to also read the entire 4h range) something like this would be really useful to me.

@mreiferson
Copy link
Member Author

  1. No, not exactly. On a per-node basis, despite the WAL itself being FIFO, there are still requeues that will be redelivered based on their expiration. And, across a cluster in aggregate, there is no coordination and no guaranteed ordering.
  2. That will not be in this first pass, but is a feature we could consider providing down the road based on this work.

@Dieterbe
Copy link
Contributor

  1. oh yes sure, i just meant single nsqd and assuming no requeues. a bit of a bold assumption i know.. let's call it "best case scenario" ;-)
  2. cool thanks. i thought it wouldn't be possible due to messages having varying sizes so you wouldn't know where to seek to unless you start reading from the beginning. would this require adding an extra index datastructure or something? or is that already included in the design?

@mreiferson
Copy link
Member Author

(2) It's already required to be able to seek to a certain index

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants