Skip to content

Commit 1236c95

Browse files
committed
add request throttling logic
1 parent a575bb1 commit 1236c95

File tree

4 files changed

+165
-2
lines changed

4 files changed

+165
-2
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Set up Go
2020
uses: actions/setup-go@v4
2121
with:
22-
go-version: '1.16'
22+
go-version: '1.19'
2323

2424
- name: Test
2525
run: go test -v ./...

counter.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package retryhttp
2+
3+
import (
4+
"sync/atomic"
5+
"time"
6+
)
7+
8+
type atomicCounter struct {
9+
entries []uint64
10+
currEntry atomic.Int32
11+
}
12+
13+
func newAtomicCounter(stop <-chan bool, numEntries int, timeSpan time.Duration) *atomicCounter {
14+
c := atomicCounter{
15+
entries: make([]uint64, numEntries),
16+
}
17+
18+
// move buckets on a timer
19+
t := time.NewTicker(timeSpan / time.Duration(numEntries))
20+
go func() {
21+
for {
22+
select {
23+
case <-stop:
24+
// the ticker cannot be gc'd until Stop is called
25+
t.Stop()
26+
return
27+
case <-t.C:
28+
// calculate next index
29+
next := (c.currEntry.Load() + 1) % int32(numEntries)
30+
31+
// clear the next bucket
32+
atomic.StoreUint64(&c.entries[next], 0)
33+
34+
// update current index to next
35+
c.currEntry.Store(next)
36+
}
37+
}
38+
}()
39+
40+
return &c
41+
}
42+
43+
func (c *atomicCounter) increment() {
44+
atomic.AddUint64(&c.entries[c.currEntry.Load()], 1)
45+
}
46+
47+
func (c *atomicCounter) read() uint64 {
48+
var sum uint64
49+
for _, entry := range c.entries {
50+
sum += entry
51+
}
52+
53+
return sum
54+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/justinrixx/retryhttp
22

3-
go 1.16
3+
go 1.19

throttler.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package retryhttp
2+
3+
import (
4+
"context"
5+
"errors"
6+
"math"
7+
"math/rand"
8+
"net/http"
9+
"time"
10+
)
11+
12+
const (
13+
defaultTimespanSeconds = 120 // 2 mins
14+
defaultBuckets = defaultTimespanSeconds / 10 // 10s per bucket
15+
)
16+
17+
var ErrThrottled = errors.New("retry throttled")
18+
19+
// Throttler is an interface decides if a retry should be throttled
20+
type Throttler interface {
21+
ShouldThrottle(res *http.Response, err error) bool
22+
Stop()
23+
}
24+
25+
type defaultThrottler struct {
26+
totalReqs *atomicCounter
27+
overloadedReqs *atomicCounter
28+
retriedReqs *atomicCounter
29+
k float64
30+
retryBudget float64
31+
32+
totalStop chan<- bool
33+
overloadStop chan<- bool
34+
retriedStop chan<- bool
35+
}
36+
37+
// NewDefaultThrottler constructs a new parameterized throttler. k is described in
38+
// [this equation].
39+
// retryBudget is the per-client retry budget described in the "Deciding to Retry"
40+
// section of the same chapter.
41+
//
42+
// [this equation]: https://sre.google/sre-book/handling-overload/#eq2101
43+
func NewDefaultThrottler(k float64, retryBudget float64) defaultThrottler {
44+
totalStop := make(chan bool)
45+
overloadStop := make(chan bool)
46+
retriedStop := make(chan bool)
47+
48+
totalCounter := newAtomicCounter(totalStop, defaultBuckets, time.Second*defaultTimespanSeconds)
49+
overloadCounter := newAtomicCounter(overloadStop, defaultBuckets, time.Second*defaultTimespanSeconds)
50+
retriedCounter := newAtomicCounter(retriedStop, defaultBuckets, time.Second*defaultTimespanSeconds)
51+
52+
return defaultThrottler{
53+
totalReqs: totalCounter,
54+
overloadedReqs: overloadCounter,
55+
retriedReqs: retriedCounter,
56+
k: k,
57+
retryBudget: retryBudget,
58+
}
59+
}
60+
61+
// ShouldThrottle decides whether a request should be throttled.
62+
func (t defaultThrottler) ShouldThrottle(res *http.Response, err error, isRetry bool) bool {
63+
t.recordStats(res, err, isRetry)
64+
65+
total := t.totalReqs.read()
66+
fTotal := float64(total)
67+
68+
if isRetry {
69+
// check per-client retry budget
70+
retried := t.retriedReqs.read()
71+
if float64(retried)/fTotal > t.retryBudget {
72+
return false
73+
}
74+
}
75+
76+
// https://sre.google/sre-book/handling-overload/#eq2101
77+
overloaded := t.overloadedReqs.read()
78+
fAccepts := float64(total - overloaded)
79+
p := math.Max(0, (fTotal-(t.k*fAccepts))/(fTotal+1))
80+
81+
return p > rand.Float64()
82+
}
83+
84+
// recordStats records information about a request which the throttler can use later
85+
// to make throttling decisions. This throttler records 429s and context deadline
86+
// exceeded errors as signal of overload.
87+
func (t defaultThrottler) recordStats(res *http.Response, err error, isRetry bool) {
88+
t.totalReqs.increment()
89+
if isRetry {
90+
t.retriedReqs.increment()
91+
}
92+
93+
if (err != nil && errors.Is(err, context.DeadlineExceeded)) ||
94+
(res != nil && res.StatusCode == http.StatusTooManyRequests) {
95+
t.overloadedReqs.increment()
96+
}
97+
}
98+
99+
func (t defaultThrottler) Stop() {
100+
go func() {
101+
t.totalStop <- true
102+
}()
103+
go func() {
104+
t.overloadStop <- true
105+
}()
106+
go func() {
107+
t.retriedStop <- true
108+
}()
109+
}

0 commit comments

Comments
 (0)