Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Cluster blacklist
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Jul 7, 2017
1 parent f7e4034 commit 0dd9b9b
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 32 deletions.
53 changes: 39 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ import (
"github.com/pilosa/go-pilosa/internal"
)

const maxHosts = 10

// Client is the HTTP client for Pilosa server.
type Client struct {
cluster *Cluster
host *URI
client *http.Client
}

Expand Down Expand Up @@ -378,24 +381,34 @@ func (c *Client) status() (*Status, error) {
}

func (c *Client) httpRequest(method string, path string, data []byte, returnResponse returnClientInfo) (*http.Response, []byte, error) {
addr := c.cluster.Host()
if addr == nil {
return nil, nil, ErrorEmptyCluster
}
if data == nil {
data = []byte{}
}
request, err := http.NewRequest(method, addr.Normalize()+path, bytes.NewReader(data))
if err != nil {
return nil, nil, err
reader := bytes.NewReader(data)

// try at most maxHosts non-failed hosts; protect against broken cluster.removeHost
var response *http.Response
var err error
for i := 0; i < maxHosts; i++ {
if c.host == nil {
c.host = c.cluster.Host()
if c.host == nil {
return nil, nil, ErrorEmptyCluster
}
}
request, err := c.makeRequest(method, path, reader)
if err != nil {
return nil, nil, err
}
response, err = c.client.Do(request)
if err == nil {
break
}
c.cluster.RemoveHost(c.host)
c.host = c.cluster.Host()
}
// both Content-Type and Accept headers must be set for protobuf content
request.Header.Set("Content-Type", "application/x-protobuf")
request.Header.Set("Accept", "application/x-protobuf")
response, err := c.client.Do(request)
if err != nil {
c.cluster.RemoveHost(addr)
return nil, nil, err
if response == nil {
return nil, nil, ErrorTriedMaxHosts
}
defer response.Body.Close()
// TODO: Optimize buffer creation
Expand All @@ -420,6 +433,18 @@ func (c *Client) httpRequest(method string, path string, data []byte, returnResp
return response, buf, nil
}

func (c *Client) makeRequest(method string, path string, reader io.Reader) (*http.Request, error) {
request, err := http.NewRequest(method, c.host.Normalize()+path, reader)
if err != nil {
return nil, err
}
// both Content-Type and Accept headers must be set for protobuf content
request.Header.Set("Content-Type", "application/x-protobuf")
request.Header.Set("Accept", "application/x-protobuf")

return request, err
}

func newHTTPClient(options *ClientOptions) *http.Client {
transport := &http.Transport{
Dial: (&net.Dialer{
Expand Down
63 changes: 46 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,23 @@

package pilosa

import (
"sync"
)

// Cluster contains hosts in a Pilosa cluster.
type Cluster struct {
hosts []*URI
nextIndex int
hosts []*URI
okList []bool
mutex *sync.RWMutex
}

// DefaultCluster returns the default Cluster.
func DefaultCluster() *Cluster {
return &Cluster{
hosts: make([]*URI, 0),
hosts: make([]*URI, 0),
okList: make([]bool, 0),
mutex: &sync.RWMutex{},
}
}

Expand All @@ -56,35 +63,57 @@ func NewClusterWithHost(hosts ...*URI) *Cluster {

// AddHost adds a host to the cluster.
func (c *Cluster) AddHost(address *URI) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.hosts = append(c.hosts, address)
c.okList = append(c.okList, true)
}

// Host returns the next host in the cluster.
// Host returns a host in the cluster.
func (c *Cluster) Host() *URI {
if len(c.hosts) == 0 {
return nil
c.mutex.RLock()
var host *URI
for i, ok := range c.okList {
if ok {
host = c.hosts[i]
break
}
}
c.mutex.RUnlock()
if host != nil {
return host
}
// Return the transport, e.g., http from http+protobuf
uri := c.hosts[c.nextIndex%len(c.hosts)]
c.nextIndex = (c.nextIndex + 1) % len(c.hosts)
return uri
c.reset()
return host
}

// RemoveHost removes the host with the given URI from the cluster.
// RemoveHost black lists the host with the given URI from the cluster.
func (c *Cluster) RemoveHost(address *URI) {
c.mutex.Lock()
defer c.mutex.Unlock()
for i, uri := range c.hosts {
if uri.Equals(address) {
c.hosts = append(c.hosts[:i], c.hosts[i+1:]...)
c.okList[i] = false
break
}
}
}

// Hosts returns all hosts in the cluster.
// Hosts returns all available hosts in the cluster.
func (c *Cluster) Hosts() []URI {
arr := make([]URI, 0, len(c.hosts))
for _, u := range c.hosts {
arr = append(arr, *u)
hosts := make([]URI, 0, len(c.hosts))
for i, host := range c.hosts {
if c.okList[i] {
hosts = append(hosts, *host)
}
}
return hosts
}

func (c *Cluster) reset() {
c.mutex.Lock()
defer c.mutex.Unlock()
for i := range c.okList {
c.okList[i] = true
}
return arr
}
2 changes: 1 addition & 1 deletion cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestRemoveHost(t *testing.T) {
t.Fatal(err)
}
c.RemoveHost(uri)
if len(c.hosts) != 0 {
if len(c.Hosts()) != 0 {
t.Fatalf("The cluster should not contain the host")
}
}
1 change: 1 addition & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ var (
ErrorInvalidFrameName = NewError("Invalid frame name")
ErrorInvalidLabel = NewError("Invalid label")
ErrorInverseBitmapsNotEnabled = NewError("Inverse bitmaps support was not enabled for this frame")
ErrorTriedMaxHosts = NewError("Tried max hosts, still failing")
)
36 changes: 36 additions & 0 deletions importer/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"bufio"
"fmt"
"os"
"time"

pilosa "github.com/pilosa/go-pilosa"
)

func main() {
client := pilosa.DefaultClient()
index, _ := pilosa.NewIndex("i", nil)
frame, _ := index.Frame("f", nil)
file, err := os.Open("/home/yuce/ramdisk/testdata/index-frame1_100x1050000.csv")
if err != nil {
panic(err)
}
err = client.EnsureIndex(index)
if err != nil {
panic(err)
}
err = client.EnsureFrame(frame)
if err != nil {
panic(err)
}
iterator := pilosa.NewCSVBitIterator(bufio.NewReader(file))
tic := time.Now()
err = client.ImportFrame(frame, iterator, 1000000)
if err != nil {
panic(err)
}
tac := time.Since(tic)
fmt.Println("TAC:", tac)
}

0 comments on commit 0dd9b9b

Please sign in to comment.