Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
remove untestable code; improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Apr 3, 2018
1 parent 6c134d3 commit 00c785a
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 262 deletions.
136 changes: 0 additions & 136 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,41 +399,6 @@ func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize ui
return nil
}

// ImportFrameK imports bits from the given iterator.
func (c *Client) ImportFrameK(frame *Frame, bitIterator BitIterator, batchSize uint) error {
linesLeft := true
bits := []Bit{}
var currentBatchSize uint
indexName := frame.index.name
frameName := frame.name

for linesLeft {
bit, err := bitIterator.NextBit()
if err == io.EOF {
linesLeft = false
} else if err != nil {
return err
} else {
bits = append(bits, bit)
currentBatchSize++
}

// if the batch is full or there's no line left, start importing bits
if currentBatchSize >= batchSize || !linesLeft {
if len(bits) > 0 {
err := c.importBitsK(indexName, frameName, bits)
if err != nil {
return err
}
}
bits = []Bit{}
currentBatchSize = 0
}
}

return nil
}

// ImportValueFrame imports field values from the given iterator.
func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator ValueIterator, batchSize uint) error {
linesLeft := true
Expand Down Expand Up @@ -477,42 +442,6 @@ func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator Valu
return nil
}

// ImportValueFrameK imports field values from the given iterator.
func (c *Client) ImportValueFrameK(frame *Frame, field string, valueIterator ValueIterator, batchSize uint) error {
linesLeft := true
vals := []FieldValue{}
var currentBatchSize uint
indexName := frame.index.name
frameName := frame.name
fieldName := field

for linesLeft {
val, err := valueIterator.NextValue()
if err == io.EOF {
linesLeft = false
} else if err != nil {
return err
} else {
vals = append(vals, val)
currentBatchSize++
}

// if the batch is full or there's no line left, start importing values
if currentBatchSize >= batchSize || !linesLeft {
if len(vals) > 0 {
err := c.importValuesK(indexName, frameName, fieldName, vals)
if err != nil {
return err
}
}
vals = []FieldValue{}
currentBatchSize = 0
}
}

return nil
}

