From e1bc65a5af79eb38ded808cadbd330c2f1c52199 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 17:22:35 +0100 Subject: [PATCH 01/15] Added proxy readiness probe implementation --- pkg/proxy/consts.go | 8 +- pkg/proxy/handler.go | 4 +- pkg/proxy/probe/probe.go | 187 ++++++++++++++++++++++++++++++++++ pkg/proxy/probe/probe_test.go | 121 ++++++++++++++++++++++ 4 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 pkg/proxy/probe/probe.go create mode 100644 pkg/proxy/probe/probe_test.go diff --git a/pkg/proxy/consts.go b/pkg/proxy/consts.go index 95c415378c..cf0576ae38 100644 --- a/pkg/proxy/consts.go +++ b/pkg/proxy/consts.go @@ -17,14 +17,16 @@ limitations under the License. package proxy const ( - _userAgentKey = "User-Agent" + // UserAgentKey is the user agent header key + UserAgentKey = "User-Agent" + // KubeProbeUserAgentPrefix is the user agent header prefix used in k8s probes // Since K8s 1.8, prober requests have // User-Agent = "kube-probe/{major-version}.{minor-version}". - _kubeProbeUserAgentPrefix = "kube-probe/" + KubeProbeUserAgentPrefix = "kube-probe/" // KubeletProbeHeaderName is the header name to augment the probes, because // Istio with mTLS rewrites probes, but their probes pass a different // user-agent. - _kubeletProbeHeaderName = "K-Kubelet-Probe" + KubeletProbeHeaderName = "K-Kubelet-Probe" ) diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index ed59616f99..30a9171b04 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -43,6 +43,6 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc { } func isKubeletProbe(r *http.Request) bool { - return strings.HasPrefix(r.Header.Get(_userAgentKey), _kubeProbeUserAgentPrefix) || - r.Header.Get(_kubeletProbeHeaderName) != "" + return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUserAgentPrefix) || + r.Header.Get(KubeletProbeHeaderName) != "" } diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go new file mode 100644 index 0000000000..9063c0c68d --- /dev/null +++ b/pkg/proxy/probe/probe.go @@ -0,0 +1,187 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe + +import ( + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "time" + + s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/proxy" + "go.uber.org/zap" + kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + _defaultInitialDelaySeconds = 1 + _defaultTimeoutSeconds = 3 + _defaultPeriodSeconds = 1 + _defaultSuccessThreshold = 1 + _defaultFailureThreshold = 1 +) + +type Probe struct { + *kcore.Probe + count int32 + logger *zap.SugaredLogger +} + +func NewProbe(probe *kcore.Probe, logger *zap.SugaredLogger) *Probe { + return &Probe{ + Probe: probe, + logger: logger, + } +} + +func NewDefaultProbe(logger *zap.SugaredLogger, target string) *Probe { + targetURL, err := url.Parse(target) + if err != nil { + panic(fmt.Sprintf("failed to parse target URL: %v", err)) + } + + return &Probe{ + Probe: &kcore.Probe{ + Handler: kcore.Handler{ + TCPSocket: &kcore.TCPSocketAction{ + Port: intstr.FromString(targetURL.Port()), + Host: targetURL.Hostname(), + }, + }, + InitialDelaySeconds: _defaultInitialDelaySeconds, + TimeoutSeconds: _defaultTimeoutSeconds, + PeriodSeconds: _defaultPeriodSeconds, + SuccessThreshold: _defaultSuccessThreshold, + FailureThreshold: _defaultFailureThreshold, + }, + logger: logger, + } +} + +func (p *Probe) ProbeContainer() bool { + var err error + + switch { + case p.HTTPGet != nil: + err = p.httpProbe() + case p.TCPSocket != nil: + err = p.tcpProbe() + case p.Exec != nil: + // Should never be reachable. + p.logger.Error("exec probe not supported") + return false + default: + p.logger.Warn("no probe found") + return false + } + + if err != nil { + p.logger.Error(err) + return false + } + return true +} + +func (p *Probe) doProbe(probe func(time.Duration) error) error { + timeout := time.Duration(p.TimeoutSeconds) * time.Second + retryInterval := time.Duration(p.PeriodSeconds) * time.Second + + var failCount int + var lastProbeErr error + pollErr := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + if err := probe(timeout); err != nil { + // Reset count of consecutive successes to zero. + p.count = 0 + // Don't log this now since we probe every 50ms and some failures are + // expected if the user container takes longer than that to start up. + // We'll log the lastProbeErr if we don't eventually succeed. + lastProbeErr = err + failCount++ + return false, nil + } + + p.count++ + + // Return success if count of consecutive successes is equal to or greater + // than the probe's SuccessThreshold. + return p.count >= p.SuccessThreshold, nil + }) + + if pollErr != nil && lastProbeErr != nil { + p.logger.Warnf("probe error (failed %d times): %v", failCount, lastProbeErr) + } + + return pollErr +} + +func (p *Probe) httpProbe() error { + return p.doProbe(func(timeout time.Duration) error { + targetURL := s.EnsurePrefix( + net.JoinHostPort(p.HTTPGet.Host, p.HTTPGet.Port.String())+s.EnsurePrefix(p.HTTPGet.Path, "/"), + "http://", + ) + + httpClient := &http.Client{} + req, err := http.NewRequest(http.MethodGet, targetURL, nil) + if err != nil { + return err + } + + res, err := httpClient.Do(req) + if err != nil { + return err + } + + req.Header.Add(proxy.UserAgentKey, proxy.KubeProbeUserAgentPrefix) + + for _, header := range p.HTTPGet.HTTPHeaders { + req.Header.Add(header.Name, header.Value) + } + + defer func() { + // Ensure body is both read _and_ closed so it can be reused for keep-alive. + // No point handling errors, connection just won't be reused. + _, _ = io.Copy(ioutil.Discard, res.Body) + _ = res.Body.Close() + }() + + // response status code between 200-399 indicates success + if !(res.StatusCode >= 200 && res.StatusCode < 400) { + return fmt.Errorf("HTTP probe did not respond Ready, got status code: %d", res.StatusCode) + } + + return nil + }) +} + +func (p *Probe) tcpProbe() error { + return p.doProbe(func(timeout time.Duration) error { + address := net.JoinHostPort(p.TCPSocket.Host, p.TCPSocket.Port.String()) + conn, err := net.DialTimeout("tcp", address, timeout) + if err != nil { + return err + } + _ = conn.Close() + return nil + }) +} diff --git a/pkg/proxy/probe/probe_test.go b/pkg/proxy/probe/probe_test.go new file mode 100644 index 0000000000..5d22a43ef9 --- /dev/null +++ b/pkg/proxy/probe/probe_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/cortexlabs/cortex/pkg/proxy/probe" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func newLogger(t *testing.T) *zap.SugaredLogger { + t.Helper() + + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.FatalLevel) + logger, err := config.Build() + require.NoError(t, err) + + log := logger.Sugar() + + return log +} + +func TestDefaultProbeSuccess(t *testing.T) { + t.Parallel() + log := newLogger(t) + + var handler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + } + server := httptest.NewServer(handler) + pb := probe.NewDefaultProbe(log, server.URL) + + require.True(t, pb.ProbeContainer()) +} + +func TestDefaultProbeFailure(t *testing.T) { + t.Parallel() + log := newLogger(t) + + target := "http://127.0.0.1:12345" + pb := probe.NewDefaultProbe(log, target) + + require.False(t, pb.ProbeContainer()) +} + +func TestProbeHTTPFailure(t *testing.T) { + t.Parallel() + log := newLogger(t) + + pb := probe.NewProbe( + &kcore.Probe{ + Handler: kcore.Handler{ + HTTPGet: &kcore.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromString("12345"), + Host: "127.0.0.1", + }, + }, + InitialDelaySeconds: 0, + TimeoutSeconds: 3, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, log, + ) + + require.False(t, pb.ProbeContainer()) +} + +func TestProbeHTTPSuccess(t *testing.T) { + t.Parallel() + log := newLogger(t) + + var handler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + } + server := httptest.NewServer(handler) + targetURL, err := url.Parse(server.URL) + require.NoError(t, err) + + pb := probe.NewProbe( + &kcore.Probe{ + Handler: kcore.Handler{ + HTTPGet: &kcore.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromString(targetURL.Port()), + Host: targetURL.Hostname(), + }, + }, + InitialDelaySeconds: 0, + TimeoutSeconds: 3, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, log, + ) + + require.True(t, pb.ProbeContainer()) +} From 01994e360eecbee0785ee4df1beac930b99f4d18 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 18:22:24 +0100 Subject: [PATCH 02/15] Add probe server and handler --- cmd/proxy/main.go | 16 ++++++++++++---- pkg/proxy/probe/handler.go | 33 +++++++++++++++++++++++++++++++++ pkg/proxy/probe/probe.go | 19 ++++++++----------- 3 files changed, 53 insertions(+), 15 deletions(-) create mode 100644 pkg/proxy/probe/handler.go diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 0b0beea153..9510a27092 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -27,6 +27,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/logging" "github.com/cortexlabs/cortex/pkg/proxy" + "github.com/cortexlabs/cortex/pkg/proxy/probe" "go.uber.org/zap" ) @@ -39,13 +40,15 @@ func main() { var ( port int metricsPort int + probePort int userContainerPort int maxConcurrency int maxQueueLength int ) - flag.IntVar(&port, "port", 8000, "port where the proxy will be served") - flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the proxy will be served") + flag.IntVar(&port, "port", 8000, "port where the proxy server will be exposed") + flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the metrics server will be exposed") + flag.IntVar(&probePort, "probe-port", 8002, "port where the probe server will be exposed") flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to") flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container") flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container") @@ -63,7 +66,7 @@ func main() { maxQueueLength = maxConcurrency * 10 } - target := "http://127.0.0.1:" + strconv.Itoa(port) + target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort) httpProxy := proxy.NewReverseProxy(target, maxQueueLength, maxQueueLength) requestCounterStats := &proxy.RequestStats{} @@ -76,6 +79,7 @@ func main() { ) promStats := proxy.NewPrometheusStatsReporter() + readinessProbe := probe.NewDefaultProbe(log, target) // TODO: initialize custom probe from flags go func() { reportTicker := time.NewTicker(_reportInterval) @@ -101,13 +105,17 @@ func main() { servers := map[string]*http.Server{ "proxy": { - Addr: ":" + strconv.Itoa(userContainerPort), + Addr: ":" + strconv.Itoa(port), Handler: proxy.Handler(breaker, httpProxy), }, "metrics": { Addr: ":" + strconv.Itoa(metricsPort), Handler: promStats, }, + "probe": { + Addr: ":" + strconv.Itoa(probePort), + Handler: probe.Handler(readinessProbe), + }, } errCh := make(chan error) diff --git a/pkg/proxy/probe/handler.go b/pkg/proxy/probe/handler.go new file mode 100644 index 0000000000..55c89b5c58 --- /dev/null +++ b/pkg/proxy/probe/handler.go @@ -0,0 +1,33 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe + +import "net/http" + +func Handler(pb *Probe) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + healthy := pb.ProbeContainer() + if !healthy { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("unhealthy")) + return + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("healthy")) + } +} diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go index 9063c0c68d..f6280c7260 100644 --- a/pkg/proxy/probe/probe.go +++ b/pkg/proxy/probe/probe.go @@ -27,6 +27,7 @@ import ( s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/proxy" + "github.com/pkg/errors" "go.uber.org/zap" kcore "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -38,7 +39,7 @@ const ( _defaultTimeoutSeconds = 3 _defaultPeriodSeconds = 1 _defaultSuccessThreshold = 1 - _defaultFailureThreshold = 1 + _defaultFailureThreshold = 5 ) type Probe struct { @@ -96,7 +97,7 @@ func (p *Probe) ProbeContainer() bool { } if err != nil { - p.logger.Error(err) + p.logger.Warn(err) return false } return true @@ -107,16 +108,16 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error { retryInterval := time.Duration(p.PeriodSeconds) * time.Second var failCount int - var lastProbeErr error pollErr := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { if err := probe(timeout); err != nil { // Reset count of consecutive successes to zero. p.count = 0 - // Don't log this now since we probe every 50ms and some failures are - // expected if the user container takes longer than that to start up. - // We'll log the lastProbeErr if we don't eventually succeed. - lastProbeErr = err failCount++ + + if failCount >= int(p.FailureThreshold) { + return false, errors.Wrapf(err, "probe failure exceeded (failureThreshold = %d)", p.FailureThreshold) + } + return false, nil } @@ -127,10 +128,6 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error { return p.count >= p.SuccessThreshold, nil }) - if pollErr != nil && lastProbeErr != nil { - p.logger.Warnf("probe error (failed %d times): %v", failCount, lastProbeErr) - } - return pollErr } From 84b92827f100224aafe29ce7df679baa15e69fce Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 18:37:53 +0100 Subject: [PATCH 03/15] Add probe handler tests --- pkg/proxy/probe/handler_test.go | 63 +++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 pkg/proxy/probe/handler_test.go diff --git a/pkg/proxy/probe/handler_test.go b/pkg/proxy/probe/handler_test.go new file mode 100644 index 0000000000..1f2378c5e7 --- /dev/null +++ b/pkg/proxy/probe/handler_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/cortexlabs/cortex/pkg/proxy/probe" + "github.com/stretchr/testify/require" +) + +func TestHandlerFailure(t *testing.T) { + t.Parallel() + log := newLogger(t) + + pb := probe.NewDefaultProbe(log, "http://127.0.0.1:12345") + handler := probe.Handler(pb) + + r := httptest.NewRequest(http.MethodGet, "http://fake.cortex.dev/healthz", nil) + w := httptest.NewRecorder() + + handler(w, r) + + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "unhealthy", w.Body.String()) +} + +func TestHandlerSuccess(t *testing.T) { + t.Parallel() + log := newLogger(t) + + var userHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + } + server := httptest.NewServer(userHandler) + + pb := probe.NewDefaultProbe(log, server.URL) + handler := probe.Handler(pb) + + r := httptest.NewRequest(http.MethodGet, "http://fake.cortex.dev/healthz", nil) + w := httptest.NewRecorder() + + handler(w, r) + + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "healthy", w.Body.String()) +} From fec37b229ff482a5f448bf8e588b4c4dadf688bf Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 20:15:49 +0100 Subject: [PATCH 04/15] Ensure http probe headers are in the request --- pkg/proxy/probe/handler_test.go | 56 ++++++++++++++++++++++++++++++++- pkg/proxy/probe/probe.go | 10 +++--- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/pkg/proxy/probe/handler_test.go b/pkg/proxy/probe/handler_test.go index 1f2378c5e7..2b3243d693 100644 --- a/pkg/proxy/probe/handler_test.go +++ b/pkg/proxy/probe/handler_test.go @@ -19,10 +19,14 @@ package probe_test import ( "net/http" "net/http/httptest" + "net/url" "testing" + "github.com/cortexlabs/cortex/pkg/proxy" "github.com/cortexlabs/cortex/pkg/proxy/probe" "github.com/stretchr/testify/require" + kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) func TestHandlerFailure(t *testing.T) { @@ -41,7 +45,7 @@ func TestHandlerFailure(t *testing.T) { require.Equal(t, "unhealthy", w.Body.String()) } -func TestHandlerSuccess(t *testing.T) { +func TestHandlerSuccessTCP(t *testing.T) { t.Parallel() log := newLogger(t) @@ -61,3 +65,53 @@ func TestHandlerSuccess(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) require.Equal(t, "healthy", w.Body.String()) } + +func TestHandlerSuccessHTTP(t *testing.T) { + t.Parallel() + log := newLogger(t) + + headers := []kcore.HTTPHeader{ + { + Name: "X-Cortex-Blah", + Value: "Blah", + }, + } + + var userHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + require.Contains(t, r.Header.Get(proxy.UserAgentKey), proxy.KubeProbeUserAgentPrefix) + for _, header := range headers { + require.Equal(t, header.Value, r.Header.Get(header.Name)) + } + + w.WriteHeader(http.StatusOK) + } + server := httptest.NewServer(userHandler) + targetURL, err := url.Parse(server.URL) + require.NoError(t, err) + + pb := probe.NewProbe( + &kcore.Probe{ + Handler: kcore.Handler{ + HTTPGet: &kcore.HTTPGetAction{ + Path: "/", + Port: intstr.FromString(targetURL.Port()), + Host: targetURL.Hostname(), + HTTPHeaders: headers, + }, + }, + TimeoutSeconds: 3, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + }, log, + ) + handler := probe.Handler(pb) + + r := httptest.NewRequest(http.MethodGet, "http://fake.cortex.dev/healthz", nil) + w := httptest.NewRecorder() + + handler(w, r) + + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "healthy", w.Body.String()) +} diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go index f6280c7260..bcf39fd5bf 100644 --- a/pkg/proxy/probe/probe.go +++ b/pkg/proxy/probe/probe.go @@ -144,17 +144,17 @@ func (p *Probe) httpProbe() error { return err } - res, err := httpClient.Do(req) - if err != nil { - return err - } - req.Header.Add(proxy.UserAgentKey, proxy.KubeProbeUserAgentPrefix) for _, header := range p.HTTPGet.HTTPHeaders { req.Header.Add(header.Name, header.Value) } + res, err := httpClient.Do(req) + if err != nil { + return err + } + defer func() { // Ensure body is both read _and_ closed so it can be reused for keep-alive. // No point handling errors, connection just won't be reused. From cab10a472a3e91265aacf973aa6c90132458eb87 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 20:22:33 +0100 Subject: [PATCH 05/15] Make --max-queue-length flag required --- cmd/proxy/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 9510a27092..5a38370aed 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -63,7 +63,7 @@ func main() { case maxConcurrency == 0: log.Fatal("--max-concurrency flag is required") case maxQueueLength == 0: - maxQueueLength = maxConcurrency * 10 + log.Fatal("--max-queue-length flag is required") } target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort) From 5a1312cb9fa65ba416609dc86c903cc54dbf4d4a Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Tue, 18 May 2021 20:36:59 +0100 Subject: [PATCH 06/15] Add initial delay seconds parameter to probe --- pkg/proxy/probe/probe.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go index bcf39fd5bf..088a58013b 100644 --- a/pkg/proxy/probe/probe.go +++ b/pkg/proxy/probe/probe.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "net/url" + "sync" "time" s "github.com/cortexlabs/cortex/pkg/lib/strings" @@ -39,11 +40,13 @@ const ( _defaultTimeoutSeconds = 3 _defaultPeriodSeconds = 1 _defaultSuccessThreshold = 1 - _defaultFailureThreshold = 5 + _defaultFailureThreshold = 3 ) type Probe struct { *kcore.Probe + mu sync.RWMutex + hasRun bool count int32 logger *zap.SugaredLogger } @@ -107,6 +110,13 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error { timeout := time.Duration(p.TimeoutSeconds) * time.Second retryInterval := time.Duration(p.PeriodSeconds) * time.Second + if p.hasRun { + p.mu.Lock() + defer p.mu.Unlock() + time.Sleep(time.Duration(p.InitialDelaySeconds) * time.Second) + p.hasRun = true + } + var failCount int pollErr := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { if err := probe(timeout); err != nil { From 95b1e088369bfc2eececfebdb7af23830972a263 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 11:34:24 +0100 Subject: [PATCH 07/15] Remove retry logic from probe, since it will be off-handed to k8s --- pkg/proxy/probe/probe.go | 128 ++++++++++------------------------ pkg/proxy/probe/probe_test.go | 12 +--- 2 files changed, 40 insertions(+), 100 deletions(-) diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go index 088a58013b..37f625295a 100644 --- a/pkg/proxy/probe/probe.go +++ b/pkg/proxy/probe/probe.go @@ -23,31 +23,21 @@ import ( "net" "net/http" "net/url" - "sync" "time" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/proxy" - "github.com/pkg/errors" "go.uber.org/zap" kcore "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/wait" ) const ( - _defaultInitialDelaySeconds = 1 - _defaultTimeoutSeconds = 3 - _defaultPeriodSeconds = 1 - _defaultSuccessThreshold = 1 - _defaultFailureThreshold = 3 + _defaultTimeoutSeconds = 3 ) type Probe struct { *kcore.Probe - mu sync.RWMutex - hasRun bool - count int32 logger *zap.SugaredLogger } @@ -72,11 +62,7 @@ func NewDefaultProbe(logger *zap.SugaredLogger, target string) *Probe { Host: targetURL.Hostname(), }, }, - InitialDelaySeconds: _defaultInitialDelaySeconds, - TimeoutSeconds: _defaultTimeoutSeconds, - PeriodSeconds: _defaultPeriodSeconds, - SuccessThreshold: _defaultSuccessThreshold, - FailureThreshold: _defaultFailureThreshold, + TimeoutSeconds: _defaultTimeoutSeconds, }, logger: logger, } @@ -106,89 +92,51 @@ func (p *Probe) ProbeContainer() bool { return true } -func (p *Probe) doProbe(probe func(time.Duration) error) error { - timeout := time.Duration(p.TimeoutSeconds) * time.Second - retryInterval := time.Duration(p.PeriodSeconds) * time.Second +func (p *Probe) httpProbe() error { + targetURL := s.EnsurePrefix( + net.JoinHostPort(p.HTTPGet.Host, p.HTTPGet.Port.String())+s.EnsurePrefix(p.HTTPGet.Path, "/"), + "http://", + ) - if p.hasRun { - p.mu.Lock() - defer p.mu.Unlock() - time.Sleep(time.Duration(p.InitialDelaySeconds) * time.Second) - p.hasRun = true + httpClient := &http.Client{} + req, err := http.NewRequest(http.MethodGet, targetURL, nil) + if err != nil { + return err } - var failCount int - pollErr := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { - if err := probe(timeout); err != nil { - // Reset count of consecutive successes to zero. - p.count = 0 - failCount++ - - if failCount >= int(p.FailureThreshold) { - return false, errors.Wrapf(err, "probe failure exceeded (failureThreshold = %d)", p.FailureThreshold) - } + req.Header.Add(proxy.UserAgentKey, proxy.KubeProbeUserAgentPrefix) - return false, nil - } + for _, header := range p.HTTPGet.HTTPHeaders { + req.Header.Add(header.Name, header.Value) + } - p.count++ + res, err := httpClient.Do(req) + if err != nil { + return err + } - // Return success if count of consecutive successes is equal to or greater - // than the probe's SuccessThreshold. - return p.count >= p.SuccessThreshold, nil - }) + defer func() { + // Ensure body is both read _and_ closed so it can be reused for keep-alive. + // No point handling errors, connection just won't be reused. + _, _ = io.Copy(ioutil.Discard, res.Body) + _ = res.Body.Close() + }() - return pollErr -} + // response status code between 200-399 indicates success + if !(res.StatusCode >= 200 && res.StatusCode < 400) { + return fmt.Errorf("HTTP probe did not respond Ready, got status code: %d", res.StatusCode) + } -func (p *Probe) httpProbe() error { - return p.doProbe(func(timeout time.Duration) error { - targetURL := s.EnsurePrefix( - net.JoinHostPort(p.HTTPGet.Host, p.HTTPGet.Port.String())+s.EnsurePrefix(p.HTTPGet.Path, "/"), - "http://", - ) - - httpClient := &http.Client{} - req, err := http.NewRequest(http.MethodGet, targetURL, nil) - if err != nil { - return err - } - - req.Header.Add(proxy.UserAgentKey, proxy.KubeProbeUserAgentPrefix) - - for _, header := range p.HTTPGet.HTTPHeaders { - req.Header.Add(header.Name, header.Value) - } - - res, err := httpClient.Do(req) - if err != nil { - return err - } - - defer func() { - // Ensure body is both read _and_ closed so it can be reused for keep-alive. - // No point handling errors, connection just won't be reused. - _, _ = io.Copy(ioutil.Discard, res.Body) - _ = res.Body.Close() - }() - - // response status code between 200-399 indicates success - if !(res.StatusCode >= 200 && res.StatusCode < 400) { - return fmt.Errorf("HTTP probe did not respond Ready, got status code: %d", res.StatusCode) - } - - return nil - }) + return nil } func (p *Probe) tcpProbe() error { - return p.doProbe(func(timeout time.Duration) error { - address := net.JoinHostPort(p.TCPSocket.Host, p.TCPSocket.Port.String()) - conn, err := net.DialTimeout("tcp", address, timeout) - if err != nil { - return err - } - _ = conn.Close() - return nil - }) + timeout := time.Duration(p.TimeoutSeconds) * time.Second + address := net.JoinHostPort(p.TCPSocket.Host, p.TCPSocket.Port.String()) + conn, err := net.DialTimeout("tcp", address, timeout) + if err != nil { + return err + } + _ = conn.Close() + return nil } diff --git a/pkg/proxy/probe/probe_test.go b/pkg/proxy/probe/probe_test.go index 5d22a43ef9..281c7ea99e 100644 --- a/pkg/proxy/probe/probe_test.go +++ b/pkg/proxy/probe/probe_test.go @@ -78,11 +78,7 @@ func TestProbeHTTPFailure(t *testing.T) { Host: "127.0.0.1", }, }, - InitialDelaySeconds: 0, - TimeoutSeconds: 3, - PeriodSeconds: 1, - SuccessThreshold: 1, - FailureThreshold: 1, + TimeoutSeconds: 3, }, log, ) @@ -109,11 +105,7 @@ func TestProbeHTTPSuccess(t *testing.T) { Host: targetURL.Hostname(), }, }, - InitialDelaySeconds: 0, - TimeoutSeconds: 3, - PeriodSeconds: 1, - SuccessThreshold: 1, - FailureThreshold: 1, + TimeoutSeconds: 3, }, log, ) From f8dac37b89b22be43766553a28db0fe8a745d1f9 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 11:47:36 +0100 Subject: [PATCH 08/15] Add json encoding and decoding functions for k8s probes --- pkg/proxy/probe/encoding.go | 46 +++++++++++++++++ pkg/proxy/probe/encoding_test.go | 89 ++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 pkg/proxy/probe/encoding.go create mode 100644 pkg/proxy/probe/encoding_test.go diff --git a/pkg/proxy/probe/encoding.go b/pkg/proxy/probe/encoding.go new file mode 100644 index 0000000000..227f776f13 --- /dev/null +++ b/pkg/proxy/probe/encoding.go @@ -0,0 +1,46 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe + +import ( + "encoding/json" + "errors" + + kcore "k8s.io/api/core/v1" +) + +// DecodeProbe takes a json serialised *kcore.Probe and returns a Probe or an error. +func DecodeProbe(jsonProbe string) (*kcore.Probe, error) { + pb := &kcore.Probe{} + if err := json.Unmarshal([]byte(jsonProbe), pb); err != nil { + return nil, err + } + return pb, nil +} + +// EncodeProbe takes *kcore.Probe object and returns marshalled Probe JSON string and an error. +func EncodeProbe(pb *kcore.Probe) (string, error) { + if pb == nil { + return "", errors.New("cannot encode nil probe") + } + + probeJSON, err := json.Marshal(pb) + if err != nil { + return "", err + } + return string(probeJSON), nil +} diff --git a/pkg/proxy/probe/encoding_test.go b/pkg/proxy/probe/encoding_test.go new file mode 100644 index 0000000000..dc1b3f8459 --- /dev/null +++ b/pkg/proxy/probe/encoding_test.go @@ -0,0 +1,89 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestDecodeProbeSuccess(t *testing.T) { + t.Parallel() + + expectedProbe := &kcore.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 2, + SuccessThreshold: 1, + FailureThreshold: 1, + Handler: kcore.Handler{ + TCPSocket: &kcore.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8080"), + }, + }, + } + probeBytes, err := json.Marshal(expectedProbe) + require.NoError(t, err) + + gotProbe, err := DecodeProbe(string(probeBytes)) + require.NoError(t, err) + + require.Equal(t, expectedProbe, gotProbe) +} + +func TestDecodeProbeFailure(t *testing.T) { + t.Parallel() + + probeBytes, err := json.Marshal("blah") + require.NoError(t, err) + + _, err = DecodeProbe(string(probeBytes)) + require.Error(t, err) +} + +func TestEncodeProbe(t *testing.T) { + t.Parallel() + + probe := &kcore.Probe{ + SuccessThreshold: 1, + Handler: kcore.Handler{ + TCPSocket: &kcore.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8080"), + }, + }, + } + + jsonProbe, err := EncodeProbe(probe) + require.NoError(t, err) + + wantProbe := `{"tcpSocket":{"port":"8080","host":"127.0.0.1"},"successThreshold":1}` + require.Equal(t, wantProbe, jsonProbe) +} + +func TestEncodeNilProbe(t *testing.T) { + t.Parallel() + + jsonProbe, err := EncodeProbe(nil) + assert.Error(t, err) + assert.Empty(t, jsonProbe) +} From b44239f4df3254c8b51f488d33e4436be81e72fb Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 11:51:39 +0100 Subject: [PATCH 09/15] Rename encode/decode probe functions --- pkg/proxy/probe/encoding.go | 8 ++++---- pkg/proxy/probe/encoding_test.go | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/proxy/probe/encoding.go b/pkg/proxy/probe/encoding.go index 227f776f13..38363d1dc4 100644 --- a/pkg/proxy/probe/encoding.go +++ b/pkg/proxy/probe/encoding.go @@ -23,8 +23,8 @@ import ( kcore "k8s.io/api/core/v1" ) -// DecodeProbe takes a json serialised *kcore.Probe and returns a Probe or an error. -func DecodeProbe(jsonProbe string) (*kcore.Probe, error) { +// DecodeJSON takes a json serialised *kcore.Probe and returns a Probe or an error. +func DecodeJSON(jsonProbe string) (*kcore.Probe, error) { pb := &kcore.Probe{} if err := json.Unmarshal([]byte(jsonProbe), pb); err != nil { return nil, err @@ -32,8 +32,8 @@ func DecodeProbe(jsonProbe string) (*kcore.Probe, error) { return pb, nil } -// EncodeProbe takes *kcore.Probe object and returns marshalled Probe JSON string and an error. -func EncodeProbe(pb *kcore.Probe) (string, error) { +// EncodeJSON takes *kcore.Probe object and returns marshalled Probe JSON string and an error. +func EncodeJSON(pb *kcore.Probe) (string, error) { if pb == nil { return "", errors.New("cannot encode nil probe") } diff --git a/pkg/proxy/probe/encoding_test.go b/pkg/proxy/probe/encoding_test.go index dc1b3f8459..e48aa62165 100644 --- a/pkg/proxy/probe/encoding_test.go +++ b/pkg/proxy/probe/encoding_test.go @@ -14,12 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package probe +package probe_test import ( "encoding/json" "testing" + "github.com/cortexlabs/cortex/pkg/proxy/probe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" kcore "k8s.io/api/core/v1" @@ -44,7 +45,7 @@ func TestDecodeProbeSuccess(t *testing.T) { probeBytes, err := json.Marshal(expectedProbe) require.NoError(t, err) - gotProbe, err := DecodeProbe(string(probeBytes)) + gotProbe, err := probe.DecodeJSON(string(probeBytes)) require.NoError(t, err) require.Equal(t, expectedProbe, gotProbe) @@ -56,14 +57,14 @@ func TestDecodeProbeFailure(t *testing.T) { probeBytes, err := json.Marshal("blah") require.NoError(t, err) - _, err = DecodeProbe(string(probeBytes)) + _, err = probe.DecodeJSON(string(probeBytes)) require.Error(t, err) } func TestEncodeProbe(t *testing.T) { t.Parallel() - probe := &kcore.Probe{ + pb := &kcore.Probe{ SuccessThreshold: 1, Handler: kcore.Handler{ TCPSocket: &kcore.TCPSocketAction{ @@ -73,7 +74,7 @@ func TestEncodeProbe(t *testing.T) { }, } - jsonProbe, err := EncodeProbe(probe) + jsonProbe, err := probe.EncodeJSON(pb) require.NoError(t, err) wantProbe := `{"tcpSocket":{"port":"8080","host":"127.0.0.1"},"successThreshold":1}` @@ -83,7 +84,7 @@ func TestEncodeProbe(t *testing.T) { func TestEncodeNilProbe(t *testing.T) { t.Parallel() - jsonProbe, err := EncodeProbe(nil) + jsonProbe, err := probe.EncodeJSON(nil) assert.Error(t, err) assert.Empty(t, jsonProbe) } From 245a5974d399846da22a51efe8ccc17fdb3dd6f5 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 15:57:49 +0100 Subject: [PATCH 10/15] Add custom probe flag to proxy entrypoint --- cmd/proxy/main.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 5a38370aed..b2715ec76a 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "io/ioutil" "net/http" "os" "os/signal" @@ -44,6 +45,7 @@ func main() { userContainerPort int maxConcurrency int maxQueueLength int + probeDefPath string ) flag.IntVar(&port, "port", 8000, "port where the proxy server will be exposed") @@ -52,6 +54,7 @@ func main() { flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to") flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container") flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container") + flag.StringVar(&probeDefPath, "probe", "", "path to the desired probe json definition") flag.Parse() log := logging.GetLogger() @@ -79,7 +82,21 @@ func main() { ) promStats := proxy.NewPrometheusStatsReporter() - readinessProbe := probe.NewDefaultProbe(log, target) // TODO: initialize custom probe from flags + + readinessProbe := probe.NewDefaultProbe(log, target) + if probeDefPath != "" { + jsonProbe, err := ioutil.ReadFile(probeDefPath) + if err != nil { + log.Fatal(err) + } + + probeDef, err := probe.DecodeJSON(string(jsonProbe)) + if err != nil { + log.Fatal(err) + } + + readinessProbe = probe.NewProbe(probeDef, log) + } go func() { reportTicker := time.NewTicker(_reportInterval) From 397451c45c82da56531f6be4643c73995dc2370b Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 16:15:28 +0100 Subject: [PATCH 11/15] Merge metrics and probe server into a single admin server --- cmd/proxy/main.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index b2715ec76a..a8d8321c5e 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -40,8 +40,7 @@ const ( func main() { var ( port int - metricsPort int - probePort int + adminPort int userContainerPort int maxConcurrency int maxQueueLength int @@ -49,8 +48,7 @@ func main() { ) flag.IntVar(&port, "port", 8000, "port where the proxy server will be exposed") - flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the metrics server will be exposed") - flag.IntVar(&probePort, "probe-port", 8002, "port where the probe server will be exposed") + flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server (for metrics and probes) will be exposed") flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to") flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container") flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container") @@ -120,18 +118,18 @@ func main() { } }() + adminHandler := http.NewServeMux() + adminHandler.Handle("/metrics", promStats) + adminHandler.Handle("/healthz", probe.Handler(readinessProbe)) + servers := map[string]*http.Server{ "proxy": { Addr: ":" + strconv.Itoa(port), Handler: proxy.Handler(breaker, httpProxy), }, - "metrics": { - Addr: ":" + strconv.Itoa(metricsPort), - Handler: promStats, - }, - "probe": { - Addr: ":" + strconv.Itoa(probePort), - Handler: probe.Handler(readinessProbe), + "admin": { + Addr: ":" + strconv.Itoa(adminPort), + Handler: adminHandler, }, } From e8ada724753052fc6556a83432b09130086fb565 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 18:08:28 +0100 Subject: [PATCH 12/15] Avoid creating 2 loggers --- cmd/proxy/main.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 05f4fe4523..4ca5d942a0 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -42,11 +42,7 @@ const ( _requestSampleInterval = 1 * time.Second ) -var ( - proxyLogger = logging.GetLogger() -) - -func Exit(err error, wrapStrs ...string) { +func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) { for _, str := range wrapStrs { err = errors.Wrap(err, str) } @@ -56,11 +52,10 @@ func Exit(err error, wrapStrs ...string) { } if err != nil && !errors.IsNoPrint(err) { - proxyLogger.Error(err) + log.Error(err) } telemetry.Close() - os.Exit(1) } @@ -100,17 +95,17 @@ func main() { clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath) if err != nil { - Exit(err) + exit(log, err) } awsClient, err := aws.NewForRegion(clusterConfig.Region) if err != nil { - Exit(err) + exit(log, err) } _, userID, err := awsClient.CheckCredentials() if err != nil { - Exit(err) + exit(log, err) } err = telemetry.Init(telemetry.Config{ @@ -125,7 +120,7 @@ func main() { BackoffMode: telemetry.BackoffDuplicateMessages, }) if err != nil { - Exit(err) + exit(log, err) } target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort) @@ -206,8 +201,8 @@ func main() { signal.Notify(sigint, os.Interrupt) select { - case err := <-errCh: - Exit(errors.Wrap(err, "failed to start proxy server")) + case err = <-errCh: + exit(log, errors.Wrap(err, "failed to start proxy server")) case <-sigint: // We received an interrupt signal, shut down. log.Info("Received TERM signal, handling a graceful shutdown...") From fa2174f14e95ef3c4941ccbef988f7c7f96575aa Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 18:09:02 +0100 Subject: [PATCH 13/15] Move exit function to bottom --- cmd/proxy/main.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 4ca5d942a0..e00a01a5cc 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -42,23 +42,6 @@ const ( _requestSampleInterval = 1 * time.Second ) -func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) { - for _, str := range wrapStrs { - err = errors.Wrap(err, str) - } - - if err != nil && !errors.IsNoTelemetry(err) { - telemetry.Error(err) - } - - if err != nil && !errors.IsNoPrint(err) { - log.Error(err) - } - - telemetry.Close() - os.Exit(1) -} - func main() { var ( port int @@ -219,3 +202,20 @@ func main() { telemetry.Close() } } + +func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) { + for _, str := range wrapStrs { + err = errors.Wrap(err, str) + } + + if err != nil && !errors.IsNoTelemetry(err) { + telemetry.Error(err) + } + + if err != nil && !errors.IsNoPrint(err) { + log.Error(err) + } + + telemetry.Close() + os.Exit(1) +} From 7adfa75c315ad37bf6c456c8d43b81505a8f774e Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 18:19:08 +0100 Subject: [PATCH 14/15] Address PR comments --- cmd/proxy/main.go | 4 +++- pkg/proxy/probe/handler_test.go | 4 ++-- pkg/proxy/probe/probe.go | 4 ++-- pkg/proxy/probe/probe_test.go | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index e00a01a5cc..9bc7cff7c5 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -120,7 +120,7 @@ func main() { promStats := proxy.NewPrometheusStatsReporter() - readinessProbe := probe.NewDefaultProbe(log, target) + var readinessProbe *probe.Probe if probeDefPath != "" { jsonProbe, err := ioutil.ReadFile(probeDefPath) if err != nil { @@ -133,6 +133,8 @@ func main() { } readinessProbe = probe.NewProbe(probeDef, log) + } else { + readinessProbe = probe.NewDefaultProbe(target, log) } go func() { diff --git a/pkg/proxy/probe/handler_test.go b/pkg/proxy/probe/handler_test.go index 2b3243d693..da2db383f9 100644 --- a/pkg/proxy/probe/handler_test.go +++ b/pkg/proxy/probe/handler_test.go @@ -33,7 +33,7 @@ func TestHandlerFailure(t *testing.T) { t.Parallel() log := newLogger(t) - pb := probe.NewDefaultProbe(log, "http://127.0.0.1:12345") + pb := probe.NewDefaultProbe("http://127.0.0.1:12345", log) handler := probe.Handler(pb) r := httptest.NewRequest(http.MethodGet, "http://fake.cortex.dev/healthz", nil) @@ -54,7 +54,7 @@ func TestHandlerSuccessTCP(t *testing.T) { } server := httptest.NewServer(userHandler) - pb := probe.NewDefaultProbe(log, server.URL) + pb := probe.NewDefaultProbe(server.URL, log) handler := probe.Handler(pb) r := httptest.NewRequest(http.MethodGet, "http://fake.cortex.dev/healthz", nil) diff --git a/pkg/proxy/probe/probe.go b/pkg/proxy/probe/probe.go index 37f625295a..3eeee7a9f9 100644 --- a/pkg/proxy/probe/probe.go +++ b/pkg/proxy/probe/probe.go @@ -33,7 +33,7 @@ import ( ) const ( - _defaultTimeoutSeconds = 3 + _defaultTimeoutSeconds = 1 ) type Probe struct { @@ -48,7 +48,7 @@ func NewProbe(probe *kcore.Probe, logger *zap.SugaredLogger) *Probe { } } -func NewDefaultProbe(logger *zap.SugaredLogger, target string) *Probe { +func NewDefaultProbe(target string, logger *zap.SugaredLogger) *Probe { targetURL, err := url.Parse(target) if err != nil { panic(fmt.Sprintf("failed to parse target URL: %v", err)) diff --git a/pkg/proxy/probe/probe_test.go b/pkg/proxy/probe/probe_test.go index 281c7ea99e..b0518396f3 100644 --- a/pkg/proxy/probe/probe_test.go +++ b/pkg/proxy/probe/probe_test.go @@ -50,7 +50,7 @@ func TestDefaultProbeSuccess(t *testing.T) { w.WriteHeader(http.StatusOK) } server := httptest.NewServer(handler) - pb := probe.NewDefaultProbe(log, server.URL) + pb := probe.NewDefaultProbe(server.URL, log) require.True(t, pb.ProbeContainer()) } @@ -60,7 +60,7 @@ func TestDefaultProbeFailure(t *testing.T) { log := newLogger(t) target := "http://127.0.0.1:12345" - pb := probe.NewDefaultProbe(log, target) + pb := probe.NewDefaultProbe(target, log) require.False(t, pb.ProbeContainer()) } From 6e59d58913c1ebf00e207ec11b51a1823d8dc808 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Wed, 19 May 2021 18:25:30 +0100 Subject: [PATCH 15/15] Remove unnecessary const --- pkg/proxy/consts.go | 5 ----- pkg/proxy/handler.go | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/proxy/consts.go b/pkg/proxy/consts.go index cf0576ae38..67bb86f7fc 100644 --- a/pkg/proxy/consts.go +++ b/pkg/proxy/consts.go @@ -24,9 +24,4 @@ const ( // Since K8s 1.8, prober requests have // User-Agent = "kube-probe/{major-version}.{minor-version}". KubeProbeUserAgentPrefix = "kube-probe/" - - // KubeletProbeHeaderName is the header name to augment the probes, because - // Istio with mTLS rewrites probes, but their probes pass a different - // user-agent. - KubeletProbeHeaderName = "K-Kubelet-Probe" ) diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index 30a9171b04..39ba5f0b6f 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -43,6 +43,5 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc { } func isKubeletProbe(r *http.Request) bool { - return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUserAgentPrefix) || - r.Header.Get(KubeletProbeHeaderName) != "" + return strings.HasPrefix(r.Header.Get(UserAgentKey), KubeProbeUserAgentPrefix) }