Skip to content

Commit d9b8e6e

Browse files
author
Miguel Varela Ramos
authored
HTTP Reverse Proxy implementation (#2172)
* Barebones proxy implementation * Add breaker and proxy transport * Remove stutter from proxy handler * Add prometheus metrics and graceful signal handling * Simplify handler code * Make in flight request counter variable int64 * Remove request-monitor * Add proxy Dockerfile * Rename flags
1 parent c49eff5 commit d9b8e6e

File tree

21 files changed

+1274
-389
lines changed

21 files changed

+1274
-389
lines changed

cmd/proxy/main.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"flag"
22+
"net/http"
23+
"os"
24+
"os/signal"
25+
"strconv"
26+
"time"
27+
28+
"github.com/cortexlabs/cortex/pkg/lib/logging"
29+
"github.com/cortexlabs/cortex/pkg/proxy"
30+
"go.uber.org/zap"
31+
)
32+
33+
const (
34+
_reportInterval = 10 * time.Second
35+
_requestSampleInterval = 1 * time.Second
36+
)
37+
38+
func main() {
39+
var (
40+
port int
41+
metricsPort int
42+
userContainerPort int
43+
maxConcurrency int
44+
maxQueueLength int
45+
)
46+
47+
flag.IntVar(&port, "port", 8000, "port where the proxy will be served")
48+
flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the proxy will be served")
49+
flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to")
50+
flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container")
51+
flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container")
52+
flag.Parse()
53+
54+
log := logging.GetLogger()
55+
defer func() {
56+
_ = log.Sync()
57+
}()
58+
59+
switch {
60+
case maxConcurrency == 0:
61+
log.Fatal("--max-concurrency flag is required")
62+
case maxQueueLength == 0:
63+
maxQueueLength = maxConcurrency * 10
64+
}
65+
66+
target := "http://127.0.0.1:" + strconv.Itoa(port)
67+
httpProxy := proxy.NewReverseProxy(target, maxQueueLength, maxQueueLength)
68+
69+
requestCounterStats := &proxy.RequestStats{}
70+
breaker := proxy.NewBreaker(
71+
proxy.BreakerParams{
72+
QueueDepth: maxQueueLength,
73+
MaxConcurrency: maxConcurrency,
74+
InitialCapacity: maxConcurrency,
75+
},
76+
)
77+
78+
promStats := proxy.NewPrometheusStatsReporter()
79+
80+
go func() {
81+
reportTicker := time.NewTicker(_reportInterval)
82+
defer reportTicker.Stop()
83+
84+
requestSamplingTicker := time.NewTicker(_requestSampleInterval)
85+
defer requestSamplingTicker.Stop()
86+
87+
for {
88+
select {
89+
case <-reportTicker.C:
90+
go func() {
91+
report := requestCounterStats.Report()
92+
promStats.Report(report)
93+
}()
94+
case <-requestSamplingTicker.C:
95+
go func() {
96+
requestCounterStats.Append(breaker.InFlight())
97+
}()
98+
}
99+
}
100+
}()
101+
102+
servers := map[string]*http.Server{
103+
"proxy": {
104+
Addr: ":" + strconv.Itoa(userContainerPort),
105+
Handler: proxy.Handler(breaker, httpProxy),
106+
},
107+
"metrics": {
108+
Addr: ":" + strconv.Itoa(metricsPort),
109+
Handler: promStats,
110+
},
111+
}
112+
113+
errCh := make(chan error)
114+
for name, server := range servers {
115+
go func(name string, server *http.Server) {
116+
log.Infof("Starting %s server on %s", name, server.Addr)
117+
errCh <- server.ListenAndServe()
118+
}(name, server)
119+
}
120+
121+
sigint := make(chan os.Signal, 1)
122+
signal.Notify(sigint, os.Interrupt)
123+
124+
select {
125+
case err := <-errCh:
126+
log.Fatal("failed to start proxy server", zap.Error(err))
127+
case <-sigint:
128+
// We received an interrupt signal, shut down.
129+
log.Info("Received TERM signal, handling a graceful shutdown...")
130+
131+
for name, server := range servers {
132+
log.Infof("Shutting down %s server", name)
133+
if err := server.Shutdown(context.Background()); err != nil {
134+
// Error from closing listeners, or context timeout:
135+
log.Warn("HTTP server Shutdown Error", zap.Error(err))
136+
}
137+
}
138+
log.Info("Shutdown complete, exiting...")
139+
}
140+
}

cmd/request-monitor/main.go

Lines changed: 0 additions & 184 deletions
This file was deleted.

design-spec/async.yaml

Lines changed: 0 additions & 38 deletions
This file was deleted.

design-spec/batch.yaml

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)