diff --git a/cli/cmd/get.go b/cli/cmd/get.go
index 40a61fe133..8905d90006 100644
--- a/cli/cmd/get.go
+++ b/cli/cmd/get.go
@@ -542,6 +542,10 @@ func dataResourceTable(resources []context.Resource, dataStatuses map[string]*re
 func apiResourceTable(apiGroupStatuses map[string]*resource.APIGroupStatus) string {
 	rows := make([][]interface{}, 0, len(apiGroupStatuses))
 	for name, groupStatus := range apiGroupStatuses {
+		if groupStatus.Requested == 0 {
+			continue
+		}
+
 		var updatedAt *time.Time
 		if groupStatus.ActiveStatus != nil {
 			updatedAt = groupStatus.ActiveStatus.Start
diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go
index 10a4333540..f0e7b9a51a 100644
--- a/pkg/operator/workloads/logs.go
+++ b/pkg/operator/workloads/logs.go
@@ -18,6 +18,8 @@ package workloads
 
 import (
 	"bufio"
+	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
@@ -26,19 +28,22 @@ import (
 	"time"
 
 	"github.com/gorilla/websocket"
-	kcore "k8s.io/api/core/v1"
 
 	"github.com/cortexlabs/cortex/pkg/lib/errors"
 	"github.com/cortexlabs/cortex/pkg/lib/k8s"
 	"github.com/cortexlabs/cortex/pkg/lib/pointer"
+	"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
 	"github.com/cortexlabs/cortex/pkg/operator/config"
 )
 
 const (
-	writeWait        = 10 * time.Second
-	closeGracePeriod = 10 * time.Second
-	maxMessageSize   = 8192
+	writeWait             = 10 * time.Second
+	closeGracePeriod      = 10 * time.Second
+	maxMessageSize        = 8192
+	podCheckInterval      = 5 * time.Second
+	maxParallelPodLogging = 5
+	initLogTailLines      = 20
 )
 
 func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket.Conn) {
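Note: the three constants added above (podCheckInterval, maxParallelPodLogging, initLogTailLines) drive the multi-pod log streaming introduced in the hunks below. For orientation, here is a minimal, self-contained sketch (not code from this PR) of the timer-plus-cancel polling pattern that the new podCheck goroutine further down is built around; the function name and the printed message are illustrative stand-ins for the real pod listing and process reconciliation:

```go
package main

import (
	"fmt"
	"time"
)

const podCheckInterval = 5 * time.Second

// poll mirrors the shape of podCheck: fire immediately, do one round of work,
// then re-arm the timer, until the caller signals cancellation.
func poll(cancel chan struct{}) {
	timer := time.NewTimer(0) // zero duration so the first check runs right away
	defer timer.Stop()

	for {
		select {
		case <-cancel:
			return
		case <-timer.C:
			fmt.Println("checking pods...") // stand-in for listing pods and reconciling processes
			timer.Reset(podCheckInterval)   // re-arm only after this round's work is done
		}
	}
}

func main() {
	cancel := make(chan struct{})
	go poll(cancel)
	time.Sleep(11 * time.Second) // let a couple of rounds run
	cancel <- struct{}{}         // mirrors how getKubectlLogs stops podCheck after pumpStdin returns
}
```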
@@ -56,40 +61,46 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
 	}
 
 	if len(pods) > 0 {
-		if len(pods) > 1 {
-			if !writeSocket(fmt.Sprintf("%d pods available, streaming logs for one of them:", len(pods)), socket) {
+		if len(pods) > maxParallelPodLogging {
+			if !writeSocket(fmt.Sprintf("\n%d pods available, streaming logs for %d of them:", len(pods), maxParallelPodLogging), socket) {
 				return
 			}
 		}
 
 		podMap := make(map[k8s.PodStatus][]kcore.Pod)
 		for _, pod := range pods {
-			podMap[k8s.GetPodStatus(&pod)] = append(podMap[k8s.GetPodStatus(&pod)], pod)
+			podStatus := k8s.GetPodStatus(&pod)
+			if len(podMap[podStatus]) < maxParallelPodLogging {
+				podMap[podStatus] = append(podMap[podStatus], pod)
+			}
 		}
 
 		switch {
 		case len(podMap[k8s.PodStatusSucceeded]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusSucceeded][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusRunning]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusRunning][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusPending]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusPending][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusKilled]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusKilled][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusKilledOOM]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusKilledOOM][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusFailed]) > 0:
 			previous := false
 			if pods[0].Labels["workloadType"] == WorkloadTypeAPI {
 				previous = true
 			}
-			getKubectlLogs(&podMap[k8s.PodStatusFailed][0], verbose, wrotePending, previous, socket)
+			getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, previous, socket)
 		case len(podMap[k8s.PodStatusTerminating]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusTerminating][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, false, socket)
 		case len(podMap[k8s.PodStatusUnknown]) > 0:
-			getKubectlLogs(&podMap[k8s.PodStatusUnknown][0], verbose, wrotePending, false, socket)
+			getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, false, socket)
 		default: // unexpected
-			getKubectlLogs(&pods[0], verbose, wrotePending, false, socket)
+			if len(pods) > maxParallelPodLogging {
+				pods = pods[:maxParallelPodLogging]
+			}
+			getKubectlLogs(pods, verbose, wrotePending, false, socket)
 		}
 		return
 	}
@@ -118,7 +129,7 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
 		if !writeSocket("\nFailed to start:\n", socket) {
 			return
 		}
-		getKubectlLogs(failedArgoPod, true, false, false, socket)
+		getKubectlLogs([]kcore.Pod{*failedArgoPod}, true, false, false, socket)
 		return
 	}
 
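Note: getKubectlLogs now takes a slice of pods, and ReadLogs caps how many pods it collects per status. A rough, standalone sketch of that bucketing logic, using simplified stand-in types instead of the real kcore.Pod and k8s.PodStatus:

```go
package main

import "fmt"

const maxParallelPodLogging = 5

// Simplified stand-ins for the Kubernetes/cortex types used in logs.go.
type PodStatus string

type Pod struct {
	Name   string
	Status PodStatus
}

// bucketPods groups pods by status, keeping at most maxParallelPodLogging per status,
// mirroring the podMap construction in the ReadLogs hunk above.
func bucketPods(pods []Pod) map[PodStatus][]Pod {
	podMap := make(map[PodStatus][]Pod)
	for _, pod := range pods {
		if len(podMap[pod.Status]) < maxParallelPodLogging {
			podMap[pod.Status] = append(podMap[pod.Status], pod)
		}
	}
	return podMap
}

func main() {
	pods := make([]Pod, 8)
	for i := range pods {
		pods[i] = Pod{Name: fmt.Sprintf("pod-%d", i), Status: "Running"}
	}
	fmt.Println(len(bucketPods(pods)["Running"])) // 5, not 8
}
```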
@@ -133,52 +144,169 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
 	}
 }
 
-func getKubectlLogs(pod *kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
-	cmdPath := "/usr/local/bin/kubectl"
+func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
+	isAllPending := true
+	for _, pod := range pods {
+		if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
+			isAllPending = false
+			break
+		}
+	}
 
-	if k8s.GetPodStatus(pod) == k8s.PodStatusPending {
-		if !wrotePending {
-			if !writeSocket("\nPending", socket) {
-				return
-			}
+	if isAllPending {
+		if !writeSocket("\nPending", socket) {
+			return
 		}
-		config.Kubernetes.WaitForPodRunning(pod.Name, 1)
 	}
 
-	args := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
+	inr, inw, err := os.Pipe()
+	if err != nil {
+		errors.Panic(err, "logs", "kubectl", "os.pipe")
+	}
+	defer inw.Close()
+	defer inr.Close()
+
+	podCheckCancel := make(chan struct{})
+	defer close(podCheckCancel)
+
+	go podCheck(podCheckCancel, socket, pods, previous, verbose, inr)
+	pumpStdin(socket, inw)
+	podCheckCancel <- struct{}{}
+}
+
+func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) (*os.Process, error) {
+	cmdPath := "/bin/bash"
+
+	kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
 	if previous {
-		args = append(args, "--previous")
+		kubectlArgs = append(kubectlArgs, "--previous")
 	}
-	args = append(args, pod.Name)
+	identifier := pod.Name
+	kubectlArgs = append(kubectlArgs, pod.Name)
 	if pod.Labels["workloadType"] == WorkloadTypeAPI && pod.Labels["userFacing"] == "true" {
-		args = append(args, apiContainerName)
+		kubectlArgs = append(kubectlArgs, apiContainerName)
+		kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
+		identifier += " " + apiContainerName
 	}
 
-	outr, outw, err := os.Pipe()
+	labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY \" | tail -n +1; done", identifier)
+	kubectlCmd := strings.Join(kubectlArgs, " ")
+	bashArgs := []string{"/bin/bash", "-c", kubectlCmd + labelLog}
+	process, err := os.StartProcess(cmdPath, bashArgs, attrs)
 	if err != nil {
-		errors.Panic(err, "logs", "kubectl", "os.pipe")
+		return nil, errors.Wrap(err, strings.Join(bashArgs, " "))
 	}
-	defer outr.Close()
-	defer outw.Close()
 
-	inr, inw, err := os.Pipe()
+	return process, nil
+}
+
+func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, inr *os.File) {
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	processMap := make(map[string]*os.Process)
+	defer deleteProcesses(processMap)
+	labels := initialPodList[0].GetLabels()
+	appName := labels["appName"]
+	workloadID := labels["workloadID"]
+
+	outr, outw, err := os.Pipe()
 	if err != nil {
 		errors.Panic(err, "logs", "kubectl", "os.pipe")
 	}
-	defer inr.Close()
-	defer inw.Close()
+	defer outw.Close()
+	defer outr.Close()
 
-	process, err := os.StartProcess(cmdPath, args, &os.ProcAttr{
-		Files: []*os.File{inr, outw, outw},
-	})
-	if err != nil {
-		errors.Panic(err, strings.Join(args, " "))
+	socketWriterError := make(chan error, 1)
+	defer close(socketWriterError)
+
+	go pumpStdout(socket, socketWriterError, outr, verbose, true)
+
+	for {
+		select {
+		case <-podCheckCancel:
+			return
+		case <-timer.C:
+			pods, err := config.Kubernetes.ListPodsByLabels(map[string]string{
+				"appName":    appName,
+				"workloadID": workloadID,
+				"userFacing": "true",
+			})
+
+			if err != nil {
+				socketWriterError <- errors.Wrap(err, "pod check")
+				return
+			}
+
+			latestRunningPodsMap := make(map[string]kcore.Pod)
+			latestRunningPods := strset.New()
+			for _, pod := range pods {
+				if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
+					latestRunningPods.Add(pod.GetName())
+					latestRunningPodsMap[pod.GetName()] = pod
+				}
+			}
+
+			prevRunningPods := strset.New()
+			for podName := range processMap {
+				prevRunningPods.Add(podName)
+			}
+
+			newPods := strset.Difference(latestRunningPods, prevRunningPods)
+			podsToDelete := strset.Difference(prevRunningPods, latestRunningPods)
+			podsToKeep := strset.Intersection(prevRunningPods, latestRunningPods)
+
+			// Prioritize adding running pods
+			podsToAddRunning := []string{}
+			podsToAddNotRunning := []string{}
+
+			for podName := range newPods {
+				pod := latestRunningPodsMap[podName]
+				if k8s.GetPodStatus(&pod) == k8s.PodStatusRunning {
+					podsToAddRunning = append(podsToAddRunning, podName)
+				} else {
+					podsToAddNotRunning = append(podsToAddNotRunning, podName)
+				}
+			}
+			podsToAdd := append(podsToAddRunning, podsToAddNotRunning...)
+
+			maxPodsToAdd := maxParallelPodLogging - len(podsToKeep)
+			if len(podsToAdd) < maxPodsToAdd {
+				maxPodsToAdd = len(podsToAdd)
+			}
+
+			for _, podName := range podsToAdd[:maxPodsToAdd] {
+				process, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
+					Files: []*os.File{inr, outw, outw},
+				})
+				if err != nil {
+					socketWriterError <- err
+					return
+				}
+				processMap[podName] = process
+			}
+
+			deleteMap := make(map[string]*os.Process, len(podsToDelete))
+
+			for podName := range podsToDelete {
+				deleteMap[podName] = processMap[podName]
+				delete(processMap, podName)
+			}
+			deleteProcesses(deleteMap)
+			timer.Reset(podCheckInterval)
+		}
 	}
+}
 
-	go pumpStdout(socket, outr, verbose, true)
-	pumpStdin(socket, inw)
-	stopProcess(process)
+func deleteProcesses(processMap map[string]*os.Process) {
+	for _, process := range processMap {
+		process.Signal(os.Interrupt)
+	}
+	time.Sleep(5 * time.Second)
+	for _, process := range processMap {
+		process.Signal(os.Kill)
+	}
 }
 
 func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
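Note: the reconciliation inside podCheck is driven by set operations: pods that newly appear get a kubectl process (running pods first, up to maxParallelPodLogging), and pods that disappear have their process interrupted. The PR uses cortex's strset package; the sketch below reproduces just the diffing idea with plain map-based sets so it runs on its own:

```go
package main

import "fmt"

// difference returns the keys of a that are not in b (the role strset.Difference plays in the PR).
func difference(a, b map[string]bool) []string {
	out := []string{}
	for k := range a {
		if !b[k] {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	prevRunning := map[string]bool{"pod-a": true, "pod-b": true}   // pods that already have a kubectl process
	latestRunning := map[string]bool{"pod-b": true, "pod-c": true} // pods currently running in the cluster

	podsToAdd := difference(latestRunning, prevRunning)    // start streaming these
	podsToDelete := difference(prevRunning, latestRunning) // interrupt, then kill, these processes

	fmt.Println("add:", podsToAdd)       // [pod-c]
	fmt.Println("delete:", podsToDelete) // [pod-a]
}
```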
@@ -196,7 +324,10 @@ func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
 	} else {
 		logsReader = strings.NewReader(logs)
 	}
-	go pumpStdout(socket, logsReader, verbose, false)
+
+	socketWriterError := make(chan error)
+	defer close(socketWriterError)
+	go pumpStdout(socket, socketWriterError, logsReader, verbose, false)
 
 	inr, inw, err := os.Pipe()
 	if err != nil {
@@ -224,7 +355,7 @@ func pumpStdin(socket *websocket.Conn, writer io.Writer) {
 	}
 }
 
-func pumpStdout(socket *websocket.Conn, reader io.Reader, verbose bool, checkForLastLog bool) {
+func pumpStdout(socket *websocket.Conn, socketWriterError chan error, reader io.Reader, verbose bool, checkForLastLog bool) {
 	scanner := bufio.NewScanner(reader)
 	for scanner.Scan() {
 		socket.SetWriteDeadline(time.Now().Add(writeWait))
@@ -243,14 +374,23 @@
 		}
 	}
 
+	select {
+	case err := <-socketWriterError:
+		if err != nil {
+			writeSocket(err.Error(), socket)
+		}
+	default:
+	}
+
 	socket.SetWriteDeadline(time.Now().Add(writeWait))
 	socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
 	time.Sleep(closeGracePeriod)
 	socket.Close()
 }
 
-var cortexRegex = regexp.MustCompile(`^?(DEBUG|INFO|WARNING|ERROR|CRITICAL):cortex:`)
-var tensorflowRegex = regexp.MustCompile(`^?(DEBUG|INFO|WARNING|ERROR|CRITICAL):tensorflow:`)
+var cortexRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):cortex:`)
+var tensorflowRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):tensorflow:`)
+var jsonPrefixRegex = regexp.MustCompile(`^\ *?(\{|\[)`)
 
 func formatHeader1(headerString string) *string {
 	headerBorder := "\n" + strings.Repeat("-", len(headerString)) + "\n"
@@ -299,6 +439,20 @@ func extractFromCortexLog(match string, loglevel string, logStr string) (*string
 		return formatHeader3(cutStr), false
 	}
 
+	matches := jsonPrefixRegex.FindStringSubmatch(cutStr)
+	if len(matches) == 2 {
+		indentIndex := len(matches[0]) - 1 // matches to curly or square bracket so remove the last char
+		indentStr := cutStr[:indentIndex]
+		maybeJSON := cutStr[indentIndex:]
+		jsonBytes := []byte(maybeJSON)
+		var obytes bytes.Buffer
+		err := json.Indent(&obytes, jsonBytes, indentStr, " ")
+		if err == nil {
+			ostr := indentStr + string(obytes.String())
+			return &ostr, false
+		}
+	}
+
 	lastLogRe := regexp.MustCompile(`^workload: (\w+), completed: (\S+)`)
 	if lastLogRe.MatchString(cutStr) {
 		return &cutStr, true
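Note: the jsonPrefixRegex/json.Indent addition to extractFromCortexLog re-indents log lines whose payload is a JSON object or array, preserving the line's leading whitespace as the prefix. A small standalone sketch of the same technique (the sample input and the two-space indent are illustrative, not taken from the PR):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"regexp"
)

// Same shape as the regex added in logs.go: optional leading spaces, then { or [.
var jsonPrefixRegex = regexp.MustCompile(`^\ *?(\{|\[)`)

func prettyIfJSON(line string) string {
	matches := jsonPrefixRegex.FindStringSubmatch(line)
	if len(matches) != 2 {
		return line // not a JSON-looking line
	}
	indentStr := line[:len(matches[0])-1] // everything before the opening bracket
	var out bytes.Buffer
	if err := json.Indent(&out, []byte(line[len(indentStr):]), indentStr, "  "); err != nil {
		return line // not valid JSON after all; leave the line untouched
	}
	return indentStr + out.String()
}

func main() {
	fmt.Println(prettyIfJSON(`    {"samples": [{"a": 1}]}`))
}
```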
diff --git a/pkg/workloads/cortex/lib/util.py b/pkg/workloads/cortex/lib/util.py
index 09a16d0c0e..808fafdf87 100644
--- a/pkg/workloads/cortex/lib/util.py
+++ b/pkg/workloads/cortex/lib/util.py
@@ -118,7 +118,7 @@ def log_pretty(obj, indent=0, logging_func=logger.info):
 
 
 def log_pretty_flat(obj, indent=0, logging_func=logger.info):
-    logging_func(pp_str_flat(obj), indent)
+    logging_func(pp_str_flat(obj, indent))
 
 
 def pluralize(num, singular, plural):
diff --git a/pkg/workloads/cortex/onnx_serve/api.py b/pkg/workloads/cortex/onnx_serve/api.py
index de2267a153..e02d99f673 100644
--- a/pkg/workloads/cortex/onnx_serve/api.py
+++ b/pkg/workloads/cortex/onnx_serve/api.py
@@ -135,7 +135,7 @@ def predict(app_name, api_name):
     response = {}
 
     if not util.is_dict(payload) or "samples" not in payload:
-        util.log_pretty(payload, logging_func=logger.error)
+        util.log_pretty_flat(payload, logging_func=logger.error)
         return prediction_failed(payload, "top level `samples` key not found in request")
 
     logger.info("Predicting " + util.pluralize(len(payload["samples"]), "sample", "samples"))
@@ -143,7 +143,7 @@ def predict(app_name, api_name):
     predictions = []
     samples = payload["samples"]
     if not util.is_list(samples):
-        util.log_pretty(samples, logging_func=logger.error)
+        util.log_pretty_flat(samples, logging_func=logger.error)
         return prediction_failed(
             payload, "expected the value of key `samples` to be a list of json objects"
         )
@@ -152,7 +152,7 @@ def predict(app_name, api_name):
         util.log_indent("sample {}".format(i + 1), 2)
         try:
             util.log_indent("Raw sample:", indent=4)
-            util.log_pretty(sample, indent=6)
+            util.log_pretty_flat(sample, indent=6)
 
             if request_handler is not None and util.has_function(request_handler, "pre_inference"):
                 sample = request_handler.pre_inference(sample, input_metadata)
@@ -169,7 +169,7 @@ def predict(app_name, api_name):
             if request_handler is not None and util.has_function(request_handler, "post_inference"):
                 result = request_handler.post_inference(result, output_metadata)
             util.log_indent("Prediction:", indent=4)
-            util.log_pretty(result, indent=6)
+            util.log_pretty_flat(result, indent=6)
             prediction = {"prediction": result}
         except CortexException as e:
             e.wrap("error", "sample {}".format(i + 1))
diff --git a/pkg/workloads/cortex/tf_api/api.py b/pkg/workloads/cortex/tf_api/api.py
index f6a56eeea2..4f40c5062f 100644
--- a/pkg/workloads/cortex/tf_api/api.py
+++ b/pkg/workloads/cortex/tf_api/api.py
@@ -259,11 +259,11 @@ def run_predict(sample):
         result = parse_response_proto(response_proto)
         util.log_indent("Raw sample:", indent=4)
-        util.log_pretty(sample, indent=6)
+        util.log_pretty_flat(sample, indent=6)
         util.log_indent("Transformed sample:", indent=4)
-        util.log_pretty(transformed_sample, indent=6)
+        util.log_pretty_flat(transformed_sample, indent=6)
         util.log_indent("Prediction:", indent=4)
-        util.log_pretty(result, indent=6)
+        util.log_pretty_flat(result, indent=6)
 
         result["transformed_sample"] = transformed_sample
 
@@ -272,9 +272,9 @@ def run_predict(sample):
         response_proto = local_cache["stub"].Predict(prediction_request, timeout=10.0)
         result = parse_response_proto_raw(response_proto)
         util.log_indent("Sample:", indent=4)
-        util.log_pretty(sample, indent=6)
+        util.log_pretty_flat(sample, indent=6)
         util.log_indent("Prediction:", indent=4)
-        util.log_pretty(result, indent=6)
+        util.log_pretty_flat(result, indent=6)
 
         if request_handler is not None and util.has_function(request_handler, "post_inference"):
             result = request_handler.post_inference(result, local_cache["metadata"]["signatureDef"])
@@ -315,6 +315,7 @@ def health():
 
 
 @app.route("/<deployment_name>/<api_name>", methods=["POST"])
 def predict(deployment_name, api_name):
+
     try:
         payload = request.get_json()
     except Exception as e:
@@ -326,7 +327,7 @@ def predict(deployment_name, api_name):
     response = {}
 
     if not util.is_dict(payload) or "samples" not in payload:
-        util.log_pretty(payload, logging_func=logger.error)
+        util.log_pretty_flat(payload, logging_func=logger.error)
         return prediction_failed(payload, "top level `samples` key not found in request")
 
     logger.info("Predicting " + util.pluralize(len(payload["samples"]), "sample", "samples"))
@@ -334,7 +335,7 @@ def predict(deployment_name, api_name):
     predictions = []
     samples = payload["samples"]
     if not util.is_list(samples):
-        util.log_pretty(samples, logging_func=logger.error)
+        util.log_pretty_flat(samples, logging_func=logger.error)
         return prediction_failed(
             payload, "expected the value of key `samples` to be a list of json objects"
         )