Skip to content

Commit

Permalink
cherry-pick: feat(query): Add mechanism to have a limit on number of …
Browse files Browse the repository at this point in the history
…pending queries (#7603)

This is useful to avoid blowing up the number of goroutines handling queries, if the user bombards Dgraph with lots of concurrent / asynchronous requests.

(cherry picked from commit 1450f81)
  • Loading branch information
manishrjain authored and OmarAyo committed Aug 17, 2021
1 parent 8d3eb76 commit 10d1edf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 19 deletions.
1 change: 1 addition & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func run() {
return
}
}
edgraph.Init()

x.PrintVersion()
glog.Infof("x.Config: %+v", x.Config)
Expand Down
53 changes: 38 additions & 15 deletions dgraph/cmd/increment/increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/dgraph-io/dgo/v200"
Expand All @@ -49,11 +50,12 @@ func init() {

flag := Increment.Cmd.Flags()
flag.String("alpha", "localhost:9080", "Address of Dgraph Alpha.")
flag.Int("num", 1, "How many times to run.")
flag.Int("num", 1, "How many times to run per goroutine.")
flag.Int("retries", 10, "How many times to retry setting up the connection.")
flag.Duration("wait", 0*time.Second, "How long to wait.")
flag.String("user", "", "Username if login is required.")
flag.String("password", "", "Password of the user.")
flag.Int("conc", 1, "How many goroutines to run.")
flag.String("pred", "counter.val",
"Predicate to use for storing the counter.")
flag.Bool("ro", false,
Expand Down Expand Up @@ -167,25 +169,46 @@ func run(conf *viper.Viper) {

waitDur := conf.GetDuration("wait")
num := conf.GetInt("num")
conc := int(conf.GetInt("conc"))
format := "0102 03:04:05.999"

dg, closeFunc := x.GetDgraphClient(Increment.Conf, true)
defer closeFunc()

for num > 0 {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
time.Sleep(time.Second)
continue
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf("%-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n", now, cnt.Val,
cnt.startTs, cnt.qLatency, cnt.mLatency, serverLat, clientLat, clientLat-serverLat)
// Run things serially first.
for i := 0; i < conc; i++ {
_, err := process(dg, conf)
x.Check(err)
num--
time.Sleep(waitDur)
}

var wg sync.WaitGroup
f := func(i int) {
defer wg.Done()
count := 0
for count < num {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
time.Sleep(time.Second)
continue
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf(
"[%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
serverLat, clientLat, clientLat-serverLat)
time.Sleep(waitDur)
count++
}
}

for i := 0; i < conc; i++ {
wg.Add(1)
go f(i)
}
wg.Wait()
}
25 changes: 21 additions & 4 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,22 +1009,39 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, er
return s.doQuery(ctx, req, NoAuthorize)
}

var pendingQueries int64
var maxPendingQueries int64
var serverOverloadErr = errors.New("429 Too Many Requests. Please throttle your requests")

// Init reads the max-pending-queries limit from the server config.
// It must be called once during startup (see dgraph/cmd/alpha/run.go).
func Init() {
	maxPendingQueries = x.Config.Limit.GetInt64("max-pending-queries")
}

func (s *Server) doQuery(ctx context.Context, req *api.Request, doAuth AuthMode) (
resp *api.Response, rerr error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
defer atomic.AddInt64(&pendingQueries, -1)
if val := atomic.AddInt64(&pendingQueries, 1); val > maxPendingQueries {
return nil, serverOverloadErr
}

if bool(glog.V(3)) || worker.LogRequestEnabled() {
glog.Infof("Got a query: %+v", req)
}

isGraphQL, _ := ctx.Value(IsGraphql).(bool)
if isGraphQL {
atomic.AddUint64(&numGraphQL, 1)
} else {
atomic.AddUint64(&numGraphQLPM, 1)
}

if ctx.Err() != nil {
return nil, ctx.Err()
}

l := &query.Latency{}
l.Start = time.Now()

Expand Down

0 comments on commit 10d1edf

Please sign in to comment.