@@ -25,8 +25,13 @@ import (
25
25
"strconv"
26
26
"time"
27
27
28
+ "github.com/cortexlabs/cortex/pkg/lib/aws"
29
+ "github.com/cortexlabs/cortex/pkg/lib/errors"
28
30
"github.com/cortexlabs/cortex/pkg/lib/logging"
31
+ "github.com/cortexlabs/cortex/pkg/lib/telemetry"
29
32
"github.com/cortexlabs/cortex/pkg/proxy"
33
+ "github.com/cortexlabs/cortex/pkg/types/clusterconfig"
34
+ "github.com/cortexlabs/cortex/pkg/types/userconfig"
30
35
"go.uber.org/zap"
31
36
)
32
37
@@ -35,20 +40,45 @@ const (
35
40
_requestSampleInterval = 1 * time .Second
36
41
)
37
42
43
+ var (
44
+ proxyLogger = logging .GetLogger ()
45
+ )
46
+
47
+ func Exit (err error , wrapStrs ... string ) {
48
+ for _ , str := range wrapStrs {
49
+ err = errors .Wrap (err , str )
50
+ }
51
+
52
+ if err != nil && ! errors .IsNoTelemetry (err ) {
53
+ telemetry .Error (err )
54
+ }
55
+
56
+ if err != nil && ! errors .IsNoPrint (err ) {
57
+ proxyLogger .Error (err )
58
+ }
59
+
60
+ telemetry .Close ()
61
+
62
+ os .Exit (1 )
63
+ }
64
+
38
65
func main () {
39
66
var (
40
67
port int
41
68
metricsPort int
42
69
userContainerPort int
43
70
maxConcurrency int
44
71
maxQueueLength int
72
+ clusterConfigPath string
45
73
)
46
74
47
- flag .IntVar (& port , "port" , 8000 , "port where the proxy will be served" )
48
- flag .IntVar (& metricsPort , "metrics-port" , 8001 , "port where the proxy will be served " )
49
- flag .IntVar (& userContainerPort , "user-port" , 8080 , "port where the proxy will redirect to the traffic to" )
75
+ flag .IntVar (& port , "port" , 8888 , "port where the proxy is served" )
76
+ flag .IntVar (& metricsPort , "metrics-port" , 15000 , "metrics port for prometheus " )
77
+ flag .IntVar (& userContainerPort , "user-port" , 8080 , "port where the proxy redirects to the traffic to" )
50
78
flag .IntVar (& maxConcurrency , "max-concurrency" , 0 , "max concurrency allowed for user container" )
51
79
flag .IntVar (& maxQueueLength , "max-queue-length" , 0 , "max request queue length for user container" )
80
+ flag .StringVar (& clusterConfigPath , "cluster-config" , "" , "cluster config path" )
81
+
52
82
flag .Parse ()
53
83
54
84
log := logging .GetLogger ()
@@ -58,12 +88,44 @@ func main() {
58
88
59
89
switch {
60
90
case maxConcurrency == 0 :
61
- log .Fatal ("-- max-concurrency flag is required" )
91
+ log .Fatal ("-max-concurrency flag is required" )
62
92
case maxQueueLength == 0 :
63
- maxQueueLength = maxConcurrency * 10
93
+ log .Fatal ("-max-queue-length flag is required" )
94
+ case clusterConfigPath == "" :
95
+ log .Fatal ("-cluster-config flag is required" )
96
+ }
97
+
98
+ clusterConfig , err := clusterconfig .NewForFile (clusterConfigPath )
99
+ if err != nil {
100
+ Exit (err )
101
+ }
102
+
103
+ awsClient , err := aws .NewForRegion (clusterConfig .Region )
104
+ if err != nil {
105
+ Exit (err )
106
+ }
107
+
108
+ _ , userID , err := awsClient .CheckCredentials ()
109
+ if err != nil {
110
+ Exit (err )
111
+ }
112
+
113
+ err = telemetry .Init (telemetry.Config {
114
+ Enabled : clusterConfig .Telemetry ,
115
+ UserID : userID ,
116
+ Properties : map [string ]string {
117
+ "kind" : userconfig .RealtimeAPIKind .String (),
118
+ "image_type" : "proxy" ,
119
+ },
120
+ Environment : "api" ,
121
+ LogErrors : true ,
122
+ BackoffMode : telemetry .BackoffDuplicateMessages ,
123
+ })
124
+ if err != nil {
125
+ Exit (err )
64
126
}
65
127
66
- target := "http://127.0.0.1:" + strconv .Itoa (port )
128
+ target := "http://127.0.0.1:" + strconv .Itoa (userContainerPort )
67
129
httpProxy := proxy .NewReverseProxy (target , maxQueueLength , maxQueueLength )
68
130
69
131
requestCounterStats := & proxy.RequestStats {}
@@ -101,7 +163,7 @@ func main() {
101
163
102
164
servers := map [string ]* http.Server {
103
165
"proxy" : {
104
- Addr : ":" + strconv .Itoa (userContainerPort ),
166
+ Addr : ":" + strconv .Itoa (port ),
105
167
Handler : proxy .Handler (breaker , httpProxy ),
106
168
},
107
169
"metrics" : {
@@ -123,7 +185,7 @@ func main() {
123
185
124
186
select {
125
187
case err := <- errCh :
126
- log . Fatal ( "failed to start proxy server" , zap . Error ( err ))
188
+ Exit ( errors . Wrap ( err , "failed to start proxy server" ))
127
189
case <- sigint :
128
190
// We received an interrupt signal, shut down.
129
191
log .Info ("Received TERM signal, handling a graceful shutdown..." )
@@ -133,8 +195,10 @@ func main() {
133
195
if err := server .Shutdown (context .Background ()); err != nil {
134
196
// Error from closing listeners, or context timeout:
135
197
log .Warn ("HTTP server Shutdown Error" , zap .Error (err ))
198
+ telemetry .Error (errors .Wrap (err , "HTTP server Shutdown Error" ))
136
199
}
137
200
}
138
201
log .Info ("Shutdown complete, exiting..." )
202
+ telemetry .Close ()
139
203
}
140
204
}
0 commit comments