Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding logic to extract the Rate Limiting statistics #291

Merged
merged 7 commits into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io/ioutil"
"net/http"
"os"
"sync"
"time"
)

Expand All @@ -27,6 +28,19 @@ type Client struct {

//Option to specify extra headers like User-Agent
ExtraHeader map[string]string

// rateLimiting is used to store the rate limitting stats.
// More information in the official documentation: https://docs.datadoghq.com/api/?lang=bash#rate-limiting
rateLimitingStats map[string]RateLimit
// Mutex to protect the rate limiting map.
m sync.Mutex
}

type RateLimit struct {
Limit string
Period string
Reset string
Remaining string
}

// valid is the struct to unmarshal validation endpoint responses into.
Expand All @@ -44,11 +58,12 @@ func NewClient(apiKey, appKey string) *Client {
}

return &Client{
apiKey: apiKey,
appKey: appKey,
baseUrl: baseUrl,
HttpClient: http.DefaultClient,
RetryTimeout: time.Duration(60 * time.Second),
apiKey: apiKey,
appKey: appKey,
baseUrl: baseUrl,
HttpClient: http.DefaultClient,
RetryTimeout: time.Duration(60 * time.Second),
rateLimitingStats: make(map[string]RateLimit),
}
}

Expand Down
5 changes: 3 additions & 2 deletions logs_indexes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func TestLogsIndexGet(t *testing.T) {
defer ts.Close()

client := Client{
baseUrl: ts.URL,
HttpClient: http.DefaultClient,
baseUrl: ts.URL,
HttpClient: http.DefaultClient,
rateLimitingStats: make(map[string]RateLimit),
}

logsIndex, err := client.GetLogsIndex("main")
Expand Down
57 changes: 57 additions & 0 deletions ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package datadog

import (
"fmt"
"net/http"
"strings"
)

// The list of Rate Limited Endpoints of the Datadog API.
// https://docs.datadoghq.com/api/?lang=bash#rate-limiting
var (
rateLimitedEndpoints = map[string]string{
"/v1/query": "GET",
"/v1/input": "GET",
"/v1/metrics": "GET",
"/v1/events": "POST",
"/v1/logs-queries/list": "POST",
"/v1/graph/snapshot": "GET",
"/v1/logs/config/indexes": "GET",
}
)

func isRateLimited(method string, endpoint string) (limited bool, shortEndpoint string) {
for e, m := range rateLimitedEndpoints {
if strings.HasPrefix(endpoint, e) && m == method {
return true, e
}
}
return false, ""
}

func (client *Client) updateRateLimits(resp *http.Response, api string) error {
if resp == nil || resp.Header == nil {
return fmt.Errorf("could not parse headers from the HTTP response.")
}
client.m.Lock()
defer client.m.Unlock()
client.rateLimitingStats[api] = RateLimit{
Limit: resp.Header.Get("X-RateLimit-Limit"),
Reset: resp.Header.Get("X-RateLimit-Reset"),
Period: resp.Header.Get("X-RateLimit-Period"),
Remaining: resp.Header.Get("X-RateLimit-Remaining"),
}
return nil
}

// GetRateLimitStats is a threadsafe getter to retrieve the rate limiting stats associated with the Client.
func (client *Client) GetRateLimitStats() map[string]RateLimit {
client.m.Lock()
defer client.m.Unlock()
// Shallow copy to avoid corrupted data
mapCopy := make(map[string]RateLimit, len(client.rateLimitingStats))
for k, v := range client.rateLimitingStats {
mapCopy[k] = v
}
return mapCopy
}
122 changes: 122 additions & 0 deletions ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package datadog

import (
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

func Test_isRateLimited(t *testing.T) {
tests := []struct {
desc string
endpoint string
method string
isRateLimited bool
endpointFormated string
}{
{
desc: "is rate limited",
endpoint: "/v1/query?&query=avg:system.cpu.user{*}by{host}",
method: "GET",
isRateLimited: true,
endpointFormated: "/v1/query",
},
{
desc: "is not rate limited",
endpoint: "/v1/series?api_key=12",
method: "POST",
isRateLimited: false,
endpointFormated: "",
},
{
desc: "is rate limited but wrong method",
endpoint: "/v1/query?&query=avg:system.cpu.user{*}by{host}",
method: "POST",
isRateLimited: false,
endpointFormated: "",
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("#%d %s", i, tt.desc), func(t *testing.T) {
limited, edpt := isRateLimited(tt.method, tt.endpoint)
assert.Equal(t, limited, tt.isRateLimited)
assert.Equal(t, edpt, tt.endpointFormated)
})
}
}

func Test_updateRateLimits(t *testing.T) {
// fake client to ensure that we are race free.
client := Client{
rateLimitingStats: make(map[string]RateLimit),
}
tests := []struct {
desc string
api string
resp *http.Response
header RateLimit
error error
}{
{
"nominal case query",
"/v1/query",
makeHeader(RateLimit{"1", "2", "3", "4"}),
RateLimit{"1", "2", "3", "4"},
nil,
},
{
"nominal case logs",
"/v1/logs-queries/list",
makeHeader(RateLimit{"2", "2", "1", "5"}),
RateLimit{"2", "2", "1", "5"},
nil,
},
{
"no response",
"",
nil,
RateLimit{},
fmt.Errorf("could not parse headers from the HTTP response."),
},
{
"no header",
"/v2/error",
makeEmptyHeader(),
RateLimit{},
fmt.Errorf("could not parse headers from the HTTP response."),
},
{
"update case query",
"/v1/query",
makeHeader(RateLimit{"2", "4", "6", "4"}),
RateLimit{"2", "4", "6", "4"},
nil,
},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("#%d %s", i, tt.desc), func(t *testing.T) {
err := client.updateRateLimits(tt.resp, tt.api)
assert.Equal(t, tt.error, err)
assert.Equal(t, tt.header, client.rateLimitingStats[tt.api])
})
}
}

func makeHeader(r RateLimit) *http.Response {
h := http.Response{
Header: make(map[string][]string),
}
h.Header.Set("X-RateLimit-Limit", r.Limit)
h.Header.Set("X-RateLimit-Reset", r.Reset)
h.Header.Set("X-RateLimit-Period", r.Period)
h.Header.Set("X-RateLimit-Remaining", r.Remaining)
return &h
}

func makeEmptyHeader() *http.Response {
return &http.Response{
Header: nil,
}
}
9 changes: 9 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ func (client *Client) doJsonRequestUnredacted(method, api string,
body = []byte{'{', '}'}
}

limited, short := isRateLimited(method, api)
if limited {
err := client.updateRateLimits(resp, short)
if err != nil {
// Inability to update the rate limiting stats should not be a blocking error.
fmt.Printf("Error Updating the Rate Limit statistics: %s", err.Error())
}
}

// Try to parse common response fields to check whether there's an error reported in a response.
var common *Response
err = json.Unmarshal(body, &common)
Expand Down