Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
The client works with both legacy and Pilosa 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Feb 15, 2018
2 parents 097dafa + d7d9e0b commit a8b89de
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 71 deletions.
237 changes: 202 additions & 35 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"time"

"github.com/blang/semver"

"github.com/golang/protobuf/proto"
pbuf "github.com/pilosa/go-pilosa/gopilosa_pbuf"
"github.com/pkg/errors"
Expand All @@ -64,7 +63,8 @@ type Client struct {
versionChecked bool
// User-Agent header cache. Not used until cluster-resize support branch is merged
// and user agent is saved here in NewClient
userAgent string
userAgent string
legacyMode bool
}

// DefaultClient creates a client with the default address and options.
Expand Down Expand Up @@ -104,6 +104,7 @@ func NewClientWithCluster(cluster *Cluster, options *ClientOptions) *Client {
cluster: cluster,
client: newHTTPClient(options.withDefaults()),
versionChecked: options.SkipVersionCheck,
legacyMode: options.LegacyMode,
}
}

Expand Down Expand Up @@ -314,12 +315,18 @@ func (c *Client) syncSchema(schema *Schema, serverSchema *Schema) error {

// Schema returns the indexes and frames on the server.
func (c *Client) Schema() (*Schema, error) {
schemaInfo, err := c.readSchema()
var indexes []StatusIndex
var err error
if c.legacyMode {
indexes, err = c.readSchemaLegacy()
} else {
indexes, err = c.readSchema()
}
if err != nil {
return nil, err
}
schema := NewSchema()
for _, indexInfo := range schemaInfo.Indexes {
for _, indexInfo := range indexes {
index, err := schema.Index(indexInfo.Name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -581,14 +588,31 @@ func (c *Client) fetchFragmentNodes(indexName string, slice uint64) ([]fragmentN
if err != nil {
return nil, err
}
var fragmentNodeURIs []fragmentNodeRoot
err = json.Unmarshal(body, &fragmentNodeURIs)
if err != nil {
return nil, err
}
fragmentNodes := []fragmentNode{}
for _, nodeURI := range fragmentNodeURIs {
fragmentNodes = append(fragmentNodes, nodeURI.URI)
if c.legacyMode {
err = json.Unmarshal(body, &fragmentNodes)
if err != nil {
return nil, errors.Wrap(err, "unmarshaling fragment nodes")
}
// normalize legacy host
for i, node := range fragmentNodes {
uri, err := NewURIFromAddress(node.Host)
if err != nil {
return nil, errors.Wrap(err, "creating a URI")
}
node.Host = uri.Host()
node.Port = uri.Port()
fragmentNodes[i] = node
}
} else {
var fragmentNodeURIs []fragmentNodeRoot
err = json.Unmarshal(body, &fragmentNodeURIs)
if err != nil {
return nil, errors.Wrap(err, "unmarshaling fragment node URIs")
}
for _, nodeURI := range fragmentNodeURIs {
fragmentNodes = append(fragmentNodes, nodeURI.URI)
}
}
return fragmentNodes, nil
}
Expand Down Expand Up @@ -629,13 +653,26 @@ func (c *Client) importValueNodeK(uri *URI, request *pbuf.ImportValueRequest) er

// ExportFrame exports bits for a frame.
func (c *Client) ExportFrame(frame *Frame, view string) (BitIterator, error) {
var slicesMax map[string]uint64
var err error

status, err := c.status()
if err != nil {
return nil, err
}
slicesMax, err := c.slicesMax()
if err != nil {
return nil, err
if c.legacyMode {
slicesMax = map[string]uint64{}
if len(status.Nodes) > 0 {
node := status.Nodes[0]
for _, index := range node.Indexes {
slicesMax[index.Name] = index.Slices[len(index.Slices)-1]
}
}
} else {
slicesMax, err = c.slicesMax()
if err != nil {
return nil, err
}
}
status.indexMaxSlice = slicesMax
sliceURIs, err := c.statusToNodeSlicesForIndex(status, frame.index.Name())
Expand Down Expand Up @@ -665,25 +702,48 @@ func (c *Client) status() (Status, error) {
if err != nil {
return Status{}, errors.Wrap(err, "requesting /status")
}
status := Status{}
err = json.Unmarshal(data, &status)
if err != nil {
return Status{}, errors.Wrap(err, "unmarshaling /status data")
if c.legacyMode {
status, err := decodeLegacyStatus(data)
if err != nil {
return Status{}, errors.Wrap(err, "unmarshaling /status data")
}
return status, nil
} else {
status := Status{}
err = json.Unmarshal(data, &status)
if err != nil {
return Status{}, errors.Wrap(err, "unmarshaling /status data")
}
return status, nil
}
return status, nil
}

func (c *Client) readSchema() (SchemaInfo, error) {
func (c *Client) readSchema() ([]StatusIndex, error) {
_, data, err := c.httpRequest("GET", "/schema", nil, nil)
if err != nil {
return SchemaInfo{}, errors.Wrap(err, "requesting /schema")
return nil, errors.Wrap(err, "requesting /schema")
}
schemaInfo := SchemaInfo{}
err = json.Unmarshal(data, &schemaInfo)
if err != nil {
return SchemaInfo{}, errors.Wrap(err, "unmarshaling /schema data")
return nil, errors.Wrap(err, "unmarshaling /schema data")
}
return schemaInfo.Indexes, nil
}

func (c *Client) readSchemaLegacy() ([]StatusIndex, error) {
_, data, err := c.httpRequest("GET", "/status", nil, nil)
if err != nil {
return nil, errors.Wrap(err, "requesting /status")
}
status, err := decodeLegacyStatus(data)
if err != nil {
return nil, errors.Wrap(err, "unmarshaling /status data")
}
return schemaInfo, nil
if len(status.Nodes) > 0 {
return status.Nodes[0].Indexes, nil
}
return nil, nil
}

func (c *Client) slicesMax() (map[string]uint64, error) {
Expand Down Expand Up @@ -770,9 +830,16 @@ func (c *Client) doRequest(host *URI, method, path string, headers map[string]st
if !c.versionChecked {
// check the server version on the first request
c.versionChecked = true
// don't care about fetching the version, it's not vital
ver, _ := c.fetchServerVersion()
checkServerVersion(ver)
ver, err := c.ServerVersion()
if err != nil {
log.Println("Pilosa server version is not available:", err)
} else {
err = checkServerVersion(ver)
if err != nil {
c.legacyMode = true
log.Println(err)
}
}
}
req, err := makeRequest(host, method, path, headers, reader)
if err != nil {
Expand Down Expand Up @@ -821,9 +888,22 @@ func (c *Client) fetchServerVersion() (string, error) {
return versionInfo.Version, nil
}

func checkServerVersion(version string) {
func (c *Client) ServerVersion() (string, error) {
_, data, err := c.httpRequest("GET", "/version", nil, nil)
if err != nil {
return "", errors.Wrap(err, "requesting /version")
}
versionInfo := versionInfo{}
err = json.Unmarshal(data, &versionInfo)
if err != nil {
return "", errors.Wrap(err, "unmarshaling /version data")
}
return versionInfo.Version, nil
}

func checkServerVersion(version string) error {
if version == "" {
log.Println("Pilosa server version is not available")
return errors.New("Pilosa server version is not available.")
}
if strings.HasPrefix(version, "v") {
version = version[1:]
Expand All @@ -832,11 +912,12 @@ func checkServerVersion(version string) {
minVersion, err2 := semver.ParseRange(pilosaMinVersion)
// check err of serverVersion and minVersion together, otherwise it's not possible to write a test for coverage
if err1 != nil || err2 != nil {
log.Printf("Invalid Pilosa server version: %s or minimum server version: %s", version, pilosaMinVersion)
return fmt.Errorf("Invalid Pilosa server version: %s or minimum server version: %s.", version, pilosaMinVersion)
}
if !minVersion(serverVersion) {
log.Printf("Pilosa server's version is %s, does not meet the minimum required for this version of the client: %s", version, pilosaMinVersion)
return fmt.Errorf("Pilosa server's version is %s, does not meet the minimum required for this version of the client: %s.", version, pilosaMinVersion)
}
return nil
}

func (c *Client) augmentHeaders(headers map[string]string) map[string]string {
Expand Down Expand Up @@ -1038,9 +1119,9 @@ func TLSConfig(config *tls.Config) ClientOption {
}
}

func SkipVersionCheck(skip bool) ClientOption {
func SkipVersionCheck() ClientOption {
return func(options *ClientOptions) error {
options.SkipVersionCheck = skip
options.SkipVersionCheck = true
return nil
}
}
Expand Down Expand Up @@ -1169,9 +1250,10 @@ type Status struct {

// StatusNode contains node information.
type StatusNode struct {
Scheme string `json:"scheme"`
Host string `json:"host"`
Port int `json:"port"`
Scheme string `json:"scheme"`
Host string `json:"host"`
Port int `json:"port"`
Indexes []StatusIndex `json:"indexes"`
}

type SchemaInfo struct {
Expand All @@ -1183,6 +1265,7 @@ type StatusIndex struct {
Name string `json:"name"`
Options StatusOptions `json:"options"`
Frames []StatusFrame `json:"frames"`
Slices []uint64 `json:"slices"`
}

// StatusFrame contains frame information.
Expand Down Expand Up @@ -1274,3 +1357,87 @@ type ClientDiagnosticsInfo struct {
Runtime string `json:"runtime,omitempty"`
Platform string `json:"platform,omitempty"`
}

type statusRoot struct {
Status *StatusLegacy `json:"status"`
}

// Status contains the status information from a Pilosa server.
type StatusLegacy struct {
Nodes []StatusNodeLegacy `json:"Nodes"`
indexMaxSlice map[string]uint64
}

// StatusNode contains node information.
type StatusNodeLegacy struct {
Scheme string `json:"Scheme"`
Host string `json:"Host"`
Indexes []StatusIndexLegacy `json:"Indexes"`
}

// StatusIndex contains index information.
type StatusIndexLegacy struct {
Name string `json:"Name"`
Frames []StatusFrameLegacy `json:"Frames"`
Slices []uint64 `json:"Slices"`
}

// StatusFrame contains frame information.
type StatusFrameLegacy struct {
Name string `json:"Name"`
Options StatusOptionsLegacy `json:"Meta"`
}

// StatusOptions contains options for a frame or an index.
type StatusOptionsLegacy struct {
ColumnLabel string `json:"ColumnLabel"`
RowLabel string `json:"RowLabel"`
CacheType string `json:"CacheType"`
CacheSize uint `json:"CacheSize"`
InverseEnabled bool `json:"InverseEnabled"`
RangeEnabled bool `json:"RangeEnabled"`
Fields []StatusField `json:"Fields"`
TimeQuantum string `json:"TimeQuantum"`
}

func decodeLegacyStatus(data []byte) (Status, error) {
statusRoot := &statusRoot{}
err := json.Unmarshal(data, statusRoot)
if err != nil {
return Status{}, err
}
nodes := []StatusNode{}
for _, legacyNode := range statusRoot.Status.Nodes {
resultIndexes := []StatusIndex{}
for _, legacyIndex := range legacyNode.Indexes {
frames := []StatusFrame{}
for _, legacyFrame := range legacyIndex.Frames {
frames = append(frames, StatusFrame{
Name: legacyFrame.Name,
Options: StatusOptions(legacyFrame.Options),
})
}
index := StatusIndex{
Name: legacyIndex.Name,
Frames: frames,
Slices: legacyIndex.Slices,
}
resultIndexes = append(resultIndexes, index)
}
uri, err := NewURIFromAddress(legacyNode.Host)
if err != nil {
return Status{}, errors.Wrap(err, "creating a URI")
}
nodes = append(nodes, StatusNode{
Scheme: legacyNode.Scheme,
Host: uri.Host(),
Port: int(uri.Port()),
Indexes: resultIndexes,
})
}
result := Status{
Nodes: nodes,
indexMaxSlice: statusRoot.Status.indexMaxSlice,
}
return result, nil
}
Loading

0 comments on commit a8b89de

Please sign in to comment.