From baec44249e0d3981dc7d659c02aa47ac78e508d2 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 20 Apr 2021 16:40:15 +0300 Subject: [PATCH 1/3] Fix getStatus function for async gateway --- async-gateway/service.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/async-gateway/service.go b/async-gateway/service.go index 2a8c9cd7b6..305ef4c466 100644 --- a/async-gateway/service.go +++ b/async-gateway/service.go @@ -133,8 +133,11 @@ func (s *service) getStatus(id string) (Status, error) { // determine request status status := StatusInQueue for _, file := range files { - fileStatus := Status(file) + if file == "payload" { + continue + } + fileStatus := Status(file) if !fileStatus.Valid() { status = fileStatus return "", fmt.Errorf("invalid workload status: %s", status) From 6397feb5ca0f9a4414d29eaca93887441dfb9731 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 20 Apr 2021 17:13:01 +0300 Subject: [PATCH 2/3] Include "results.json" to if statement + check for error in test --- async-gateway/service.go | 2 +- test/e2e/e2e/utils.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/async-gateway/service.go b/async-gateway/service.go index 305ef4c466..6741591ccd 100644 --- a/async-gateway/service.go +++ b/async-gateway/service.go @@ -133,7 +133,7 @@ func (s *service) getStatus(id string) (Status, error) { // determine request status status := StatusInQueue for _, file := range files { - if file == "payload" { + if file == "payload" || file == "result.json" { continue } diff --git a/test/e2e/e2e/utils.py b/test/e2e/e2e/utils.py index 96d9f34722..adbe61457c 100644 --- a/test/e2e/e2e/utils.py +++ b/test/e2e/e2e/utils.py @@ -331,6 +331,12 @@ def _retriever(request_id: str): result_response = session.get(f"{api_info['endpoint']}/{request_id}") if result_response.status_code != HTTPStatus.OK: + content = result_response.content.decode("utf-8") + if "error" in content: + event_stopper.set() + raise RuntimeError( + f"received {result_response.status_code} status code with the following message: {content}" + ) time.sleep(poll_sleep_seconds) continue From 19956cff7d61ac6a3c0b3c46f2030944201aafb3 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 21 Apr 2021 01:37:10 +0300 Subject: [PATCH 3/3] Address PR comment --- async-gateway/service.go | 10 +++------- pkg/cortex/serve/cortex_internal/lib/api/async_api.py | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/async-gateway/service.go b/async-gateway/service.go index 6741591ccd..7e634c20c8 100644 --- a/async-gateway/service.go +++ b/async-gateway/service.go @@ -66,7 +66,7 @@ func (s *service) CreateWorkload(id string, payload io.Reader, contentType strin return "", err } - statusPath := fmt.Sprintf("%s/%s/%s", prefix, id, StatusInQueue) + statusPath := fmt.Sprintf("%s/%s/status/%s", prefix, id, StatusInQueue) log.Debug(fmt.Sprintf("setting status to %s", StatusInQueue)) if err := s.storage.Upload(statusPath, strings.NewReader(""), "text/plain"); err != nil { return "", err @@ -124,8 +124,8 @@ func (s *service) getStatus(id string) (Status, error) { log := s.logger.With(zap.String("id", id)) // download workload status - log.Debug("checking status", zap.String("path", fmt.Sprintf("%s/%s/*", prefix, id))) - files, err := s.storage.List(fmt.Sprintf("%s/%s", prefix, id)) + log.Debug("checking status", zap.String("path", fmt.Sprintf("%s/%s/status/*", prefix, id))) + files, err := s.storage.List(fmt.Sprintf("%s/%s/status", prefix, id)) if err != nil { return "", err } @@ -133,10 +133,6 @@ func (s *service) getStatus(id string) (Status, error) { // determine request status status := StatusInQueue for _, file := range files { - if file == "payload" || file == "result.json" { - continue - } - fileStatus := Status(file) if !fileStatus.Valid() { status = fileStatus diff --git a/pkg/cortex/serve/cortex_internal/lib/api/async_api.py b/pkg/cortex/serve/cortex_internal/lib/api/async_api.py index e59f21cb1a..62f2009528 100644 --- a/pkg/cortex/serve/cortex_internal/lib/api/async_api.py +++ b/pkg/cortex/serve/cortex_internal/lib/api/async_api.py @@ -94,7 +94,7 @@ def statsd(self): return self.__statsd def update_status(self, request_id: str, status: str): - self.storage.put_str("", f"{self.storage_path}/{request_id}/{status}") + self.storage.put_str("", f"{self.storage_path}/{request_id}/status/{status}") def upload_result(self, request_id: str, result: Dict[str, Any]): if not isinstance(result, dict):