Skip to content

Commit

Permalink
feat(db): allow model upsertion
Browse files Browse the repository at this point in the history
closes #292
  • Loading branch information
frrist committed Dec 16, 2020
1 parent 6dfa518 commit b7a48ba
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 20 deletions.
2 changes: 1 addition & 1 deletion commands/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var Migrate = &cli.Command{

ctx := cctx.Context

db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"))
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), false)
if err != nil {
return xerrors.Errorf("connect database: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func SetupStorageAndAPI(cctx *cli.Context) (context.Context, *RunContext, error)

func setupDatabase(cctx *cli.Context) (*storage.Database, error) {
ctx := cctx.Context
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"))
db, err := storage.NewDatabase(ctx, cctx.String("db"), cctx.Int("db-pool-size"), cctx.String("name"), cctx.Bool("db-allow-upsert"))
if err != nil {
return nil, xerrors.Errorf("new database: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/go-pg/migrations/v8 v8.0.1
github.com/go-pg/pg/v10 v10.3.1
github.com/go-pg/pgext v0.1.4
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.7
Expand All @@ -29,7 +28,6 @@ require (
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/jackc/pgx/v4 v4.9.0
github.com/jinzhu/inflection v1.0.0
github.com/lib/pq v1.8.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/mitchellh/go-homedir v1.1.0
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
Expand Down Expand Up @@ -356,8 +357,6 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4 h1:Q7s2AN3DhFJKOnzO0uTKLhJTfXTEcXcvw5ylf2BHJw4=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
Expand Down Expand Up @@ -716,6 +715,7 @@ github.com/ipld/go-ipld-prime-proto v0.1.0 h1:j7gjqrfwbT4+gXpHwEx5iMssma3mnctC7Y
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
Expand All @@ -734,6 +734,7 @@ github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye47
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
Expand Down Expand Up @@ -1432,6 +1433,7 @@ github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down Expand Up @@ -1541,6 +1543,7 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func main() {
EnvVars: []string{"LOTUS_DB_POOL_SIZE"},
Value: 75,
},
&cli.BoolFlag{
Name: "db-allow-upsert",
EnvVars: []string{"LOTUS_DB_ALLOW_UPSERT"},
Value: false,
},
&cli.IntFlag{
Name: "lens-cache-hint",
EnvVars: []string{"VISOR_LENS_CACHE_HINT"},
Expand Down
98 changes: 86 additions & 12 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -89,7 +90,7 @@ var (

const MaxPostgresNameLength = 64

func NewDatabase(ctx context.Context, url string, poolSize int, name string) (*Database, error) {
func NewDatabase(ctx context.Context, url string, poolSize int, name string, upsert bool) (*Database, error) {
if len(name) > MaxPostgresNameLength {
return nil, ErrNameTooLong
}
Expand All @@ -104,15 +105,17 @@ func NewDatabase(ctx context.Context, url string, poolSize int, name string) (*D
}

return &Database{
opt: opt,
Clock: clock.New(),
opt: opt,
Clock: clock.New(),
Upsert: upsert,
}, nil
}

type Database struct {
DB *pg.DB
opt *pg.Options
Clock clock.Clock
DB *pg.DB
opt *pg.Options
Clock clock.Clock
Upsert bool
}

// Connect opens a connection to the database and checks that the schema is compatible the the version required
Expand Down Expand Up @@ -715,7 +718,8 @@ func (d *Database) RunInTransaction(ctx context.Context, fn func(tx *pg.Tx) erro
func (d *Database) PersistBatch(ctx context.Context, ps ...model.Persistable) error {
return d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
txs := &TxStorage{
tx: tx,
tx: tx,
upsert: d.Upsert,
}

for _, p := range ps {
Expand All @@ -729,7 +733,8 @@ func (d *Database) PersistBatch(ctx context.Context, ps ...model.Persistable) er
}

type TxStorage struct {
tx *pg.Tx
tx *pg.Tx
upsert bool
}

// PersistModel persists a single model
Expand All @@ -755,10 +760,79 @@ func (s *TxStorage) PersistModel(ctx context.Context, m interface{}) error {
}

}
if _, err := s.tx.ModelContext(ctx, m).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting model: %w", err)
if s.upsert {
conflict, upsert := GenerateUpsertStrings(m)
if _, err := s.tx.ModelContext(ctx, m).
OnConflict(conflict).
Set(upsert).
Insert(); err != nil {
return xerrors.Errorf("upserting model: %w", err)
}
} else {
if _, err := s.tx.ModelContext(ctx, m).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting model: %w", err)
}
}
return nil
}

// GenerateUpsertString accepts a visor model and returns two string containing SQL that may be used
// to upsert the model. The first string is the conflict statement and the second is the insert.
//
// Example given the below model:
//
// type SomeModel struct {
// Height int64 `pg:",pk,notnull,use_zero"`
// MinerID string `pg:",pk,notnull"`
// StateRoot string `pg:",pk,notnull"`
// OwnerID string `pg:",notnull"`
// WorkerID string `pg:",notnull"`
// }
//
// The strings returned are:
// conflict string:
// "(cid, height, state_root) DO UPDATE"
// update string:
// "owner_id" = EXCLUDED.owner_id, "worker_id" = EXCLUDED.worker_id
func GenerateUpsertStrings(model interface{}) (string, string) {
var cf []string
var ucf []string

// gather all public keys
for _, pk := range pg.Model(model).TableModel().Table().PKs {
cf = append(cf, pk.SQLName)
}
// gather all other fields
for _, field := range pg.Model(model).TableModel().Table().DataFields {
ucf = append(ucf, field.SQLName)
}

// consistent ordering in sql statements.
sort.Strings(cf)
sort.Strings(ucf)

// build the conflict string
var conflict strings.Builder
conflict.WriteString("(")
for i, str := range cf {
conflict.WriteString(str)
// if this isn't the last field in the conflict statement add a comma.
if !(i == len(cf)-1) {
conflict.WriteString(", ")
}
}
conflict.WriteString(") DO UPDATE")

// build the upsert string
var upsert strings.Builder
for i, str := range ucf {
upsert.WriteString("\"" + str + "\"" + " = EXCLUDED." + str)
// if this isn't the last field in the upsert statement add a comma.
if !(i == len(ucf)-1) {
upsert.WriteString(", ")
}
}
return conflict.String(), upsert.String()
}
119 changes: 117 additions & 2 deletions storage/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"github.com/filecoin-project/sentinel-visor/model/actors/miner"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1116,12 +1117,126 @@ func TestMarkTipSetComplete(t *testing.T) {
})
}

func TestModelUpsert(t *testing.T) {
if testing.Short() {
t.Skip("short testing requested")
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

db, cleanup, err := testutil.WaitForExclusiveDatabase(ctx, t)
require.NoError(t, err)
defer func() { require.NoError(t, cleanup()) }()

_, err = db.Exec(`TRUNCATE TABLE miner_infos`)
require.NoError(t, err, "truncating miner_infos")

// database disallowing upserting
d := &Database{
DB: db,
Clock: testutil.NewMockClock(),
Upsert: false,
}

// model was picked for this test since it has nullable fields and untagged pg fields.
minerInfo := &miner.MinerInfo{
Height: 1,
MinerID: "minerID",
StateRoot: "stateroot",
OwnerID: "owner",
WorkerID: "worker",
WorkerChangeEpoch: 0,
ConsensusFaultedElapsed: 0,
PeerID: "",
ControlAddresses: nil,
MultiAddresses: nil,
}

// the second insert should be ignored.
err = d.PersistBatch(ctx, minerInfo)
require.NoErrorf(t, err, "persisting miner info model: %v", err)
err = d.PersistBatch(ctx, minerInfo)
require.NoErrorf(t, err, "persisting miner info model: %v", err)

var count int
_, err = db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM miner_infos`)
require.NoError(t, err)
assert.Equal(t, 1, count)

count = 0
// modify the database to permit upserting
d.Upsert = true

// modify the model, expect this change to persist after the upsert.
minerInfo.OwnerID = "UPSERT"
err = d.PersistBatch(ctx, minerInfo)
require.NoErrorf(t, err, "persisting miner_info model: %v", err)

// reset count, there should still be a single item in the table
_, err = db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM miner_infos`)
require.NoError(t, err)
assert.Equal(t, 1, count)

var owner string
_, err = db.QueryOne(pg.Scan(&owner), `SELECT owner_id FROM miner_infos`)
require.NoError(t, err)
assert.Equal(t, "UPSERT", owner)

}

func TestLongNames(t *testing.T) {
justLongEnough := strings.Repeat("x", MaxPostgresNameLength)
_, err := NewDatabase(context.Background(), "postgres://example.com/fakedb", 1, justLongEnough)
_, err := NewDatabase(context.Background(), "postgres://example.com/fakedb", 1, justLongEnough, false)
require.NoError(t, err)

tooLong := strings.Repeat("x", MaxPostgresNameLength+1)
_, err = NewDatabase(context.Background(), "postgres://example.com/fakedb", 1, tooLong)
_, err = NewDatabase(context.Background(), "postgres://example.com/fakedb", 1, tooLong, false)
require.Error(t, err)
}

// TestingUpsertStruct is only used for validating the GenerateUpsertStrings() method
type TestingUpsertStruct struct {
// should be ignored by upsert generator
tableName struct{} `pg:"testing_upsert_struct"` // nolint: structcheck,unused
Ignored string `pg:"-"`

// should be a constrained field in the conflict statement
Height int64 `pg:",pk,use_zero,notnull"`
Cid string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`

// should be an unconstrained field in the upsert statement
Heads string `pg:",notnull"`
Shoulders string `pg:",nopk"`
Knees uint64 `pg:",use_zero"`

// currently we drop the `pg` tag from fields we allow as null, this is probably a bad habit.
Toes []byte
CamelCase string
}

func (t *TestingUpsertStruct) ExpectedConflictStatement() string {
return "(cid, height, state_root) DO UPDATE"
}

func (t *TestingUpsertStruct) ExpectedUpsertStatement() string {
return `"camel_case" = EXCLUDED.camel_case, "heads" = EXCLUDED.heads, "knees" = EXCLUDED.knees, "shoulders" = EXCLUDED.shoulders, "toes" = EXCLUDED.toes`
}

func TestUpsertSQLGeneration(t *testing.T) {
testModel := &TestingUpsertStruct{
Ignored: "ignored",
Height: 1,
Cid: "cid",
StateRoot: "stateroot",
Heads: "heads",
Shoulders: "shoulders",
Knees: 1,
Toes: []byte{1, 2, 3},
}
conflict, upsert := GenerateUpsertStrings(testModel)

assert.Equal(t, testModel.ExpectedConflictStatement(), conflict)
assert.Equal(t, testModel.ExpectedUpsertStatement(), upsert)
}

0 comments on commit b7a48ba

Please sign in to comment.