diff --git a/cli/cmd/lib_traffic_splitters.go b/cli/cmd/lib_traffic_splitters.go index 690e047732..37864ea5c9 100644 --- a/cli/cmd/lib_traffic_splitters.go +++ b/cli/cmd/lib_traffic_splitters.go @@ -73,9 +73,14 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ apiRes := apisRes[0] lastUpdated := time.Unix(apiRes.Spec.LastUpdated, 0) + + apiName := apiRes.Spec.Name + if api.Shadow { + apiName += " (shadow)" + } rows = append(rows, []interface{}{ env.Name, - apiRes.Spec.Name, + apiName, api.Weight, apiRes.Status.Message(), apiRes.Status.Requested, @@ -108,7 +113,11 @@ func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []s lastUpdated := time.Unix(splitAPI.Spec.LastUpdated, 0) var apis []string for _, api := range splitAPI.Spec.APIs { - apis = append(apis, api.Name+":"+s.Int32(api.Weight)) + apiName := api.Name + if api.Shadow { + apiName += " (shadow)" + } + apis = append(apis, apiName+":"+s.Int32(api.Weight)) } apisStr := s.TruncateEllipses(strings.Join(apis, " "), 50) rows = append(rows, []interface{}{ diff --git a/docs/workloads/realtime/traffic-splitter/configuration.md b/docs/workloads/realtime/traffic-splitter/configuration.md index 2111153b75..a498a569a4 100644 --- a/docs/workloads/realtime/traffic-splitter/configuration.md +++ b/docs/workloads/realtime/traffic-splitter/configuration.md @@ -7,5 +7,6 @@ endpoint: # the endpoint for the Traffic Splitter (default: ) apis: # list of Realtime APIs to target - name: # name of a Realtime API that is already running or is included in the same configuration file (required) - weight: # percentage of traffic to route to the Realtime API (all weights must sum to 100) (required) + weight: # percentage of traffic to route to the Realtime API (all non-shadow weights must sum to 100) (required) + shadow: # duplicate incoming traffic and send fire-and-forget to this api (only one shadow per traffic splitter) (default: false) ``` diff --git a/pkg/lib/k8s/virtual_service.go b/pkg/lib/k8s/virtual_service.go index 226ec0547c..40f1cc6137 100644 --- a/pkg/lib/k8s/virtual_service.go +++ b/pkg/lib/k8s/virtual_service.go @@ -50,20 +50,34 @@ type Destination struct { ServiceName string Weight int32 Port uint32 + Shadow bool } func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualService { destinations := []*istionetworking.HTTPRouteDestination{} + var mirror *istionetworking.Destination + var mirrorWeight *istionetworking.Percent + for _, destination := range spec.Destinations { - destinations = append(destinations, &istionetworking.HTTPRouteDestination{ - Destination: &istionetworking.Destination{ + if destination.Shadow { + mirror = &istionetworking.Destination{ Host: destination.ServiceName, Port: &istionetworking.PortSelector{ Number: destination.Port, }, - }, - Weight: destination.Weight, - }) + } + mirrorWeight = &istionetworking.Percent{Value: float64(destination.Weight)} + } else { + destinations = append(destinations, &istionetworking.HTTPRouteDestination{ + Destination: &istionetworking.Destination{ + Host: destination.ServiceName, + Port: &istionetworking.PortSelector{ + Number: destination.Port, + }, + }, + Weight: destination.Weight, + }) + } } var httpRoutes []*istionetworking.HTTPRoute @@ -79,7 +93,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ }, }, }, - Route: destinations, + Route: destinations, + Mirror: mirror, + MirrorPercentage: mirrorWeight, }) if spec.Rewrite != nil { @@ -98,7 +114,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ }, }, }, - Route: destinations, + Route: destinations, + Mirror: mirror, + MirrorPercentage: mirrorWeight, } prefixMatch := &istionetworking.HTTPRoute{ @@ -111,7 +129,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ }, }, }, - Route: destinations, + Route: destinations, + Mirror: mirror, + MirrorPercentage: mirrorWeight, } if spec.Rewrite != nil { diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index bc605e24f3..801431b22d 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -112,6 +112,7 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination ServiceName: operator.K8sName(api.Name), Weight: api.Weight, Port: uint32(_defaultPortInt32), + Shadow: api.Shadow, } } return destinations diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index dbb101cfc5..52522476eb 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -91,6 +91,7 @@ const ( ErrConcurrencyMismatchServerSideBatchingPython = "spec.concurrency_mismatch_server_side_batching_python" ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight" ErrTrafficSplitterAPIsNotUnique = "spec.traffic_splitter_apis_not_unique" + ErrOneShadowPerTrafficSplitter = "spec.one_shadow_per_traffic_splitter" ErrUnexpectedDockerSecretData = "spec.unexpected_docker_secret_data" ) @@ -593,7 +594,7 @@ func ErrorConcurrencyMismatchServerSideBatchingPython(maxBatchsize int32, thread func ErrorIncorrectTrafficSplitterWeightTotal(totalWeight int32) error { return errors.WithStack(&errors.Error{ Kind: ErrIncorrectTrafficSplitterWeight, - Message: fmt.Sprintf("expected weights to sum to 100 but found %d", totalWeight), + Message: fmt.Sprintf("expected weights of all non-shadow apis to sum to 100 but found %d", totalWeight), }) } @@ -604,6 +605,13 @@ func ErrorTrafficSplitterAPIsNotUnique(names []string) error { }) } +func ErrorOneShadowPerTrafficSplitter() error { + return errors.WithStack(&errors.Error{ + Kind: ErrOneShadowPerTrafficSplitter, + Message: fmt.Sprintf("multiple shadow apis detected; only one api is allowed to be marked as a shadow"), + }) +} + var _pwRegex = regexp.MustCompile(`"password":"[^"]+"`) var _authRegex = regexp.MustCompile(`"auth":"[^"]+"`) diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index d5b9ffc7cc..e466371c28 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -512,7 +512,9 @@ func getModelVersionsFromPaths(paths []string, prefix string) []string { func verifyTotalWeight(apis []*userconfig.TrafficSplit) error { totalWeight := int32(0) for _, api := range apis { - totalWeight += api.Weight + if !api.Shadow { + totalWeight += api.Weight + } } if totalWeight == 100 { return nil diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index f6ca6f04a4..3c476ffdb3 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -131,6 +131,10 @@ func multiAPIsValidation() *cr.StructFieldValidation { LessThanOrEqualTo: pointer.Int32(100), }, }, + { + StructField: "Shadow", + BoolValidation: &cr.BoolValidation{}, + }, }, }, }, @@ -846,6 +850,16 @@ func ValidateTrafficSplitter(api *userconfig.API) error { return err } + hasShadow := false + for _, api := range api.APIs { + if api.Shadow { + if hasShadow { + return ErrorOneShadowPerTrafficSplitter() + } + hasShadow = true + } + } + return nil } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index bfb627461e..238a85bd79 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -86,6 +86,7 @@ type MultiModels struct { type TrafficSplit struct { Name string `json:"name" yaml:"name"` Weight int32 `json:"weight" yaml:"weight"` + Shadow bool `json:"shadow" yaml:"shadow"` } type ModelResource struct { @@ -386,6 +387,7 @@ func (trafficSplit *TrafficSplit) UserStr() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%s: %s\n", NameKey, trafficSplit.Name)) sb.WriteString(fmt.Sprintf("%s: %s\n", WeightKey, s.Int32(trafficSplit.Weight))) + sb.WriteString(fmt.Sprintf("%s: %s\n", ShadowKey, s.Bool(trafficSplit.Shadow))) return sb.String() } diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 3e9be70846..7ba49e631d 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -30,6 +30,7 @@ const ( // TrafficSplitter APIsKey = "apis" WeightKey = "weight" + ShadowKey = "shadow" // Predictor TypeKey = "type" diff --git a/test/apis/traffic-splitter/cortex.yaml b/test/apis/traffic-splitter/cortex.yaml index d11ecadde5..1587048c20 100644 --- a/test/apis/traffic-splitter/cortex.yaml +++ b/test/apis/traffic-splitter/cortex.yaml @@ -14,6 +14,12 @@ models: path: s3://cortex-examples/onnx/iris-classifier/ +- name: request-recorder + kind: RealtimeAPI + predictor: + type: python + path: request_recorder.py + - name: iris-classifier kind: TrafficSplitter apis: @@ -21,3 +27,6 @@ weight: 30 - name: iris-classifier-pytorch weight: 70 + - name: request-recorder + shadow: true + weight: 100 diff --git a/test/apis/traffic-splitter/request_recorder.py b/test/apis/traffic-splitter/request_recorder.py new file mode 100644 index 0000000000..8a732a5559 --- /dev/null +++ b/test/apis/traffic-splitter/request_recorder.py @@ -0,0 +1,10 @@ +from cortex_internal.lib.log import logger as cortex_logger + + +class PythonPredictor: + def __init__(self, config): + pass + + def predict(self, payload): + cortex_logger.info("received payload", extra={"payload": payload}) + return payload