Skip to content

Commit 4b5f3e0

Browse files
authored
Stream from cloudwatch logs directly (#447)
1 parent c828fc5 commit 4b5f3e0

File tree

5 files changed

+135
-509
lines changed

5 files changed

+135
-509
lines changed

manager/manifests/fluentd.yaml

Lines changed: 5 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ data:
6969
pos_file /var/log/fluentd-containers.log.pos
7070
tag *
7171
read_from_head true
72+
refresh_interval 1
7273
<parse>
7374
@type json
7475
time_format %Y-%m-%dT%H:%M:%S.%NZ
@@ -81,79 +82,24 @@ data:
8182
@id filter_kube_metadata
8283
</filter>
8384
84-
<match **>
85-
@type route
86-
<route **>
87-
copy
88-
@label @by_pod
89-
</route>
90-
<route **>
91-
copy
92-
@label @by_endpoint
93-
</route>
94-
</match>
95-
</label>
96-
97-
<label @by_pod>
98-
<filter **>
99-
@type record_transformer
100-
@id filter_containers_stream_transformer
101-
<record>
102-
stream_name ${tag_parts[3]}
103-
</record>
104-
remove_keys kubernetes,docker,stream
105-
</filter>
106-
<match **>
107-
@type cloudwatch_logs
108-
region "#{ENV['AWS_REGION']}"
109-
log_group_name "#{ENV['LOG_GROUP_NAME']}"
110-
log_stream_name_key stream_name
111-
remove_log_stream_name_key true
112-
auto_create_stream true
113-
<buffer>
114-
flush_interval 5
115-
chunk_limit_size 2m
116-
queued_chunks_limit_size 32
117-
retry_forever true
118-
</buffer>
119-
</match>
120-
</label>
121-
122-
<label @by_endpoint>
12385
<filter **>
12486
@type record_transformer
12587
enable_ruby
12688
<record>
127-
workload_type ${record.dig("kubernetes", "labels", "workloadType") || "unknown"}
89+
stream_name ${record.dig("kubernetes", "pod_name")}_${record.dig("kubernetes", "container_name")}
90+
log ${record.dig("log").rstrip}
12891
</record>
129-
remove_keys docker,stream
130-
</filter>
131-
132-
<filter **>
133-
@type grep
134-
regexp1 workload_type api
135-
</filter>
136-
<filter **>
137-
@type record_transformer
138-
enable_ruby
139-
<record>
140-
pod_name ${record.dig("kubernetes", "pod_name")}
141-
container_name ${record.dig("kubernetes", "container_name")}
142-
workload_id ${record.dig("kubernetes", "labels", "workloadID")}
143-
stream_name ${record.dig("kubernetes", "labels", "appName")}.${record.dig("kubernetes", "labels", "apiName")}
144-
</record>
145-
remove_keys kubernetes,api_name,app_name,workload_type
92+
remove_keys kubernetes,docker,stream
14693
</filter>
14794
<match **>
14895
@type cloudwatch_logs
149-
@id out_cloudwatch_logs_endpoints
15096
region "#{ENV['AWS_REGION']}"
15197
log_group_name "#{ENV['LOG_GROUP_NAME']}"
15298
log_stream_name_key stream_name
15399
remove_log_stream_name_key true
154100
auto_create_stream true
155101
<buffer>
156-
flush_interval 5
102+
flush_interval 2
157103
chunk_limit_size 2m
158104
queued_chunks_limit_size 32
159105
retry_forever true

pkg/lib/aws/aws.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type Client struct {
3333
Bucket string
3434
S3 *s3.S3
3535
stsClient *sts.STS
36-
cloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs
36+
CloudWatchLogsClient *cloudwatchlogs.CloudWatchLogs
3737
CloudWatchMetrics *cloudwatch.CloudWatch
3838
awsAccountID string
3939
HashedAccountID string
@@ -51,7 +51,7 @@ func New(region string, bucket string, withAccountID bool) (*Client, error) {
5151
S3: s3.New(sess),
5252
stsClient: sts.New(sess),
5353
CloudWatchMetrics: cloudwatch.New(sess),
54-
cloudWatchLogsClient: cloudwatchlogs.New(sess),
54+
CloudWatchLogsClient: cloudwatchlogs.New(sess),
5555
}
5656

5757
if withAccountID {

pkg/lib/aws/errors.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ func (t ErrorKind) MarshalBinary() ([]byte, error) {
7979
}
8080

8181
func IsNoSuchKeyErr(err error) bool {
82-
return checkErrCode(err, "NoSuchKey")
82+
return CheckErrCode(err, "NoSuchKey")
8383
}
8484

8585
func IsNotFoundErr(err error) bool {
86-
return checkErrCode(err, "NotFound")
86+
return CheckErrCode(err, "NotFound")
8787
}
8888

89-
func checkErrCode(err error, errorCode string) bool {
89+
func CheckErrCode(err error, errorCode string) bool {
9090
awsErr, ok := errors.Cause(err).(awserr.Error)
9191
if !ok {
9292
return false

pkg/lib/aws/logs.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

0 commit comments

Comments
 (0)