GODRIVER-3421 Remove the BSON document size validation.
qingyang-hu committed Dec 4, 2024
1 parent acca80b commit fc91428
Showing 8 changed files with 152 additions and 158 deletions.
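Taken together, the change removes the driver-side BSON document size check from the bulk-write batching layer: the batch-building methods drop their maxDocSize parameter and split batches on document count and total message size only, so an oversized document now surfaces as a server-side error rather than the client-side driver.ErrDocumentTooLarge. A sketch of the batching contract after the change, reconstructed from the signatures in this diff (the real driver.OperationBatches interface may declare further methods; AdvanceBatches, Size, and IsOrdered also appear below):

// Reconstructed sketch, not a verbatim excerpt from the commit.
type OperationBatches interface {
	// Append up to maxCount documents while the encoded batch stays
	// within totalSize bytes; the old maxDocSize parameter is gone.
	AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error)
	AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error)
}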
2 changes: 1 addition & 1 deletion internal/integration/collection_test.go
@@ -167,7 +167,7 @@ func TestCollection(t *testing.T) {
mt.Run("large document batches", func(mt *mtest.T) {
mt.Parallel()

docs := []interface{}{create16MBDocument(mt), create16MBDocument(mt)}
docs := []interface{}{create16MBDocument(mt), create16MBDocument(mt), create16MBDocument(mt)}
_, err := mt.Coll.InsertMany(context.Background(), docs)
assert.Nil(mt, err, "InsertMany error: %v", err)
evt := mt.GetStartedEvent()
26 changes: 0 additions & 26 deletions internal/integration/crud_prose_test.go
@@ -745,32 +745,6 @@ func TestClientBulkWrite(t *testing.T) {
assert.Equal(mt, 1, killCursorsCalled, "expected %d killCursors call, got: %d", 1, killCursorsCalled)
})

mt.Run("bulkWrite returns error for unacknowledged too-large insert", func(mt *mtest.T) {
mt.ResetClient(options.Client())
var hello struct {
MaxBsonObjectSize int
}
err := mt.DB.RunCommand(context.Background(), bson.D{{"hello", 1}}).Decode(&hello)
require.NoError(mt, err, "Hello error: %v", err)
mt.Run("insert", func(mt *mtest.T) {
models := (&mongo.ClientWriteModels{}).
AppendInsertOne("db", "coll", &mongo.ClientInsertOneModel{
Document: bson.D{{"a", strings.Repeat("b", hello.MaxBsonObjectSize)}},
})
_, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetOrdered(false).SetWriteConcern(writeconcern.Unacknowledged()))
require.EqualError(mt, err, driver.ErrDocumentTooLarge.Error())
})
mt.Run("replace", func(mt *mtest.T) {
models := (&mongo.ClientWriteModels{}).
AppendReplaceOne("db", "coll", &mongo.ClientReplaceOneModel{
Filter: bson.D{},
Replacement: bson.D{{"a", strings.Repeat("b", hello.MaxBsonObjectSize)}},
})
_, err := mt.Client.BulkWrite(context.Background(), models, options.ClientBulkWrite().SetOrdered(false).SetWriteConcern(writeconcern.Unacknowledged()))
require.EqualError(mt, err, driver.ErrDocumentTooLarge.Error())
})
})

mt.Run("bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size", func(mt *mtest.T) {
type cmd struct {
Ops []bson.D
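The deleted subtests pinned the old fail-fast behavior: an unacknowledged insert or replace exceeding maxBsonObjectSize returned driver.ErrDocumentTooLarge before any bytes reached the server. After this commit the document is sent and the rejection comes from the server instead. A minimal usage sketch of the new failure mode (hypothetical helper; the database and collection names are placeholders, and the exact error depends on the server):

// oversizedInsert is a hypothetical illustration, not part of the commit:
// with the client-side check removed, the error below is expected to come
// from the server rather than short-circuiting as driver.ErrDocumentTooLarge.
func oversizedInsert(ctx context.Context, client *mongo.Client) error {
	doc := bson.D{{"a", strings.Repeat("b", 17*1024*1024)}} // past the 16 MiB maxBsonObjectSize
	models := (&mongo.ClientWriteModels{}).
		AppendInsertOne("db", "coll", mongo.NewClientInsertOneModel().SetDocument(doc))
	_, err := client.BulkWrite(ctx, models)
	return err
}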
34 changes: 7 additions & 27 deletions mongo/client_bulk_write.go
@@ -191,6 +191,8 @@ type modelBatches struct {
writeErrors map[int]WriteError
}

var _ driver.OperationBatches = &modelBatches{}

func (mb *modelBatches) IsOrdered() *bool {
return &mb.ordered
}
@@ -209,7 +211,7 @@ func (mb *modelBatches) Size() int {
return len(mb.models) - mb.offset
}

func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) {
fn := functionSet{
appendStart: func(dst []byte, identifier string) (int32, []byte) {
var idx int32
@@ -228,10 +230,10 @@ func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, to
return dst
},
}
return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize)
return mb.appendBatches(fn, dst, maxCount, totalSize)
}

func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) {
fn := functionSet{
appendStart: bsoncore.AppendArrayElementStart,
appendDocument: bsoncore.AppendDocumentElement,
@@ -240,7 +242,7 @@ func (mb *modelBatches) AppendBatchArray(dst []byte, maxCount, maxDocSize, total
return dst
},
}
return mb.appendBatches(fn, dst, maxCount, maxDocSize, totalSize)
return mb.appendBatches(fn, dst, maxCount, totalSize)
}

type functionSet struct {
@@ -249,7 +251,7 @@ type functionSet struct {
updateLength func([]byte, int32, int32) []byte
}

func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) {
func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, totalSize int) (int, []byte, error) {
if mb.Size() == 0 {
return 0, dst, io.EOF
}
@@ -269,8 +271,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
}

canRetry := true
checkSize := true

l := len(dst)

opsIdx, dst := fn.appendStart(dst, "ops")
@@ -291,13 +291,11 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
var err error
switch model := mb.models[i].model.(type) {
case *ClientInsertOneModel:
checkSize = false
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult)
var id interface{}
id, doc, err = (&clientInsertDoc{
namespace: nsIdx,
document: model.Document,
sizeLimit: maxDocSize,
}).marshal(mb.client.bsonOpts, mb.client.registry)
if err != nil {
break
@@ -331,7 +329,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
checkDollarKey: true,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientReplaceOneModel:
checkSize = false
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
doc, err = (&clientUpdateDoc{
namespace: nsIdx,
Expand All @@ -343,7 +340,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
upsert: model.Upsert,
multi: false,
checkDollarKey: false,
sizeLimit: maxDocSize,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientDeleteOneModel:
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
@@ -371,9 +367,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
return 0, nil, err
}
length := len(doc)
if maxDocSize > 0 && length > maxDocSize+16*1024 {
return 0, nil, driver.ErrDocumentTooLarge
}
if !exists {
length += len(ns)
}
@@ -398,9 +391,6 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
dst = fn.updateLength(dst, opsIdx, int32(len(dst[opsIdx:])))
nsDst = fn.updateLength(nsDst, nsIdx, int32(len(nsDst[nsIdx:])))
dst = append(dst, nsDst...)
if checkSize && maxDocSize > 0 && len(dst)-l > maxDocSize+16*1024 {
return 0, nil, driver.ErrDocumentTooLarge
}

mb.retryMode = driver.RetryNone
if mb.client.retryWrites && canRetry {
@@ -584,8 +574,6 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
type clientInsertDoc struct {
namespace int
document interface{}

sizeLimit int
}

func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (interface{}, bsoncore.Document, error) {
@@ -596,9 +584,6 @@ func (d *clientInsertDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.
if err != nil {
return nil, nil, err
}
if d.sizeLimit > 0 && len(f) > d.sizeLimit {
return nil, nil, driver.ErrDocumentTooLarge
}
var id interface{}
f, id, err = ensureID(f, bson.NilObjectID, bsonOpts, registry)
if err != nil {
@@ -619,8 +604,6 @@ type clientUpdateDoc struct {
upsert *bool
multi bool
checkDollarKey bool

sizeLimit int
}

func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.Registry) (bsoncore.Document, error) {
@@ -641,9 +624,6 @@ func (d *clientUpdateDoc) marshal(bsonOpts *options.BSONOptions, registry *bson.
if err != nil {
return nil, err
}
if d.sizeLimit > 0 && len(u.Data) > d.sizeLimit {
return nil, driver.ErrDocumentTooLarge
}
doc = bsoncore.AppendValueElement(doc, "updateMods", u)
doc = bsoncore.AppendBooleanElement(doc, "multi", d.multi)

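With the sizeLimit fields and the standalone length > maxDocSize+16*1024 guards gone, appendBatches bounds a batch purely by its running-total accounting. A condensed paraphrase of the logic that remains (not a verbatim excerpt; the real loop also tracks namespace reuse and cursor handlers):

// Each marshaled model, plus its namespace entry when the namespace is new
// to this batch, must fit into the remaining message budget; there is no
// separate per-document cap anymore.
length := len(doc)
if !exists {
	length += len(ns) // first use of this namespace in the batch
}
if size+length > totalSize {
	break // close this batch; remaining models go into the next one
}
size += length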
80 changes: 64 additions & 16 deletions mongo/client_bulk_write_test.go
@@ -17,22 +17,23 @@ import (
func TestBatches(t *testing.T) {
t.Parallel()

t.Run("test Addvancing", func(t *testing.T) {
t.Parallel()
t.Parallel()

batches := &modelBatches{
models: make([]clientWriteModel, 2),
}
batches.AdvanceBatches(3)
size := batches.Size()
assert.Equal(t, 0, size, "expected: %d, got: %d", 1, size)
})
t.Run("test appendBatches", func(t *testing.T) {
t.Parallel()
batches := &modelBatches{
models: make([]clientWriteModel, 2),
}
batches.AdvanceBatches(3)
size := batches.Size()
assert.Equal(t, 0, size, "expected: %d, got: %d", 1, size)
}

func TestAppendBatchSequence(t *testing.T) {
t.Parallel()

newBatches := func(t *testing.T) *modelBatches {
client, err := newClient()
require.NoError(t, err, "NewClient error: %v", err)
batches := &modelBatches{
return &modelBatches{
client: client,
models: []clientWriteModel{
{"ns0", nil},
@@ -52,12 +53,15 @@ func TestBatches(t *testing.T) {
Acknowledged: true,
},
}
var n int
}
t.Run("test appendBatches", func(t *testing.T) {
t.Parallel()

batches := newBatches(t)
const limitBigEnough = 16_000
// test the "maxCount" that truncates the output
n, _, err = batches.AppendBatchSequence(nil, 4, limitBigEnough, limitBigEnough)
n, _, err := batches.AppendBatchSequence(nil, 4, limitBigEnough)
require.NoError(t, err, "AppendBatchSequence error: %v", err)
assert.Equal(t, 3, n, "expected %d appendings, got: %d", 3, n)
require.Equal(t, 3, n, "expected %d appendings, got: %d", 3, n)

_ = batches.cursorHandlers[0](&cursorInfo{Ok: true, Idx: 0}, nil)
_ = batches.cursorHandlers[1](&cursorInfo{Ok: true, Idx: 1}, nil)
@@ -71,6 +75,50 @@ func TestBatches(t *testing.T) {
assert.True(t, ok, "expected an insert results")

_, ok = batches.result.DeleteResults[3]
assert.True(t, ok, "expected an delete results")
})
t.Run("test appendBatches with maxCount", func(t *testing.T) {
t.Parallel()

batches := newBatches(t)
const limitBigEnough = 16_000
n, _, err := batches.AppendBatchSequence(nil, 2, limitBigEnough)
require.NoError(t, err, "AppendBatchSequence error: %v", err)
require.Equal(t, 2, n, "expected %d appendings, got: %d", 2, n)

_ = batches.cursorHandlers[0](&cursorInfo{Ok: true, Idx: 0}, nil)
_ = batches.cursorHandlers[1](&cursorInfo{Ok: true, Idx: 1}, nil)

ins, ok := batches.result.InsertResults[1]
assert.True(t, ok, "expected an insert results")
assert.NotNil(t, ins.InsertedID, "expected an ID")

_, ok = batches.result.UpdateResults[2]
assert.True(t, ok, "expected an insert results")

_, ok = batches.result.DeleteResults[3]
assert.False(t, ok, "expected an delete results")
})
t.Run("test appendBatches with totalSize", func(t *testing.T) {
t.Parallel()

batches := newBatches(t)
const limit = 1200 // > ( 166 first two batches + 1000 overhead )
n, _, err := batches.AppendBatchSequence(nil, 4, limit)
require.NoError(t, err, "AppendBatchSequence error: %v", err)
require.Equal(t, 2, n, "expected %d appendings, got: %d", 2, n)

_ = batches.cursorHandlers[0](&cursorInfo{Ok: true, Idx: 0}, nil)
_ = batches.cursorHandlers[1](&cursorInfo{Ok: true, Idx: 1}, nil)

ins, ok := batches.result.InsertResults[1]
assert.True(t, ok, "expected an insert results")
assert.NotNil(t, ins.InsertedID, "expected an ID")

_, ok = batches.result.UpdateResults[2]
assert.True(t, ok, "expected an insert results")

_, ok = batches.result.DeleteResults[3]
assert.False(t, ok, "expected an delete results")
})
}
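The 1200-byte limit in the totalSize subtest matches the arithmetic hinted at by its comment: if the fixed command overhead is roughly 1000 bytes and the first two entries encode to about 166 bytes together, two entries fit under the limit while a third would push past it, hence n == 2. A back-of-the-envelope check (the byte figures are the test's own estimates, not guarantees):

// Rough budget check mirroring the test's comment.
const overhead = 1000       // approximate fixed command overhead
const firstTwoEntries = 166 // approximate encoded size of the first two models
fmt.Println(overhead+firstTwoEntries <= 1200) // true: two entries fit, a third does not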
27 changes: 27 additions & 0 deletions mongo/client_test.go
@@ -11,6 +11,7 @@ import (
"errors"
"math"
"os"
"strings"
"testing"
"time"

@@ -516,4 +517,30 @@ func TestClient(t *testing.T) {
errmsg := `invalid value "-1s" for "Timeout": value must be positive`
assert.Equal(t, errmsg, err.Error(), "expected error %v, got %v", errmsg, err.Error())
})
t.Run("bulkWrite with large messages", func(t *testing.T) {
var bulkWrites int
cmdMonitor := &event.CommandMonitor{
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
if evt.CommandName == "bulkWrite" {
bulkWrites++
}
},
}
cs := integtest.ConnString(t)
clientOpts := options.Client().ApplyURI(cs.Original).SetMonitor(cmdMonitor)
client, err := Connect(clientOpts)
assert.Nil(t, err, "Connect error: %v", err)
defer func() {
_ = client.Disconnect(bgCtx)
}()
document := bson.D{{"largeField", strings.Repeat("a", 16777216-100)}} // Adjust size to account for BSON overhead
models := &ClientWriteModels{}
models = models.AppendInsertOne("db", "x", NewClientInsertOneModel().SetDocument(document))
models = models.AppendInsertOne("db", "x", NewClientInsertOneModel().SetDocument(document))
models = models.AppendInsertOne("db", "x", NewClientInsertOneModel().SetDocument(document))

_, err = client.BulkWrite(context.Background(), models)
require.NoError(t, err)
assert.Equal(t, 2, bulkWrites, "expected %d bulkWrites, got %d", 2, bulkWrites)
})
}
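The expected count of 2 bulkWrite commands follows from message-size arithmetic: each insert document is roughly 16 MB, and a single command message is capped by the server's maxMessageSizeBytes (48,000,000 bytes on current servers; the driver reads the actual value from the connection handshake). Two documents fit under the cap, three do not, so the driver splits the three inserts into two commands. A quick sanity check under that assumed cap:

// Assumes the common 48,000,000-byte maxMessageSizeBytes; at runtime the
// driver uses the handshake-reported value instead.
const maxMessageSizeBytes = 48_000_000
const docSize = 16_777_216 - 100
fmt.Println(2*docSize < maxMessageSizeBytes) // true: two inserts share one bulkWrite
fmt.Println(3*docSize > maxMessageSizeBytes) // true: the third forces a second command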
16 changes: 6 additions & 10 deletions x/mongo/driver/batches.go
@@ -24,10 +24,12 @@ type Batches struct {
offset int
}

var _ OperationBatches = &Batches{}

// AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max
// document size, or total size allows. It returns the number of batches appended, the new appended slice, and
// any error raised. It returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, totalSize int) (int, []byte, error) {
if b.Size() == 0 {
return 0, dst, io.EOF
}
@@ -44,11 +46,8 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
break
}
size += len(doc)
if size > maxDocSize {
if size > totalSize {
break
}
dst = append(dst, doc...)
@@ -64,7 +63,7 @@ func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (
// AppendBatchArray appends dst with array of batches as long as the limits of max count, max document size, or
// total size allows. It returns the number of batches appended, the new appended slice, and any error raised. It
// returns the original input slice if nothing can be appended within the limits.
func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) {
func (b *Batches) AppendBatchArray(dst []byte, maxCount, totalSize int) (int, []byte, error) {
if b.Size() == 0 {
return 0, dst, io.EOF
}
@@ -77,11 +76,8 @@ func (b *Batches) AppendBatchArray(dst []byte, maxCount, maxDocSize, _ int) (int
break
}
doc := b.Documents[i]
if len(doc) > maxDocSize {
break
}
size += len(doc)
if size > maxDocSize {
if size > totalSize {
break
}
dst = bsoncore.AppendDocumentElement(dst, strconv.Itoa(n), doc)
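After the edit, both loops implement one greedy policy: keep appending documents while the count stays within maxCount and the running encoded size stays within totalSize. A self-contained sketch of that policy (simplified; it ignores the wire-format framing and the Batches offset bookkeeping):

// packBatch reports how many of docs fit into one batch under the new
// count/total-size policy; a simplified model of AppendBatchSequence.
func packBatch(docs [][]byte, maxCount, totalSize int) int {
	n, size := 0, 0
	for _, doc := range docs {
		if n == maxCount {
			break
		}
		size += len(doc)
		if size > totalSize {
			break // this document would overflow the message budget
		}
		n++
	}
	return n
}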