Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support get-old-kv in watch #5850

Merged
merged 1 commit into from
Jul 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@
"preserveKVs": {
"type": "boolean",
"format": "boolean",
"description": "If preserveKVs is set, the deleted KVs will be preserved.\nThe preserved KVs will be returned as response.\nIt requires read permission to read the deleted KVs."
"description": "If preserveKVs is set, the deleted KVs will be preserved for delete events\nThe preserved KVs will be returned as response.\nIt requires read permission to read the deleted KVs."
},
"range_end": {
"type": "string",
Expand Down Expand Up @@ -1204,6 +1204,11 @@
"format": "byte",
"description": "key is the key to register for watching."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, created watcher gets the previous KV before the event happens.\nIf the previous KV is already compacted, nothing will be returned."
},
"progress_notify": {
"type": "boolean",
"format": "boolean",
Expand Down Expand Up @@ -1273,6 +1278,10 @@
"$ref": "#/definitions/mvccpbKeyValue",
"description": "kv holds the KeyValue for the event.\nA PUT event contains current kv pair.\nA PUT event with kv.Version=1 indicates the creation of a key.\nA DELETE/EXPIRE event contains the deleted key with\nits modification revision set to the revision of deletion."
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "prev_kv holds the key-value pair before the event happens."
},
"type": {
"$ref": "#/definitions/EventEventType",
"description": "type is the kind of event. If type is a PUT, it indicates\nnew data has been stored to the key. If type is a DELETE,\nit indicates the key was deleted."
Expand Down
10 changes: 10 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Op struct {
// for range, watch
rev int64

prevKV bool

// for delete
preserveKVs bool

Expand Down Expand Up @@ -276,3 +278,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true
}
}

// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
// nothing will be returned.
func WithPrevKV() OpOption {
return func(op *Op) {
op.prevKV = true
}
}
6 changes: 5 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ type watchRequest struct {
key string
end string
rev int64
// progressNotify is for progress updates.
// progressNotify is for progress updates
progressNotify bool
// get the previous key-value pair before the event happens
prevKV bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
Expand Down Expand Up @@ -209,6 +211,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
prevKV: ow.prevKV,
retc: retc,
}

Expand Down Expand Up @@ -682,6 +685,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
PrevKv: wr.prevKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
Expand Down
4 changes: 3 additions & 1 deletion etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `

- prefix -- watch on a prefix if prefix is set.

- prev-kv -- get the previous key-value pair before the event happens.

- rev -- the revision to start watching. Specifying a revision is useful for observing past events.

#### Input Format
Expand All @@ -245,7 +247,7 @@ watch [options] <key or prefix>\n

##### Simple reply

- \<event\>\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- \<event\>[\n\<old_key\>\n\<old_value\>]\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...

- Additional error string if WATCH failed. Exit code is non-zero.

Expand Down
3 changes: 3 additions & 0 deletions etcdctl/ctlv3/command/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (s *simplePrinter) Txn(resp v3.TxnResponse) {
func (s *simplePrinter) Watch(resp v3.WatchResponse) {
for _, e := range resp.Events {
fmt.Println(e.Type)
if e.PrevKv != nil {
printKV(s.isHex, e.PrevKv)
}
printKV(s.isHex, e.Kv)
}
}
Expand Down
6 changes: 6 additions & 0 deletions etcdctl/ctlv3/command/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
watchRev int64
watchPrefix bool
watchInteractive bool
watchPrevKey bool
)

// NewWatchCommand returns the cobra command for "watch".
Expand All @@ -42,6 +43,7 @@ func NewWatchCommand() *cobra.Command {
cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")

return cmd
}
Expand All @@ -68,6 +70,10 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if watchPrevKey {
opts = append(opts, clientv3.WithPrevKV())
}

c := mustClientFromCmd(cmd)
wc := c.Watch(context.TODO(), key, opts...)
printWatchCh(wc)
Expand Down
36 changes: 30 additions & 6 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}

func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
Expand Down Expand Up @@ -82,13 +82,18 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer

watchable mvcc.WatchableKV

gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse

// progress tracks the watchID that stream might need to send
// progress to.
// TOOD: combine progress and prevKV into a single struct?
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool

// mu protects progress
mu sync.Mutex

Expand All @@ -101,14 +106,18 @@ type serverWatchStream struct {

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,

watchable: ws.watchable,

gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}

Expand Down Expand Up @@ -181,8 +190,13 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 && creq.ProgressNotify {
sws.progress[id] = true
if id != -1 {
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
Expand All @@ -207,6 +221,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
Expand Down Expand Up @@ -251,8 +266,17 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
needPrevKV := sws.prevKV[wresp.WatchID]
for i := range evs {
events[i] = &evs[i]

if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}

wr := &pb.WatchResponse{
Expand Down
Loading