diff --git a/async-gateway/service.go b/async-gateway/service.go index 2a8c9cd7b6..7e634c20c8 100644 --- a/async-gateway/service.go +++ b/async-gateway/service.go @@ -66,7 +66,7 @@ func (s *service) CreateWorkload(id string, payload io.Reader, contentType strin return "", err } - statusPath := fmt.Sprintf("%s/%s/%s", prefix, id, StatusInQueue) + statusPath := fmt.Sprintf("%s/%s/status/%s", prefix, id, StatusInQueue) log.Debug(fmt.Sprintf("setting status to %s", StatusInQueue)) if err := s.storage.Upload(statusPath, strings.NewReader(""), "text/plain"); err != nil { return "", err @@ -124,8 +124,8 @@ func (s *service) getStatus(id string) (Status, error) { log := s.logger.With(zap.String("id", id)) // download workload status - log.Debug("checking status", zap.String("path", fmt.Sprintf("%s/%s/*", prefix, id))) - files, err := s.storage.List(fmt.Sprintf("%s/%s", prefix, id)) + log.Debug("checking status", zap.String("path", fmt.Sprintf("%s/%s/status/*", prefix, id))) + files, err := s.storage.List(fmt.Sprintf("%s/%s/status", prefix, id)) if err != nil { return "", err } @@ -134,7 +134,6 @@ func (s *service) getStatus(id string) (Status, error) { status := StatusInQueue for _, file := range files { fileStatus := Status(file) - if !fileStatus.Valid() { status = fileStatus return "", fmt.Errorf("invalid workload status: %s", status) diff --git a/pkg/cortex/serve/cortex_internal/lib/api/async_api.py b/pkg/cortex/serve/cortex_internal/lib/api/async_api.py index e59f21cb1a..62f2009528 100644 --- a/pkg/cortex/serve/cortex_internal/lib/api/async_api.py +++ b/pkg/cortex/serve/cortex_internal/lib/api/async_api.py @@ -94,7 +94,7 @@ def statsd(self): return self.__statsd def update_status(self, request_id: str, status: str): - self.storage.put_str("", f"{self.storage_path}/{request_id}/{status}") + self.storage.put_str("", f"{self.storage_path}/{request_id}/status/{status}") def upload_result(self, request_id: str, result: Dict[str, Any]): if not isinstance(result, dict): diff --git a/test/e2e/e2e/utils.py b/test/e2e/e2e/utils.py index 96d9f34722..adbe61457c 100644 --- a/test/e2e/e2e/utils.py +++ b/test/e2e/e2e/utils.py @@ -331,6 +331,12 @@ def _retriever(request_id: str): result_response = session.get(f"{api_info['endpoint']}/{request_id}") if result_response.status_code != HTTPStatus.OK: + content = result_response.content.decode("utf-8") + if "error" in content: + event_stopper.set() + raise RuntimeError( + f"received {result_response.status_code} status code with the following message: {content}" + ) time.sleep(poll_sleep_seconds) continue