Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This changelog keeps track of work items that have been completed and are ready

### New

- **General**: Support portName in HTTPScaledObject service scaleTargetRef ([#1174](https://github.com/kedacore/http-add-on/issues/1174))
- **General**: Support setting multiple TLS certs for different domains on the interceptor proxy ([#1116](https://github.com/kedacore/http-add-on/issues/1116))
- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO))

Expand Down
12 changes: 9 additions & 3 deletions config/crd/bases/http.keda.sh_httpscaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ spec:
type: integer
type: object
scaleTargetRef:
description: The name of the deployment to route HTTP requests to
(and to autoscale).
description: |-
The name of the deployment to route HTTP requests to (and to autoscale).
Including validation as a requirement to define either the PortName or the Port
properties:
apiVersion:
type: string
Expand All @@ -106,13 +107,18 @@ spec:
description: The port to route to
format: int32
type: integer
portName:
description: The port to route to referenced by name
type: string
service:
description: The name of the service to route to
type: string
required:
- port
- service
type: object
x-kubernetes-validations:
- message: must define either the 'portName' or the 'port'
rule: has(self.portName) != has(self.port)
scaledownPeriod:
description: (optional) Cooldown period value
format: int32
Expand Down
8 changes: 8 additions & 0 deletions config/interceptor/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
verbs:
- get
- list
- watch
Comment on lines +15 to +22
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

highlighting that newly services RBAC will be necessary for the interceptor

- apiGroups:
- http.keda.sh
resources:
Expand Down
21 changes: 14 additions & 7 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand All @@ -42,6 +43,7 @@ var (

// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch

func main() {
timeoutCfg := config.MustParseTimeouts()
Expand Down Expand Up @@ -85,11 +87,10 @@ func main() {
setupLog.Error(err, "creating new Kubernetes ClientSet")
os.Exit(1)
}
endpointsCache := k8s.NewInformerBackedEndpointsCache(
ctrl.Log,
cl,
time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS),
)

k8sSharedInformerFactory := k8sinformers.NewSharedInformerFactory(cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, k8sSharedInformerFactory)
endpointsCache := k8s.NewInformerBackedEndpointsCache(ctrl.Log, cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
if err != nil {
setupLog.Error(err, "creating new endpoints cache")
os.Exit(1)
Expand Down Expand Up @@ -123,6 +124,7 @@ func main() {
setupLog.Info("starting the endpoints cache")

endpointsCache.Start(ctx)
k8sSharedInformerFactory.Start(ctx.Done())
return nil
})

Expand Down Expand Up @@ -173,10 +175,11 @@ func main() {
eg.Go(func() error {
proxyTLSConfig := map[string]string{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath, "certstorePaths": servingCfg.TLSCertStorePaths}
proxyTLSPort := servingCfg.TLSPort
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())

setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort)

if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
setupLog.Error(err, "tls proxy server failed")
return err
}
Expand All @@ -186,9 +189,11 @@ func main() {

// start a proxy server without TLS.
eg.Go(func() error {
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort)

if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
setupLog.Error(err, "proxy server failed")
return err
}
Expand Down Expand Up @@ -369,6 +374,7 @@ func runProxyServer(
q queue.Counter,
waitFunc forwardWaitFunc,
routingTable routing.Table,
svcCache k8s.ServiceCache,
timeouts *config.Timeouts,
port int,
tlsEnabled bool,
Expand Down Expand Up @@ -416,6 +422,7 @@ func runProxyServer(
routingTable,
probeHandler,
upstreamHandler,
svcCache,
tlsEnabled,
)
rootHandler = middleware.NewLogging(
Expand Down
6 changes: 6 additions & 0 deletions interceptor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
svcCache := k8s.NewFakeServiceCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -77,6 +78,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
svcCache,
timeouts,
port,
false,
Expand Down Expand Up @@ -194,6 +196,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
svcCache := k8s.NewFakeServiceCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -209,6 +212,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
svcCache,
timeouts,
port,
true,
Expand Down Expand Up @@ -339,6 +343,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
svcCache := k8s.NewFakeServiceCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -354,6 +359,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
svcCache,
timeouts,
port,
true,
Expand Down
37 changes: 32 additions & 5 deletions interceptor/middleware/routing.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package middleware

import (
"context"
"fmt"
"net/http"
"net/url"
"regexp"

"github.com/kedacore/http-add-on/interceptor/handler"
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/kedacore/http-add-on/pkg/util"
)
Expand All @@ -21,14 +23,16 @@ type Routing struct {
routingTable routing.Table
probeHandler http.Handler
upstreamHandler http.Handler
svcCache k8s.ServiceCache
tlsEnabled bool
}

func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, tlsEnabled bool) *Routing {
func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, svcCache k8s.ServiceCache, tlsEnabled bool) *Routing {
return &Routing{
routingTable: routingTable,
probeHandler: probeHandler,
upstreamHandler: upstreamHandler,
svcCache: svcCache,
tlsEnabled: tlsEnabled,
}
}
Expand All @@ -52,7 +56,7 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))

stream, err := rm.streamFromHTTPSO(httpso)
stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
if err != nil {
sh := handler.NewStatic(http.StatusInternalServerError, err)
sh.ServeHTTP(w, r)
Expand All @@ -64,21 +68,44 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rm.upstreamHandler.ServeHTTP(w, r)
}

func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
if httpso.Spec.ScaleTargetRef.Port != 0 {
return httpso.Spec.ScaleTargetRef.Port, nil
}
if httpso.Spec.ScaleTargetRef.PortName == "" {
return 0, fmt.Errorf(`must specify either "port" or "portName"`)
}
svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
if err != nil {
return 0, fmt.Errorf("failed to get Service: %w", err)
}
for _, port := range svc.Spec.Ports {
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
return port.Port, nil
}
}
return 0, fmt.Errorf("portName %q not found in Service", httpso.Spec.ScaleTargetRef.PortName)
}

func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
port, err := rm.getPort(ctx, httpso)
if err != nil {
return nil, fmt.Errorf("failed to get port: %w", err)
}
if rm.tlsEnabled {
return url.Parse(fmt.Sprintf(
"https://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
httpso.Spec.ScaleTargetRef.Port,
port,
))
}
//goland:noinspection HttpUrlsUsage
return url.Parse(fmt.Sprintf(
"http://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
httpso.Spec.ScaleTargetRef.Port,
port,
))
}

Expand Down
Loading
Loading