-
Notifications
You must be signed in to change notification settings - Fork 79
/
cursor_impl.go
392 lines (342 loc) · 13.3 KB
/
cursor_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package driver
import (
"context"
"encoding/json"
"path"
"reflect"
"sync"
"sync/atomic"
"time"
)
// newCursor creates a new Cursor implementation.
func newCursor(data cursorData, endpoint string, db *database, allowDirtyReads bool) (Cursor, error) {
if db == nil {
return nil, WithStack(InvalidArgumentError{Message: "db is nil"})
}
c := &cursor{
cursorData: data,
endpoint: endpoint,
db: db,
conn: db.conn,
allowDirtyReads: allowDirtyReads,
}
if data.NextBatchID != "" {
c.retryData = &retryData{
cursorID: data.ID,
currentBatchID: "1",
}
}
return c, nil
}
type cursor struct {
cursorData
endpoint string
resultIndex int
retryData *retryData
db *database
conn Connection
closed int32
closeMutex sync.Mutex
allowDirtyReads bool
lastReadWasDirty bool
}
type retryData struct {
cursorID string
currentBatchID string
}
// CursorStats TODO: all these int64 should be changed into uint64
type cursorStats struct {
// The total number of data-modification operations successfully executed.
WritesExecutedInt int64 `json:"writesExecuted,omitempty"`
// The total number of data-modification operations that were unsuccessful
WritesIgnoredInt int64 `json:"writesIgnored,omitempty"`
// The total number of documents iterated over when scanning a collection without an index.
ScannedFullInt int64 `json:"scannedFull,omitempty"`
// The total number of documents iterated over when scanning a collection using an index.
ScannedIndexInt int64 `json:"scannedIndex,omitempty"`
// The total number of documents that were removed after executing a filter condition in a FilterNode
FilteredInt int64 `json:"filtered,omitempty"`
// The total number of documents that matched the search condition if the query's final LIMIT statement were not present.
FullCountInt int64 `json:"fullCount,omitempty"`
// Query execution time (wall-clock time). value will be set from the outside
ExecutionTimeInt float64 `json:"executionTime,omitempty"`
Nodes []cursorPlanNodes `json:"nodes,omitempty"`
HttpRequests int64 `json:"httpRequests,omitempty"`
PeakMemoryUsage int64 `json:"peakMemoryUsage,omitempty"`
// CursorsCreated the total number of cursor objects created during query execution. Cursor objects are created for index lookups.
CursorsCreated uint64 `json:"cursorsCreated,omitempty"`
// CursorsRearmed the total number of times an existing cursor object was repurposed.
// Repurposing an existing cursor object is normally more efficient compared to destroying an existing cursor object
// and creating a new one from scratch.
CursorsRearmed uint64 `json:"cursorsRearmed,omitempty"`
// CacheHits the total number of index entries read from in-memory caches for indexes of type edge or persistent.
// This value will only be non-zero when reading from indexes that have an in-memory cache enabled,
// and when the query allows using the in-memory cache (i.e., using equality lookups on all index attributes).
CacheHits uint64 `json:"cacheHits,omitempty"`
// CacheMisses the total number of cache read attempts for index entries that could not be served from in-memory caches for indexes of type edge or persistent.
// This value will only be non-zero when reading from indexes that have an in-memory cache enabled,
// the query allows using the in-memory cache (i.e., using equality lookups on all index attributes), and the looked-up values are not present in the cache.
CacheMisses uint64 `json:"cacheMisses,omitempty"`
}
type cursorPlan struct {
Nodes []cursorPlanNodes `json:"nodes,omitempty"`
Rules []string `json:"rules,omitempty"`
Collections []cursorPlanCollection `json:"collections,omitempty"`
Variables []cursorPlanVariable `json:"variables,omitempty"`
EstimatedCost float64 `json:"estimatedCost,omitempty"`
EstimatedNrItems int `json:"estimatedNrItems,omitempty"`
IsModificationQuery bool `json:"isModificationQuery,omitempty"`
}
type cursorExtra struct {
Stats cursorStats `json:"stats,omitempty"`
Profile cursorProfile `json:"profile,omitempty"`
Plan *cursorPlan `json:"plan,omitempty"`
Warnings []warn `json:"warnings,omitempty"`
}
type warn struct {
Code int `json:"code"`
Message string `json:"message"`
}
func (c cursorExtra) GetStatistics() QueryStatistics {
return c.Stats
}
func (c cursorExtra) GetProfileRaw() ([]byte, bool, error) {
if c.Profile == nil {
return nil, false, nil
}
d, err := json.Marshal(c.Profile)
if err != nil {
return nil, true, err
}
return d, true, nil
}
func (c cursorExtra) GetPlanRaw() ([]byte, bool, error) {
if c.Plan == nil {
return nil, false, nil
}
d, err := json.Marshal(c.Plan)
if err != nil {
return nil, true, err
}
return d, true, nil
}
type cursorPlanVariable struct {
ID int `json:"id"`
Name string `json:"name"`
IsDataFromCollection bool `json:"isDataFromCollection"`
IsFullDocumentFromCollection bool `json:"isFullDocumentFromCollection"`
}
type cursorPlanCollection struct {
Name string `json:"name"`
Type string `json:"type"`
}
type cursorPlanNodes map[string]interface{}
type cursorProfile map[string]interface{}
type cursorData struct {
Key string `json:"_key,omitempty"`
Count int64 `json:"count,omitempty"` // the total number of result documents available (only available if the query was executed with the count attribute set)
ID string `json:"id"` // id of temporary cursor created on the server (optional, see above)
Result []*RawObject `json:"result,omitempty"` // an array of result documents (might be empty if the query has no results)
HasMore bool `json:"hasMore,omitempty"` // A boolean indicator whether there are more results available for the cursor on the server
Extra cursorExtra `json:"extra"`
Cached bool `json:"cached,omitempty"`
NextBatchID string `json:"nextBatchId,omitempty"`
ArangoError
}
// relPath creates the relative path to this cursor (`_db/<db-name>/_api/cursor`)
func (c *cursor) relPath() string {
return path.Join(c.db.relPath(), "_api", "cursor")
}
// HasMore Name returns the name of the collection.
func (c *cursor) HasMore() bool {
return c.resultIndex < len(c.Result) || c.cursorData.HasMore
}
// Count returns the total number of result documents available.
// A valid return value is only available when the cursor has been created with a context that was
// prepared with `WithQueryCount`.
func (c *cursor) Count() int64 {
return c.cursorData.Count
}
// Close deletes the cursor and frees the resources associated with it.
func (c *cursor) Close() error {
if c == nil {
// Avoid panics in the case that someone defers a close before checking that the cursor is not nil.
return nil
}
if c := atomic.LoadInt32(&c.closed); c != 0 {
return nil
}
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed == 0 {
if c.cursorData.ID != "" {
// Force use of initial endpoint
ctx := WithEndpoint(nil, c.endpoint)
req, err := c.conn.NewRequest("DELETE", path.Join(c.relPath(), c.cursorData.ID))
if err != nil {
return WithStack(err)
}
resp, err := c.conn.Do(ctx, req)
if err != nil {
return WithStack(err)
}
if err := resp.CheckStatus(202); err != nil {
return WithStack(err)
}
}
atomic.StoreInt32(&c.closed, 1)
}
return nil
}
// ReadDocument reads the next document from the cursor.
// The document data is stored into the result, the document metadata is returned.
// If the cursor has no more documents, a NoMoreDocuments error is returned.
func (c *cursor) ReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
return c.readDocument(ctx, result, "")
}
// RetryReadDocument reads the last document from the cursor once more time
// It can be used e.g., in case of network error during ReadDocument
// It requires 'driver.WithQueryAllowRetry' to be set to true on the Context during Cursor creation.
func (c *cursor) RetryReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
if c.resultIndex > 0 {
c.resultIndex--
}
return c.readDocument(ctx, result, c.retryData.currentBatchID)
}
func (c *cursor) readDocument(ctx context.Context, result interface{}, retryBatchID string) (DocumentMeta, error) {
// Force use of initial endpoint
ctx = WithEndpoint(ctx, c.endpoint)
if c.resultIndex >= len(c.Result) && (c.cursorData.HasMore || retryBatchID != "") {
// This is required since we are interested if this was a dirty read,
// but we do not want to trash the users bool reference.
var wasDirtyRead bool
fetchCtx := ctx
if c.allowDirtyReads {
fetchCtx = WithAllowDirtyReads(ctx, &wasDirtyRead)
}
p := path.Join(c.relPath(), c.cursorData.ID)
// If we have a NextBatchID, use it
if c.NextBatchID != "" {
p = path.Join(c.relPath(), c.cursorData.ID, c.NextBatchID)
}
// We have to retry the batch instead of fetching the next one
if retryBatchID != "" {
p = path.Join(c.relPath(), c.retryData.cursorID, retryBatchID)
}
// Update currentBatchID before fetching the next batch (no retry case)
if c.NextBatchID != "" && retryBatchID == "" {
c.retryData.currentBatchID = c.NextBatchID
}
// Fetch the next batch
req, err := c.conn.NewRequest("POST", p)
if err != nil {
return DocumentMeta{}, WithStack(err)
}
cs := applyContextSettings(fetchCtx, req)
resp, err := c.conn.Do(fetchCtx, req)
if err != nil {
return DocumentMeta{}, WithStack(err)
}
if err := resp.CheckStatus(200); err != nil {
return DocumentMeta{}, WithStack(err)
}
loadContextResponseValues(cs, resp)
var data cursorData
if err := resp.ParseBody("", &data); err != nil {
return DocumentMeta{}, WithStack(err)
}
c.cursorData = data
c.resultIndex = 0
c.lastReadWasDirty = wasDirtyRead
}
// ReadDocument should act as if it would actually do a read
// hence update the bool reference
if c.allowDirtyReads {
setDirtyReadFlagIfRequired(ctx, c.lastReadWasDirty)
}
index := c.resultIndex
if index >= len(c.Result) {
// Out of data
return DocumentMeta{}, WithStack(NoMoreDocumentsError{})
}
c.resultIndex++
var meta DocumentMeta
resultPtr := c.Result[index]
if resultPtr == nil {
// Got NULL result
rv := reflect.ValueOf(result)
if rv.Kind() != reflect.Ptr || rv.IsNil() {
return DocumentMeta{}, WithStack(&json.InvalidUnmarshalError{Type: reflect.TypeOf(result)})
}
e := rv.Elem()
e.Set(reflect.Zero(e.Type()))
} else {
if err := c.conn.Unmarshal(*resultPtr, &meta); err != nil {
// If a cursor returns something other than a document, this will fail.
// Just ignore it.
}
if err := c.conn.Unmarshal(*resultPtr, result); err != nil {
return DocumentMeta{}, WithStack(err)
}
}
return meta, nil
}
// Statistics Return execution statistics for this cursor. This might not
// be valid if the cursor has been created with a context that was
// prepared with `WithStream`
func (c *cursor) Statistics() QueryStatistics {
return c.cursorData.Extra.Stats
}
func (c *cursor) Extra() QueryExtra {
return c.cursorData.Extra
}
// WritesExecuted the total number of data-modification operations successfully executed.
func (cs cursorStats) WritesExecuted() int64 {
return cs.WritesExecutedInt
}
// WritesIgnored The total number of data-modification operations that were unsuccessful
func (cs cursorStats) WritesIgnored() int64 {
return cs.WritesIgnoredInt
}
// ScannedFull The total number of documents iterated over when scanning a collection without an index.
func (cs cursorStats) ScannedFull() int64 {
return cs.ScannedFullInt
}
// ScannedIndex The total number of documents iterated over when scanning a collection using an index.
func (cs cursorStats) ScannedIndex() int64 {
return cs.ScannedIndexInt
}
// Filtered the total number of documents that were removed after executing a filter condition in a FilterNode
func (cs cursorStats) Filtered() int64 {
return cs.FilteredInt
}
// FullCount Returns the number of results before the last LIMIT in the query was applied.
// A valid return value is only available when the has been created with a context that was
// prepared with `WithFullCount`. Additionally, this will also not return a valid value if
// the context was prepared with `WithStream`.
func (cs cursorStats) FullCount() int64 {
return cs.FullCountInt
}
// ExecutionTime query execution time (wall-clock time). value will be set from the outside
func (cs cursorStats) ExecutionTime() time.Duration {
return time.Duration(cs.ExecutionTimeInt * float64(time.Second))
}