From 9d87bc972017fedc45b8f7904cc93e41cd794c5c Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 11:33:16 -0400 Subject: [PATCH 1/8] Create loggroup per api stream from log group --- go.mod | 1 + go.sum | 2 + images/fluentd/Dockerfile | 3 +- manager/manifests/fluentd.yaml | 12 +- pkg/operator/api/context/context.go | 5 + pkg/operator/workloads/api_workload.go | 2 + pkg/operator/workloads/logs.go | 188 ++++++++++++++++++------- 7 files changed, 156 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index ac44a898bd..bd7303a019 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8 github.com/ugorji/go/codec v1.1.7 github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca + gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c k8s.io/api v0.0.0-20190620084959-7cf5895f2711 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab diff --git a/go.sum b/go.sum index cfd19ec971..98a516cc22 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c h1:4GYkPhjcYLPrPAnoxHVQlH/xcXtWN8pEgqBnHrPAs8c= +gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c/go.mod h1:xd7qpr5uPMNy4hsRJ5JEBXA8tJjTFmUI1soCjlCIgAE= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/images/fluentd/Dockerfile b/images/fluentd/Dockerfile index ca0c658eae..2d1655dd48 100644 --- a/images/fluentd/Dockerfile +++ b/images/fluentd/Dockerfile @@ -1,3 +1,2 @@ FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0 -RUN fluent-gem install fluent-plugin-grep -RUN fluent-gem install fluent-plugin-route +RUN fluent-gem install fluent-plugin-record-modifier --no-document diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index f9b16fcac6..68d3fa5c3a 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -83,20 +83,22 @@ data: - @type record_transformer - enable_ruby + @type record_modifier - stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} + default_group_name "#{ENV['LOG_GROUP_NAME']}" + group_name ${record.dig("kubernetes", "labels", "logGroupName") || record["default_group_name"]} + stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} log ${record.dig("log").rstrip} - remove_keys kubernetes,docker,stream + remove_keys kubernetes,docker,stream,default_group_name @type cloudwatch_logs region "#{ENV['AWS_REGION']}" - log_group_name "#{ENV['LOG_GROUP_NAME']}" + log_group_name_key group_name log_stream_name_key stream_name remove_log_stream_name_key true + remove_log_group_name_key true auto_create_stream true flush_interval 2 diff --git a/pkg/operator/api/context/context.go b/pkg/operator/api/context/context.go index f073679f49..c2f48d9cf4 100644 --- a/pkg/operator/api/context/context.go +++ b/pkg/operator/api/context/context.go @@ -209,6 +209,11 @@ func (ctx *Context) VisibleResourceByNameAndType(name string, resourceTypeStr st return nil, resource.ErrorInvalidType(resourceTypeStr) } +func (ctx *Context) LogGroupName(apiName string) string { + name := ctx.CortexConfig.LogGroup + "." + ctx.App.Name + "." + apiName + return name +} + func (ctx *Context) Validate() error { return nil } diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index 91dfac4973..e229959908 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -307,6 +307,7 @@ func tfAPISpec( "resourceID": ctx.APIs[api.Name].ID, "workloadID": workloadID, "userFacing": "true", + "logGroupName": ctx.LogGroupName(api.Name), }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", @@ -482,6 +483,7 @@ func onnxAPISpec( "resourceID": ctx.APIs[api.Name].ID, "workloadID": workloadID, "userFacing": "true", + "logGroupName": ctx.LogGroupName(api.Name), }, Annotations: map[string]string{ "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 15ccc31ed1..06444adc54 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -18,13 +18,16 @@ package workloads import ( "encoding/json" + "fmt" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/gorilla/websocket" + "gopkg.in/karalabe/cookiejar.v1/collections/deque" awslib "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/api/context" @@ -38,9 +41,41 @@ const ( socketMaxMessageSize = 8192 maxLogLinesPerRequest = 500 - pollPeriod = 250 // milliseconds + pollPeriod = 250 * time.Millisecond + streamRefreshPeriod = 2 * time.Second ) +type eventCache struct { + size int + seen strset.Set + eventQueue *deque.Deque +} + +func NewEventCache(cacheSize int) eventCache { + return eventCache{ + size: cacheSize, + seen: strset.New(), + eventQueue: deque.New(), + } +} + +func (c *eventCache) Has(eventID string) bool { + return c.seen.Has(eventID) +} + +func (c *eventCache) PopLeft() { + eventID := c.eventQueue.PopLeft().(string) + c.seen.Remove(eventID) +} + +func (c *eventCache) Add(eventID string) { + if c.eventQueue.Size() == c.size { + c.PopLeft() + } + c.seen.Add(eventID) + c.eventQueue.PushRight(eventID) +} + func ReadLogs(appName string, podLabels map[string]string, socket *websocket.Conn) { podCheckCancel := make(chan struct{}) defer close(podCheckCancel) @@ -67,14 +102,29 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel timer := time.NewTimer(0) defer timer.Stop() - lastTimestamp := int64(0) - previousEvents := strset.New() + lastLogTime := time.Now() + lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod) + logStreamNames := []string{} + logStreamSet := strset.New() var currentContextID string var prefix string - var ctx *context.Context var err error + var ctx = CurrentContext(appName) + eventCache := NewEventCache(10000) + + if ctx == nil { + writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") + return + } + + logGroupName, err := getLogGroupName(ctx, podLabels) + if err != nil { + writeAndCloseSocket(socket, err.Error()) // unexpected + return + } + for { select { case <-podCheckCancel: @@ -83,8 +133,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel ctx = CurrentContext(appName) if ctx == nil { - writeString(socket, "\ndeployment "+appName+" not found") - closeSocket(socket) + writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") continue } @@ -93,105 +142,139 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel if podLabels["workloadType"] == resource.APIType.String() { apiName := podLabels["apiName"] if _, ok := ctx.APIs[apiName]; !ok { - writeString(socket, "\napi "+apiName+" was not found in latest deployment") - closeSocket(socket) + writeAndCloseSocket(socket, "\napi "+apiName+" was not found in latest deployment") continue } writeString(socket, "\na new deployment was detected, streaming logs from the latest deployment") } else { - writeString(socket, "\nlogging non-api workloads is not supported") // unexpected - closeSocket(socket) + writeAndCloseSocket(socket, "\nlogging non-api workloads is not supported") // unexpected continue } } else { - lastTimestamp = ctx.CreatedEpoch * 1000 - } - - if podLabels["workloadType"] == resource.APIType.String() { - podLabels["workloadID"] = ctx.APIs[podLabels["apiName"]].WorkloadID + lastLogTime, _ = getPodStartTime(podLabels) } currentContextID = ctx.ID - writeString(socket, "\nretrieving logs...") - prefix = "" } - if len(prefix) == 0 { - prefix, err = getPrefix(podLabels) + if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) { + newLogStreamSet, err := getLogStreams(logGroupName) if err != nil { - writeString(socket, err.Error()) - closeSocket(socket) + writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error()) continue } + + if !logStreamSet.IsEqual(newLogStreamSet) { + lastLogTime = lastLogTime.Add(-streamRefreshPeriod) + logStreamNames = newLogStreamSet.Slice() + } + + lastLogStreamUpdateTime = time.Now() } - if len(prefix) == 0 { + if len(logStreamNames) == 0 { timer.Reset(pollPeriod) continue } endTime := time.Now().Unix() * 1000 - startTime := lastTimestamp - pollPeriod + logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String(config.Cortex.LogGroup), - LogStreamNamePrefix: aws.String(prefix), - StartTime: aws.Int64(startTime), - EndTime: aws.Int64(endTime), // requires milliseconds - Limit: aws.Int64(int64(maxLogLinesPerRequest)), + LogGroupName: aws.String(logGroupName), + LogStreamNames: aws.StringSlice(logStreamNames), + StartTime: aws.Int64(TimeToMillis(lastLogTime.Add(-pollPeriod))), + EndTime: aws.Int64(endTime), // requires milliseconds + Limit: aws.Int64(int64(maxLogLinesPerRequest)), }) if err != nil { - if !awslib.CheckErrCode(err, "ResourceNotFoundException") { - writeString(socket, "error encountered while fetching logs from cloudwatch: "+err.Error()) - closeSocket(socket) + if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) { + writeAndCloseSocket(socket, "error encountered while fetching logs from cloudwatch: "+err.Error()) continue } } - newEvents := strset.New() + lastLogTimestampMillis := TimeToMillis(lastLogTime) for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) - if !previousEvents.Has(*logEvent.EventId) { - socket.WriteMessage(websocket.TextMessage, []byte(log.Log)) - if *logEvent.Timestamp > lastTimestamp { - lastTimestamp = *logEvent.Timestamp + if !eventCache.Has(*logEvent.EventId) { + socket.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("%s %s", *logEvent.LogStreamName, log.Log))) + if *logEvent.Timestamp > lastLogTimestampMillis { + lastLogTimestampMillis = *logEvent.Timestamp } + eventCache.Add(*logEvent.EventId) } - newEvents.Add(*logEvent.EventId) } + lastLogTime = MillisToTime(endTime) if len(logEventsOutput.Events) == maxLogLinesPerRequest { - socket.WriteMessage(websocket.TextMessage, []byte("---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----")) - lastTimestamp = endTime + writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----") + lastLogTime = MillisToTime(endTime) } - previousEvents = newEvents - timer.Reset(pollPeriod * time.Millisecond) + timer.Reset(pollPeriod) } } } -func getPrefix(searchLabels map[string]string) (string, error) { +func MillisToTime(epochMillis int64) time.Time { + return time.Unix(epochMillis/1000, (epochMillis%1000)*1000) +} + +func TimeToMillis(t time.Time) int64 { + return t.UnixNano() / 1000 +} + +func getLogStreams(logGroupName string) (strset.Set, error) { + describeLogStreamsOnput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + Descending: aws.Bool(true), + LogGroupName: aws.String(logGroupName), + OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime), + Limit: aws.Int64(50), + }) + if err != nil { + if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) { + return nil, err + } + return nil, nil + } + + streams := strset.New() + + for _, stream := range describeLogStreamsOnput.LogStreams { + streams.Add(*stream.LogStreamName) + } + return streams, nil +} + +func getPodStartTime(searchLabels map[string]string) (time.Time, error) { pods, err := config.Kubernetes.ListPodsByLabels(searchLabels) if err != nil { - return "", err + return time.Now(), err } if len(pods) == 0 { - return "", nil + return time.Now(), nil } - podLabels := pods[0].GetLabels() - if apiName, ok := podLabels["apiName"]; ok { - if podTemplateHash, ok := podLabels["pod-template-hash"]; ok { - return internalAPIName(apiName, podLabels["appName"]) + "-" + podTemplateHash, nil + startTime := pods[0].CreationTimestamp.Time + for _, pod := range pods[1:] { + if pod.CreationTimestamp.Time.Before(startTime) { + startTime = pod.CreationTimestamp.Time } - return "", nil // unexpected, pod template hash not set yet } - return pods[0].Name, nil // unexpected, logging non api resources + + return startTime, nil +} + +func getLogGroupName(ctx *context.Context, searchLabels map[string]string) (string, error) { + if searchLabels["workloadType"] == resource.APIType.String() { + return ctx.LogGroupName(searchLabels["apiName"]), nil + } + return "nil", errors.New("unsupported workload type") // unexpected } func writeString(socket *websocket.Conn, message string) { @@ -203,3 +286,8 @@ func closeSocket(socket *websocket.Conn) { socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) time.Sleep(socketCloseGracePeriod) } + +func writeAndCloseSocket(socket *websocket.Conn, message string) { + writeString(socket, message) + closeSocket(socket) +} From e78325a5ad2977dfc0225aea1b13a093d7e27827 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 11:34:54 -0400 Subject: [PATCH 2/8] Remove unnecessary fmt import --- pkg/operator/workloads/logs.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 06444adc54..7b410552a9 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -18,7 +18,6 @@ package workloads import ( "encoding/json" - "fmt" "time" "github.com/aws/aws-sdk-go/aws" @@ -201,7 +200,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel json.Unmarshal([]byte(*logEvent.Message), &log) if !eventCache.Has(*logEvent.EventId) { - socket.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("%s %s", *logEvent.LogStreamName, log.Log))) + socket.WriteMessage(websocket.TextMessage, []byte(log.Log)) if *logEvent.Timestamp > lastLogTimestampMillis { lastLogTimestampMillis = *logEvent.Timestamp } From 6dee2d31040242a5f24a327a2cf7b75b945056b5 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 11:39:14 -0400 Subject: [PATCH 3/8] Fix linting --- manager/manifests/fluentd.yaml | 2 +- pkg/operator/workloads/logs.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index 68d3fa5c3a..861db4ca9f 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -87,7 +87,7 @@ data: default_group_name "#{ENV['LOG_GROUP_NAME']}" group_name ${record.dig("kubernetes", "labels", "logGroupName") || record["default_group_name"]} - stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} + stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} log ${record.dig("log").rstrip} remove_keys kubernetes,docker,stream,default_group_name diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 7b410552a9..a937f3addc 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -50,7 +50,7 @@ type eventCache struct { eventQueue *deque.Deque } -func NewEventCache(cacheSize int) eventCache { +func newEventCache(cacheSize int) eventCache { return eventCache{ size: cacheSize, seen: strset.New(), @@ -111,7 +111,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel var err error var ctx = CurrentContext(appName) - eventCache := NewEventCache(10000) + eventCache := newEventCache(10000) if ctx == nil { writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") From a61f8f983adbc8caf28d558d0043fe1fa978c695 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 12:00:56 -0400 Subject: [PATCH 4/8] Use time instead of millis where possible --- pkg/operator/workloads/logs.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index a937f3addc..5e7592972b 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -177,7 +177,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel continue } - endTime := time.Now().Unix() * 1000 + endTime := TimeToMillis(time.Now()) logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroupName), @@ -198,7 +198,6 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) - if !eventCache.Has(*logEvent.EventId) { socket.WriteMessage(websocket.TextMessage, []byte(log.Log)) if *logEvent.Timestamp > lastLogTimestampMillis { @@ -208,7 +207,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTime = MillisToTime(endTime) + lastLogTime = MillisToTime(lastLogTimestampMillis) if len(logEventsOutput.Events) == maxLogLinesPerRequest { writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----") lastLogTime = MillisToTime(endTime) @@ -220,11 +219,13 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } func MillisToTime(epochMillis int64) time.Time { - return time.Unix(epochMillis/1000, (epochMillis%1000)*1000) + seconds := epochMillis / 1000 + millis := epochMillis % 1000 + return time.Unix(seconds, millis*int64(time.Millisecond)) } func TimeToMillis(t time.Time) int64 { - return t.UnixNano() / 1000 + return t.UnixNano() / int64(time.Millisecond) } func getLogStreams(logGroupName string) (strset.Set, error) { From 123b63b738f448bc7c8940c9bdd7f6f0ca688fac Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 15:26:05 -0400 Subject: [PATCH 5/8] Respond to PR comments --- dev/versions.md | 1 + docs/cluster/uninstall.md | 2 +- go.mod | 2 +- go.sum | 4 +-- images/fluentd/Dockerfile | 2 +- manager/manifests/fluentd.yaml | 3 +- pkg/lib/time/time.go | 10 ++++++ pkg/operator/workloads/logs.go | 64 ++++++++++++++-------------------- 8 files changed, 43 insertions(+), 45 deletions(-) diff --git a/dev/versions.md b/dev/versions.md index 9c3b37173e..0b3a3b987c 100644 --- a/dev/versions.md +++ b/dev/versions.md @@ -151,6 +151,7 @@ Note: overriding horizontal-pod-autoscaler-sync-period on EKS is currently not s 1. Find the latest release on [Dockerhub](https://hub.docker.com/r/fluent/fluentd-kubernetes-daemonset/) 1. Update the base image version in `images/fluentd/Dockerfile` +1. Update fluent-gem versions to latest 1. Update `fluentd.yaml` as necessary (make sure to maintain all Cortex environment variables) ## Statsd diff --git a/docs/cluster/uninstall.md b/docs/cluster/uninstall.md index fd860dc7d7..2aab1f040e 100644 --- a/docs/cluster/uninstall.md +++ b/docs/cluster/uninstall.md @@ -43,5 +43,5 @@ aws s3 ls aws s3 rb --force s3:// # Delete the log group -aws logs delete-log-group --log-group-name cortex --region us-west-2 +aws logs describe-log-groups --log-group-name-prefix=cortex --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} ``` diff --git a/go.mod b/go.mod index bd7303a019..6dfc886f0d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8 github.com/ugorji/go/codec v1.1.7 github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca - gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c + gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 k8s.io/api v0.0.0-20190620084959-7cf5895f2711 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab diff --git a/go.sum b/go.sum index 98a516cc22..58996425df 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c h1:4GYkPhjcYLPrPAnoxHVQlH/xcXtWN8pEgqBnHrPAs8c= -gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c/go.mod h1:xd7qpr5uPMNy4hsRJ5JEBXA8tJjTFmUI1soCjlCIgAE= +gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw= +gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/images/fluentd/Dockerfile b/images/fluentd/Dockerfile index 2d1655dd48..2a75e5b44d 100644 --- a/images/fluentd/Dockerfile +++ b/images/fluentd/Dockerfile @@ -1,2 +1,2 @@ FROM fluent/fluentd-kubernetes-daemonset:v1.7.1-debian-cloudwatch-1.0 -RUN fluent-gem install fluent-plugin-record-modifier --no-document +RUN fluent-gem install fluent-plugin-record-modifier -v 2.0.1 --no-document diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index 861db4ca9f..5e27d90032 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -85,8 +85,7 @@ data: @type record_modifier - default_group_name "#{ENV['LOG_GROUP_NAME']}" - group_name ${record.dig("kubernetes", "labels", "logGroupName") || record["default_group_name"]} + group_name ${record.dig("kubernetes", "labels", "logGroupName") || ENV['LOG_GROUP_NAME']} stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} log ${record.dig("log").rstrip} diff --git a/pkg/lib/time/time.go b/pkg/lib/time/time.go index ac2735abff..c7c67a3c42 100644 --- a/pkg/lib/time/time.go +++ b/pkg/lib/time/time.go @@ -109,3 +109,13 @@ func LocalHourNow() string { func OlderThanSeconds(t time.Time, secs float64) bool { return time.Since(t).Seconds() > secs } + +func MillisToTime(epochMillis int64) time.Time { + seconds := epochMillis / 1000 + millis := epochMillis % 1000 + return time.Unix(seconds, millis*int64(time.Millisecond)) +} + +func TimeToMillis(t time.Time) int64 { + return t.UnixNano() / int64(time.Millisecond) +} diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 5e7592972b..1ed35e190d 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -23,12 +23,13 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/gorilla/websocket" - "gopkg.in/karalabe/cookiejar.v1/collections/deque" + "gopkg.in/karalabe/cookiejar.v2/collections/deque" awslib "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" + timelib "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/api/context" "github.com/cortexlabs/cortex/pkg/operator/api/resource" "github.com/cortexlabs/cortex/pkg/operator/config" @@ -39,7 +40,9 @@ const ( socketCloseGracePeriod = 10 * time.Second socketMaxMessageSize = 8192 + maxCacheSize = 10000 maxLogLinesPerRequest = 500 + maxStreamsPerRequest = 50 pollPeriod = 250 * time.Millisecond streamRefreshPeriod = 2 * time.Second ) @@ -62,14 +65,10 @@ func (c *eventCache) Has(eventID string) bool { return c.seen.Has(eventID) } -func (c *eventCache) PopLeft() { - eventID := c.eventQueue.PopLeft().(string) - c.seen.Remove(eventID) -} - func (c *eventCache) Add(eventID string) { if c.eventQueue.Size() == c.size { - c.PopLeft() + eventID := c.eventQueue.PopLeft().(string) + c.seen.Remove(eventID) } c.seen.Add(eventID) c.eventQueue.PushRight(eventID) @@ -104,14 +103,14 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel lastLogTime := time.Now() lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod) - logStreamNames := []string{} - logStreamSet := strset.New() + logStreamNamesSet := strset.New() + var currentContextID string var prefix string var err error var ctx = CurrentContext(appName) - eventCache := newEventCache(10000) + eventCache := newEventCache(maxCacheSize) if ctx == nil { writeAndCloseSocket(socket, "\ndeployment "+appName+" not found") @@ -158,32 +157,31 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) { - newLogStreamSet, err := getLogStreams(logGroupName) + newLogStreamNamesSet, err := getLogStreams(logGroupName) if err != nil { writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error()) continue } - if !logStreamSet.IsEqual(newLogStreamSet) { + if !logStreamNamesSet.IsEqual(newLogStreamNamesSet) { lastLogTime = lastLogTime.Add(-streamRefreshPeriod) - logStreamNames = newLogStreamSet.Slice() + logStreamNamesSet = newLogStreamNamesSet } - lastLogStreamUpdateTime = time.Now() } - if len(logStreamNames) == 0 { + if len(logStreamNamesSet) == 0 { timer.Reset(pollPeriod) continue } - endTime := TimeToMillis(time.Now()) + endTime := timelib.TimeToMillis(time.Now()) logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroupName), - LogStreamNames: aws.StringSlice(logStreamNames), - StartTime: aws.Int64(TimeToMillis(lastLogTime.Add(-pollPeriod))), - EndTime: aws.Int64(endTime), // requires milliseconds + LogStreamNames: aws.StringSlice(logStreamNamesSet.Slice()), + StartTime: aws.Int64(timelib.TimeToMillis(lastLogTime.Add(-pollPeriod))), + EndTime: aws.Int64(endTime), Limit: aws.Int64(int64(maxLogLinesPerRequest)), }) @@ -194,7 +192,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTimestampMillis := TimeToMillis(lastLogTime) + lastLogTimestampMillis := timelib.TimeToMillis(lastLogTime) for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) @@ -207,10 +205,10 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTime = MillisToTime(lastLogTimestampMillis) + lastLogTime = timelib.MillisToTime(lastLogTimestampMillis) if len(logEventsOutput.Events) == maxLogLinesPerRequest { writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----") - lastLogTime = MillisToTime(endTime) + lastLogTime = timelib.MillisToTime(endTime) } timer.Reset(pollPeriod) @@ -218,22 +216,12 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } -func MillisToTime(epochMillis int64) time.Time { - seconds := epochMillis / 1000 - millis := epochMillis % 1000 - return time.Unix(seconds, millis*int64(time.Millisecond)) -} - -func TimeToMillis(t time.Time) int64 { - return t.UnixNano() / int64(time.Millisecond) -} - func getLogStreams(logGroupName string) (strset.Set, error) { - describeLogStreamsOnput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + describeLogStreamsOutput, err := config.AWS.CloudWatchLogsClient.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime), Descending: aws.Bool(true), LogGroupName: aws.String(logGroupName), - OrderBy: aws.String(cloudwatchlogs.OrderByLastEventTime), - Limit: aws.Int64(50), + Limit: aws.Int64(maxStreamsPerRequest), }) if err != nil { if !awslib.CheckErrCode(err, cloudwatchlogs.ErrCodeResourceNotFoundException) { @@ -244,7 +232,7 @@ func getLogStreams(logGroupName string) (strset.Set, error) { streams := strset.New() - for _, stream := range describeLogStreamsOnput.LogStreams { + for _, stream := range describeLogStreamsOutput.LogStreams { streams.Add(*stream.LogStreamName) } return streams, nil @@ -253,11 +241,11 @@ func getLogStreams(logGroupName string) (strset.Set, error) { func getPodStartTime(searchLabels map[string]string) (time.Time, error) { pods, err := config.Kubernetes.ListPodsByLabels(searchLabels) if err != nil { - return time.Now(), err + return time.Time{}, err } if len(pods) == 0 { - return time.Now(), nil + return time.Time{}, nil } startTime := pods[0].CreationTimestamp.Time From 3c57afe8976f139fc2b09bd656144c3897d7944a Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 15:59:20 -0400 Subject: [PATCH 6/8] Rename time function --- pkg/lib/time/time.go | 2 +- pkg/operator/workloads/logs.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/lib/time/time.go b/pkg/lib/time/time.go index c7c67a3c42..6de347f0d8 100644 --- a/pkg/lib/time/time.go +++ b/pkg/lib/time/time.go @@ -116,6 +116,6 @@ func MillisToTime(epochMillis int64) time.Time { return time.Unix(seconds, millis*int64(time.Millisecond)) } -func TimeToMillis(t time.Time) int64 { +func ToMillis(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond) } diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index 1ed35e190d..a64b7a9f78 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -175,12 +175,12 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel continue } - endTime := timelib.TimeToMillis(time.Now()) + endTime := timelib.ToMillis(time.Now()) logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroupName), LogStreamNames: aws.StringSlice(logStreamNamesSet.Slice()), - StartTime: aws.Int64(timelib.TimeToMillis(lastLogTime.Add(-pollPeriod))), + StartTime: aws.Int64(timelib.ToMillis(lastLogTime.Add(-pollPeriod))), EndTime: aws.Int64(endTime), Limit: aws.Int64(int64(maxLogLinesPerRequest)), }) @@ -192,7 +192,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTimestampMillis := timelib.TimeToMillis(lastLogTime) + lastLogTimestampMillis := timelib.ToMillis(lastLogTime) for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) From 26e75dc917227857f8368b0d603197c1c046b800 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 16:28:22 -0400 Subject: [PATCH 7/8] Address PR comments --- dev/versions.md | 2 +- manager/manifests/fluentd.yaml | 2 +- pkg/operator/workloads/logs.go | 26 +++++++++++++------------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dev/versions.md b/dev/versions.md index 0b3a3b987c..c04bb4af42 100644 --- a/dev/versions.md +++ b/dev/versions.md @@ -151,7 +151,7 @@ Note: overriding horizontal-pod-autoscaler-sync-period on EKS is currently not s 1. Find the latest release on [Dockerhub](https://hub.docker.com/r/fluent/fluentd-kubernetes-daemonset/) 1. Update the base image version in `images/fluentd/Dockerfile` -1. Update fluent-gem versions to latest +1. Update record-modifier in `images/fluentd/Dockerfile` to the latest version [here](https://github.com/repeatedly/fluent-plugin-record-modifier/blob/master/VERSION) 1. Update `fluentd.yaml` as necessary (make sure to maintain all Cortex environment variables) ## Statsd diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index 5e27d90032..6b4844dea0 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -89,7 +89,7 @@ data: stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")} log ${record.dig("log").rstrip} - remove_keys kubernetes,docker,stream,default_group_name + remove_keys kubernetes,docker,stream @type cloudwatch_logs diff --git a/pkg/operator/workloads/logs.go b/pkg/operator/workloads/logs.go index a64b7a9f78..c9f96924a8 100644 --- a/pkg/operator/workloads/logs.go +++ b/pkg/operator/workloads/logs.go @@ -29,7 +29,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" - timelib "github.com/cortexlabs/cortex/pkg/lib/time" + libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/api/context" "github.com/cortexlabs/cortex/pkg/operator/api/resource" "github.com/cortexlabs/cortex/pkg/operator/config" @@ -103,7 +103,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel lastLogTime := time.Now() lastLogStreamUpdateTime := time.Now().Add(-1 * streamRefreshPeriod) - logStreamNamesSet := strset.New() + logStreamNames := strset.New() var currentContextID string var prefix string @@ -157,30 +157,30 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } if lastLogStreamUpdateTime.Add(streamRefreshPeriod).Before(time.Now()) { - newLogStreamNamesSet, err := getLogStreams(logGroupName) + newLogStreamNames, err := getLogStreams(logGroupName) if err != nil { writeAndCloseSocket(socket, "error encountered while searching for log streams: "+err.Error()) continue } - if !logStreamNamesSet.IsEqual(newLogStreamNamesSet) { + if !logStreamNames.IsEqual(newLogStreamNames) { lastLogTime = lastLogTime.Add(-streamRefreshPeriod) - logStreamNamesSet = newLogStreamNamesSet + logStreamNames = newLogStreamNames } lastLogStreamUpdateTime = time.Now() } - if len(logStreamNamesSet) == 0 { + if len(logStreamNames) == 0 { timer.Reset(pollPeriod) continue } - endTime := timelib.ToMillis(time.Now()) + endTime := libtime.ToMillis(time.Now()) logEventsOutput, err := config.AWS.CloudWatchLogsClient.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroupName), - LogStreamNames: aws.StringSlice(logStreamNamesSet.Slice()), - StartTime: aws.Int64(timelib.ToMillis(lastLogTime.Add(-pollPeriod))), + LogStreamNames: aws.StringSlice(logStreamNames.Slice()), + StartTime: aws.Int64(libtime.ToMillis(lastLogTime.Add(-pollPeriod))), EndTime: aws.Int64(endTime), Limit: aws.Int64(int64(maxLogLinesPerRequest)), }) @@ -192,7 +192,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTimestampMillis := timelib.ToMillis(lastLogTime) + lastLogTimestampMillis := libtime.ToMillis(lastLogTime) for _, logEvent := range logEventsOutput.Events { var log FluentdLog json.Unmarshal([]byte(*logEvent.Message), &log) @@ -205,10 +205,10 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, podLabel } } - lastLogTime = timelib.MillisToTime(lastLogTimestampMillis) + lastLogTime = libtime.MillisToTime(lastLogTimestampMillis) if len(logEventsOutput.Events) == maxLogLinesPerRequest { writeString(socket, "---- Showing at most "+s.Int(maxLogLinesPerRequest)+" lines. Visit AWS cloudwatch logs console and search for \""+prefix+"\" in log group \""+config.Cortex.LogGroup+"\" for complete logs ----") - lastLogTime = timelib.MillisToTime(endTime) + lastLogTime = libtime.MillisToTime(endTime) } timer.Reset(pollPeriod) @@ -245,7 +245,7 @@ func getPodStartTime(searchLabels map[string]string) (time.Time, error) { } if len(pods) == 0 { - return time.Time{}, nil + return time.Now(), nil } startTime := pods[0].CreationTimestamp.Time From 5e8ce04ba4b22c85e72bbd0eb992393b106ea3f5 Mon Sep 17 00:00:00 2001 From: vishal Date: Tue, 17 Sep 2019 16:51:45 -0400 Subject: [PATCH 8/8] Update uninstall.md --- docs/cluster/uninstall.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/cluster/uninstall.md b/docs/cluster/uninstall.md index 2aab1f040e..35ffad1c83 100644 --- a/docs/cluster/uninstall.md +++ b/docs/cluster/uninstall.md @@ -43,5 +43,5 @@ aws s3 ls aws s3 rb --force s3:// # Delete the log group -aws logs describe-log-groups --log-group-name-prefix=cortex --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} +aws logs describe-log-groups --log-group-name-prefix= --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} ```