Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
implement int field support
Browse files Browse the repository at this point in the history
add local cache for node URIs per index/shard
add exported (questionable) method to client for doing simple integer
imports.
add translated column keys to the local cache.
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 62bc87f commit edf6cb8
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 63 deletions.
55 changes: 54 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,28 @@ type Client struct {
importLogEncoder encoder
logLock sync.Mutex

// TODO make this threadsafe using key translation cache on client using embedded K/V store.
translator *Translator
// TODO threadsafe key translation cache on client using embedded K/V store.

// TODO shardNodes needs to be invalidated/updated when cluster topology changes.
shardNodes shardNodes
}

func (c *Client) GetURIsForShard(index string, shard uint64) ([]*URI, error) {
uris, ok := c.shardNodes.Get(index, shard)
if ok {
return uris, nil
}
fragmentNodes, err := c.fetchFragmentNodes(index, shard)
if err != nil {
return nil, errors.Wrap(err, "trying to look up nodes for shard")
}
uris = make([]*URI, 0, len(fragmentNodes))
for _, fn := range fragmentNodes {
uris = append(uris, fn.URI())
}
c.shardNodes.Put(index, shard, uris)
return uris, nil
}

// DefaultClient creates a client with the default address and options.
Expand Down Expand Up @@ -143,6 +163,7 @@ func newClientWithOptions(options *ClientOptions) *Client {
coordinatorLock: &sync.RWMutex{},

translator: NewTranslator(),
shardNodes: newShardNodes(),
}
if options.importLogWriter != nil {
c.importLogEncoder = newImportLogEncoder(options.importLogWriter)
Expand Down Expand Up @@ -646,6 +667,38 @@ func (c *Client) importValues(field *Field,
return errors.Wrap(err, "importing values to nodes")
}

// ImportValues takes the given integer values and column ids
// (which must all be in the given shard) and imports them into the
// given index,field,shard on all nodes which should hold that shard.
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64) error {
msg := &pbuf.ImportValueRequest{
Index: index,
Field: field,
Shard: shard,
ColumnIDs: ids,
Values: vals,
}
data, err := proto.Marshal(msg)
if err != nil {
return errors.Wrap(err, "marshaling to protobuf")
}
path := fmt.Sprintf("/index/%s/field/%s/import", index, field)
c.logImport(index, path, shard, false, data)

uris, err := c.GetURIsForShard(index, shard)
if err != nil {
return errors.Wrap(err, "getting uris")
}

eg := errgroup.Group{}
for _, uri := range uris {
eg.Go(func() error {
return c.importData(uri, path, data)
})
}
return errors.Wrap(eg.Wait(), "importing values to nodes")
}

func importPathData(field *Field, shard uint64, msg proto.Message, options *ImportOptions) (path string, data []byte, err error) {
data, err = proto.Marshal(msg)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions client_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,32 @@ func TestImportWithBatchSize(t *testing.T) {
}
}

func TestImportValues(t *testing.T) {
client := getClient()
schema, err := client.Schema()
if err != nil {
t.Fatalf("getting schema: %v", err)
}
index := schema.Index("go-testindex")
intfield := index.Field("intfield", OptFieldTypeInt())
err = client.SyncSchema(schema)
if err != nil {
t.Fatalf("syncing schema: %v", err)
}

err = client.ImportValues("go-testindex", "intfield", 0, []int64{1, 2, 3}, []uint64{1, 2, 3})
if err != nil {
t.Fatalf("importing values: %v", err)
}

resp, err := client.Query(intfield.GT(0))
result := resp.Result()
if !reflect.DeepEqual(result.Row().Columns, []uint64{1, 2, 3}) {
t.Fatalf("unexpected result: %v", result.Row().Columns)
}

}

// Ensure that the client does not send batches of zero records to Pilosa.
// In our case it should send:
// batch 1: shard[0,1]
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.4.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/ugorji/go v1.1.5-pre // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.opencensus.io v0.22.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/DataDog/datadog-go v2.2.0+incompatible h1:V5BKkxACZLjzHjSgBbr2gvLA2Ae
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/StackExchange/wmi v0.0.0-20181212234831-e0a55b97c705/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down Expand Up @@ -54,6 +55,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -170,6 +172,8 @@ github.com/pilosa/pilosa v0.0.0-20190104143002-8c4b1548bc4b h1:2H/+JUxL4dv0uJ4G4
github.com/pilosa/pilosa v0.0.0-20190104143002-8c4b1548bc4b/go.mod h1:NgpkJkefqUKUHV7O3TqBOu89tsao3ksth2wzTNe8CPQ=
github.com/pilosa/pilosa v1.2.1-0.20190410162749-b973f8c96356 h1:jDxhpV4l+CpKqVVgld73e9/EyogdCcO1ftbCvifrhSc=
github.com/pilosa/pilosa v1.2.1-0.20190410162749-b973f8c96356/go.mod h1:QN7EwQwoQHNPVsd7CHXFDasPznLDA6DPswmnLr4eJ6o=
github.com/pilosa/pilosa v1.2.1-0.20190807173852-bc9747cc0f19 h1:93vMMs0jAhynsJpbC3AMynz1M9g5G5vnVVPjM1cpU94=
github.com/pilosa/pilosa v1.2.1-0.20190807173852-bc9747cc0f19/go.mod h1:57zHA92sPbJ01QsMyyEDASX2TJnf8qSM7ZdUnVzM0b8=
github.com/pilosa/pilosa v1.3.1 h1:rLDVqJBuRzhPtue730D+EX0YEVS4R0oDzsE4bJBwLcE=
github.com/pilosa/pilosa v1.3.1/go.mod h1:97yLL9mpUqOj9naKu5XA/b/U6JLe3JGGUlc2HOTDw+A=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -209,6 +213,7 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
Expand Down Expand Up @@ -238,6 +243,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQGFt7E53bPYqEgug/AoBtY=
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
Expand All @@ -261,6 +267,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56 h1:ZpKuNIejY8P0ExLOVyKhb0WsgG8UdvHXe6TWjY7eL6k=
Expand Down Expand Up @@ -289,6 +296,7 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190424112056-4829fb13d2c6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
Expand Down Expand Up @@ -317,6 +325,7 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading

0 comments on commit edf6cb8

Please sign in to comment.