func (c *Client) importBits(indexName string, frameName string, slice uint64, bits []Bit) error {
sort.Sort(bitsForSort(bits))
nodes, err := c.fetchFragmentNodes(indexName, slice)
Expand All @@ -539,16 +468,6 @@ func (c *Client) importBits(indexName string, frameName string, slice uint64, bi
return errors.Wrap(err, "importing to nodes")
}

func (c *Client) importBitsK(indexName string, frameName string, bits []Bit) error {
uri := c.cluster.Host()
err := c.importNode(uri, bitsToImportRequestK(indexName, frameName, bits))
if err != nil {
return err
}

return nil
}

func (c *Client) importValues(indexName string, frameName string, slice uint64, fieldName string, vals []FieldValue) error {
sort.Sort(valsForSort(vals))
nodes, err := c.fetchFragmentNodes(indexName, slice)
Expand All @@ -570,16 +489,6 @@ func (c *Client) importValues(indexName string, frameName string, slice uint64,
return nil
}

func (c *Client) importValuesK(indexName string, frameName string, fieldName string, vals []FieldValue) error {
uri := c.cluster.Host()
err := c.importValueNodeK(uri, valsToImportValueRequestK(indexName, frameName, fieldName, vals))
if err != nil {
return err
}

return nil
}

func (c *Client) fetchFragmentNodes(indexName string, slice uint64) ([]fragmentNode, error) {
path := fmt.Sprintf("/fragment/nodes?slice=%d&index=%s", slice, indexName)
_, body, err := c.httpRequest("GET", path, []byte{}, nil)
Expand Down Expand Up @@ -621,17 +530,6 @@ func (c *Client) importValueNode(uri *URI, request *pbuf.ImportValueRequest) err
return nil
}

func (c *Client) importValueNodeK(uri *URI, request *pbuf.ImportValueRequest) error {
data, _ := proto.Marshal(request)
// request.Marshal never returns an error
_, err := c.doRequest(uri, "POST", "/import-value", defaultProtobufHeaders(), bytes.NewReader(data))
if err != nil {
return errors.Wrap(err, "doing /import-value request")
}

return nil
}

// ExportFrame exports bits for a frame.
func (c *Client) ExportFrame(frame *Frame, view string) (BitIterator, error) {
var slicesMax map[string]uint64
Expand Down Expand Up @@ -894,24 +792,6 @@ func bitsToImportRequest(indexName string, frameName string, slice uint64, bits
}
}

func bitsToImportRequestK(indexName string, frameName string, bits []Bit) *pbuf.ImportRequest {
rowKeys := make([]string, 0, len(bits))
columnKeys := make([]string, 0, len(bits))
timestamps := make([]int64, 0, len(bits))
for _, bit := range bits {
rowKeys = append(rowKeys, bit.RowKey)
columnKeys = append(columnKeys, bit.ColumnKey)
timestamps = append(timestamps, bit.Timestamp)
}
return &pbuf.ImportRequest{
Index: indexName,
Frame: frameName,
RowKeys: rowKeys,
ColumnKeys: columnKeys,
Timestamps: timestamps,
}
}

func valsToImportRequest(indexName string, frameName string, slice uint64, fieldName string, vals []FieldValue) *pbuf.ImportValueRequest {
columnIDs := make([]uint64, 0, len(vals))
values := make([]int64, 0, len(vals))
Expand All @@ -929,22 +809,6 @@ func valsToImportRequest(indexName string, frameName string, slice uint64, field
}
}

func valsToImportValueRequestK(indexName string, frameName string, fieldName string, vals []FieldValue) *pbuf.ImportValueRequest {
columnKeys := make([]string, 0, len(vals))
values := make([]int64, 0, len(vals))
for _, val := range vals {
columnKeys = append(columnKeys, val.ColumnKey)
values = append(values, val.Value)
}
return &pbuf.ImportValueRequest{
Index: indexName,
Frame: frameName,
Field: fieldName,
ColumnKeys: columnKeys,
Values: values,
}
}

// ClientOptions control the properties of client connection to the server.
type ClientOptions struct {
SocketTimeout time.Duration
Expand Down
126 changes: 0 additions & 126 deletions imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,83 +149,6 @@ func (b bitsForSort) Less(i, j int) bool {
return bitCmp < 0
}

// CSVBitIteratorK reads bits from a Reader.
// Each line should contain a single bitK in the following form:
// rowKey,columnKey[,timestamp]
type CSVBitIteratorK struct {
reader io.Reader
line int
scanner *bufio.Scanner
timestampFormat string
}

// NewCSVBitIteratorK creates a CSVBitIteratorK from a Reader.
func NewCSVBitIteratorK(reader io.Reader) *CSVBitIteratorK {
return &CSVBitIteratorK{
reader: reader,
line: 0,
scanner: bufio.NewScanner(reader),
}
}

// NewCSVBitIteratorKWithTimestampFormat creates a CSVBitIteratorK from a Reader with a custom timestamp format.
func NewCSVBitIteratorKWithTimestampFormat(reader io.Reader, timestampFormat string) *CSVBitIteratorK {
return &CSVBitIteratorK{
reader: reader,
line: 0,
scanner: bufio.NewScanner(reader),
timestampFormat: timeFormat,
}
}

// NextBit iterates on lines of a Reader.
// Returns io.EOF on end of iteration.
func (c *CSVBitIteratorK) NextBit() (Bit, error) {
if ok := c.scanner.Scan(); ok {
c.line++
text := strings.TrimSpace(c.scanner.Text())
parts := strings.Split(text, ",")
if len(parts) < 2 {
return Bit{}, fmt.Errorf("Invalid CSV line: %d", c.line)
}
if parts[0] == "" {
return Bit{}, fmt.Errorf("Invalid row key at line: %d", c.line)
}
rowKey := parts[0]
if parts[1] == "" {
return Bit{}, fmt.Errorf("Invalid column key at line: %d", c.line)
}
columnKey := parts[1]
timestamp := 0
var err error
if len(parts) == 3 {
if c.timestampFormat == "" {
timestamp, err = strconv.Atoi(parts[2])
if err != nil {
return Bit{}, fmt.Errorf("Invalid timestamp at line: %d", c.line)
}
} else {
t, err := time.Parse(c.timestampFormat, parts[2])
if err != nil {
return Bit{}, fmt.Errorf("Invalid timestamp at line: %d", c.line)
}
timestamp = int(t.Unix())
}
}
bit := Bit{
RowKey: rowKey,
ColumnKey: columnKey,
Timestamp: int64(timestamp),
}
return bit, nil
}
err := c.scanner.Err()
if err != nil {
return Bit{}, err
}
return Bit{}, io.EOF
}

// FieldValue represents the value for a column within a
// range-encoded frame.
type FieldValue struct {
Expand Down Expand Up @@ -301,52 +224,3 @@ func (v valsForSort) Swap(i, j int) {
func (v valsForSort) Less(i, j int) bool {
return v[i].ColumnID < v[j].ColumnID
}

// CSVValueIteratorK reads field valueKs from a Reader.
// Each line should contain a single field valueK in the following form:
// columnKey,value
type CSVValueIteratorK struct {
reader io.Reader
line int
scanner *bufio.Scanner
}

// NewCSVValueIteratorK creates a CSVValueIteratorK from a Reader.
func NewCSVValueIteratorK(reader io.Reader) *CSVValueIteratorK {
return &CSVValueIteratorK{
reader: reader,
line: 0,
scanner: bufio.NewScanner(reader),
}
}

// NextValueK iterates on lines of a Reader.
// Returns io.EOF on end of iteration.
func (c *CSVValueIteratorK) NextValue() (FieldValue, error) {
if ok := c.scanner.Scan(); ok {
c.line++
text := strings.TrimSpace(c.scanner.Text())
parts := strings.Split(text, ",")
if len(parts) < 2 {
return FieldValue{}, fmt.Errorf("Invalid CSV line: %d", c.line)
}
if parts[0] == "" {
return FieldValue{}, fmt.Errorf("Invalid column key at line: %d", c.line)
}
columnKey := parts[0]
value, err := strconv.Atoi(parts[1])
if err != nil {
return FieldValue{}, fmt.Errorf("Invalid value at line: %d", c.line)
}
fieldValue := FieldValue{
ColumnKey: columnKey,
Value: int64(value),
}
return fieldValue, nil
}
err := c.scanner.Err()
if err != nil {
return FieldValue{}, err
}
return FieldValue{}, io.EOF
}
Loading

0 comments on commit 00c785a

Please sign in to comment.