Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #60 from pilosa/48-http-client
Browse files Browse the repository at this point in the history
error handling and single http client
  • Loading branch information
jaffee authored Sep 21, 2017
2 parents b6ad2fa + d02f7db commit 01223c2
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 97 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

159 changes: 90 additions & 69 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ package pilosa
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -46,6 +45,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pilosa/go-pilosa/internal"
"github.com/pkg/errors"
)

const maxHosts = 10
Expand All @@ -59,10 +59,8 @@ var protobufHeaders = map[string]string{

// Client is the HTTP client for Pilosa server.
type Client struct {
cluster *Cluster
host *URI
client *http.Client
cacheClients map[string]*Client
cluster *Cluster
client *http.Client
}

// DefaultClient creates a client with the default address and options.
Expand Down Expand Up @@ -99,9 +97,8 @@ func NewClientWithCluster(cluster *Cluster, options *ClientOptions) *Client {
options = &ClientOptions{}
}
return &Client{
cluster: cluster,
client: newHTTPClient(options.withDefaults()),
cacheClients: make(map[string]*Client),
cluster: cluster,
client: newHTTPClient(options.withDefaults()),
}
}

Expand All @@ -114,7 +111,10 @@ func (c *Client) Query(query PQLQuery, options *QueryOptions) (*QueryResponse, e
if options == nil {
options = &QueryOptions{}
}
data := makeRequestData(query.serialize(), options)
data, err := makeRequestData(query.serialize(), options)
if err != nil {
return nil, errors.Wrap(err, "making request data")
}
path := fmt.Sprintf("/index/%s/query", query.Index().name)
_, buf, err := c.httpRequest("POST", path, data, protobufHeaders, rawResponse)
if err != nil {
Expand Down Expand Up @@ -355,35 +355,18 @@ func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator Valu
return nil
}

func (c *Client) getDirectClient(host string) (*Client, error) {
key := host
client, ok := c.cacheClients[key]
if ok {
return client, nil
}
uri, err := NewURIFromAddress(host)
if err != nil {
return nil, err
}
client = NewClientWithURI(uri)

c.cacheClients[key] = client
return client, nil
}

func (c *Client) importBits(indexName string, frameName string, slice uint64, bits []Bit) error {
sort.Sort(bitsForSort(bits))
nodes, err := c.fetchFragmentNodes(indexName, slice)
if err != nil {
return err
}
for _, node := range nodes {
client, err := c.getDirectClient(node.Host)
uri, err := NewURIFromAddress(node.Host)
if err != nil {
return err
}

err = client.importNode(bitsToImportRequest(indexName, frameName, slice, bits))
err = c.importNode(uri, bitsToImportRequest(indexName, frameName, slice, bits))
if err != nil {
return err
}
Expand All @@ -399,12 +382,11 @@ func (c *Client) importValues(indexName string, frameName string, slice uint64,
return err
}
for _, node := range nodes {
client, err := c.getDirectClient(node.Host)
uri, err := NewURIFromAddress(node.Host)
if err != nil {
return err
}

err = client.importValueNode(valsToImportRequest(indexName, frameName, slice, fieldName, vals))
err = c.importValueNode(uri, valsToImportRequest(indexName, frameName, slice, fieldName, vals))
if err != nil {
return err
}
Expand All @@ -427,23 +409,24 @@ func (c *Client) fetchFragmentNodes(indexName string, slice uint64) ([]fragmentN
return fragmentNodes, nil
}

func (c *Client) importNode(request *internal.ImportRequest) error {
data, _ := proto.Marshal(request)
// request.Marshal never returns an error
_, _, err := c.httpRequest("POST", "/import", data, protobufHeaders, noResponse)
func (c *Client) importNode(uri *URI, request *internal.ImportRequest) error {
data, err := proto.Marshal(request)
if err != nil {
return err
return errors.Wrap(err, "marshaling to protobuf")
}

return nil
resp, err := c.doRequest(uri, "POST", "/import", protobufHeaders, bytes.NewReader(data))
if err = anyError(resp, err); err != nil {
return errors.Wrap(err, "doing import request")
}
return errors.Wrap(resp.Body.Close(), "closing import response body")
}

func (c *Client) importValueNode(request *internal.ImportValueRequest) error {
func (c *Client) importValueNode(uri *URI, request *internal.ImportValueRequest) error {
data, _ := proto.Marshal(request)
// request.Marshal never returns an error
_, _, err := c.httpRequest("POST", "/import-value", data, protobufHeaders, noResponse)
_, err := c.doRequest(uri, "POST", "/import-value", protobufHeaders, bytes.NewReader(data))
if err != nil {
return err
return errors.Wrap(err, "doing /import-value request")
}

return nil
Expand All @@ -456,7 +439,7 @@ func (c *Client) ExportFrame(frame *Frame, view string) (BitIterator, error) {
return nil, err
}
sliceURIs := statusToNodeSlicesForIndex(status, frame.index.Name())
return NewCSVBitIterator(newExportReader(sliceURIs, frame, view)), nil
return NewCSVBitIterator(newExportReader(c, sliceURIs, frame, view)), nil
}

// Views fetches and returns the views of a frame
Expand Down Expand Up @@ -492,42 +475,40 @@ func (c *Client) patchFrameTimeQuantum(frame *Frame) error {
func (c *Client) status() (*Status, error) {
_, data, err := c.httpRequest("GET", "/status", nil, nil, errorCheckedResponse)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "requesting /status")
}
root := &statusRoot{}
err = json.Unmarshal(data, root)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "unmarshaling /status data")
}
return root.Status, nil
}

// httpRequest makes a request to the cluster - use this when you want the
// client to choose a host, and it doesn't matter if the request goes to a
// specific host
func (c *Client) httpRequest(method string, path string, data []byte, headers map[string]string, returnResponse returnClientInfo) (*http.Response, []byte, error) {
if data == nil {
data = []byte{}
}
reader := bytes.NewReader(data)

// try at most maxHosts non-failed hosts; protect against broken cluster.removeHost
var response *http.Response
var err error
for i := 0; i < maxHosts; i++ {
if c.host == nil {
c.host = c.cluster.Host()
if c.host == nil {
return nil, nil, ErrorEmptyCluster
}
reader := bytes.NewReader(data)
// get a host from the cluster
host := c.cluster.Host()
if host == nil {
return nil, nil, ErrorEmptyCluster
}
request, err := c.makeRequest(method, path, headers, reader)
if err != nil {
return nil, nil, err
}
response, err = c.client.Do(request)

response, err = c.doRequest(host, method, path, headers, reader)
if err == nil {
break
}
c.cluster.RemoveHost(c.host)
c.host = c.cluster.Host()
c.cluster.RemoveHost(host)
}
if response == nil {
return nil, nil, ErrorTriedMaxHosts
Expand Down Expand Up @@ -555,8 +536,40 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma
return response, buf, nil
}

func (c *Client) makeRequest(method string, path string, headers map[string]string, reader io.Reader) (*http.Request, error) {
request, err := http.NewRequest(method, c.host.Normalize()+path, reader)
// anyError checks an http Response and error to see if anything went wrong with
// a request (either locally, or on the server) and returns a single error if
// so.
func anyError(resp *http.Response, err error) error {
if err != nil {
return errors.Wrap(err, "unable to perform request")
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Wrapf(err, "bad status '%s' and err reading body", resp.Status)
}
msg := string(buf)
err = matchError(msg)
if err != nil {
return err
}
return errors.Errorf("Server error %s body:'%s'", resp.Status, msg)
}
return nil
}

// doRequest creates and performs an http request.
func (c *Client) doRequest(host *URI, method, path string, headers map[string]string, reader io.Reader) (*http.Response, error) {
req, err := makeRequest(host, method, path, headers, reader)
if err != nil {
return nil, errors.Wrap(err, "building request")
}
return c.client.Do(req)
}

func makeRequest(host *URI, method, path string, headers map[string]string, reader io.Reader) (*http.Request, error) {
request, err := http.NewRequest(method, host.Normalize()+path, reader)
if err != nil {
return nil, err
}
Expand All @@ -565,7 +578,7 @@ func (c *Client) makeRequest(method string, path string, headers map[string]stri
request.Header.Set(k, v)
}

return request, err
return request, nil
}

func newHTTPClient(options *ClientOptions) *http.Client {
Expand All @@ -582,16 +595,18 @@ func newHTTPClient(options *ClientOptions) *http.Client {
}
}

func makeRequestData(query string, options *QueryOptions) []byte {
func makeRequestData(query string, options *QueryOptions) ([]byte, error) {
request := &internal.QueryRequest{
Query: query,
ColumnAttrs: options.Columns,
ExcludeAttrs: options.ExcludeAttrs,
ExcludeBits: options.ExcludeBits,
}
r, _ := proto.Marshal(request)
// request.Marshal never returns an error
return r
r, err := proto.Marshal(request)
if err != nil {
return nil, errors.Wrap(err, "marshaling request to protobuf")
}
return r, nil
}

func matchError(msg string) error {
Expand Down Expand Up @@ -678,7 +693,7 @@ func (options *ClientOptions) withDefaults() (updated *ClientOptions) {
updated.SocketTimeout = time.Second * 300
}
if updated.ConnectTimeout <= 0 {
updated.ConnectTimeout = time.Second * 30
updated.ConnectTimeout = time.Second * 60
}
if updated.PoolSizePerRoute <= 0 {
updated.PoolSizePerRoute = 10
Expand Down Expand Up @@ -756,6 +771,7 @@ type viewsInfo struct {
}

type exportReader struct {
client *Client
sliceURIs map[uint64]*URI
frame *Frame
body []byte
Expand All @@ -765,8 +781,9 @@ type exportReader struct {
view string
}

func newExportReader(sliceURIs map[uint64]*URI, frame *Frame, view string) *exportReader {
func newExportReader(client *Client, sliceURIs map[uint64]*URI, frame *Frame, view string) *exportReader {
return &exportReader{
client: client,
sliceURIs: sliceURIs,
frame: frame,
sliceCount: uint64(len(sliceURIs)),
Expand All @@ -785,12 +802,16 @@ func (r *exportReader) Read(p []byte) (n int, err error) {
headers := map[string]string{
"Accept": "text/csv",
}
client := NewClientWithURI(uri)
path := fmt.Sprintf("/export?index=%s&frame=%s&slice=%d&view=%s",
r.frame.index.Name(), r.frame.Name(), r.currentSlice, r.view)
_, r.body, err = client.httpRequest("GET", path, nil, headers, errorCheckedResponse)
resp, err := r.client.doRequest(uri, "GET", path, headers, nil)
if err = anyError(resp, err); err != nil {
return 0, errors.Wrap(err, "doing export request")
}
defer resp.Body.Close()
r.body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
return 0, errors.Wrap(err, "reading response body")
}
r.bodyIndex = 0
}
Expand Down
40 changes: 40 additions & 0 deletions client_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package pilosa

import (
"bytes"
"io/ioutil"
"net/http"
"reflect"
"testing"
"testing/iotest"
)

func TestNewClientFromAddresses(t *testing.T) {
Expand Down Expand Up @@ -36,3 +40,39 @@ func TestNewClientFromAddresses(t *testing.T) {
t.Fatalf("Got error when creating empty client from addresses: %v", err)
}
}

func TestMakeRequestData(t *testing.T) {
q := make([]byte, 2<<30)
q[0] = 'a'
p := PQLBaseQuery{
pql: string(q),
}
uri, err := NewURIFromAddress("localhost:10101")
if err != nil {
t.Fatal(err)
}
cli := NewClientWithURI(uri)
resp, err := cli.Query(&p, nil)
if err == nil {
t.Fatalf("expected err with too large query, but got %v", resp)
}
}

func TestAnyError(t *testing.T) {
err := anyError(
&http.Response{StatusCode: 400,
Body: ioutil.NopCloser(iotest.TimeoutReader(bytes.NewBuffer([]byte("asdf"))))},
nil)
if err == nil {
t.Fatalf("should have gotten an error")
}

err = anyError(
&http.Response{StatusCode: 400,
Body: ioutil.NopCloser(bytes.NewBuffer([]byte("index already exists\n")))},
nil)
if err != ErrorIndexExists {
t.Fatalf("should have gotten ErrorIndexExists, but got %v", err)
}

}
Loading

0 comments on commit 01223c2

Please sign in to comment.