docs/sources/shared/configuration.md (18 additions, 8 deletions)
@@ -748,18 +748,10 @@ compactor_grpc_client:
 [memberlist: <memberlist>]
 
 kafka_config:
-  # The Kafka backend address.
-  # CLI flag: -kafka.address
-  [address: <string> | default = "localhost:9092"]
-
   # The Kafka topic name.
   # CLI flag: -kafka.topic
   [topic: <string> | default = ""]
 
-  # The Kafka client ID.
-  # CLI flag: -kafka.client-id
-  [client_id: <string> | default = ""]
-
   # The maximum time allowed to open a connection to a Kafka broker.
   # CLI flag: -kafka.dial-timeout
   [dial_timeout: <duration> | default = 2s]
@@ -769,6 +761,24 @@ kafka_config:
   # CLI flag: -kafka.write-timeout
   [write_timeout: <duration> | default = 10s]
 
+  reader_config:
+    # The Kafka backend address.
+    # CLI flag: -kafka.reader.address
+    [address: <string> | default = ""]
+
+    # The Kafka client ID.
+    # CLI flag: -kafka.reader.client-id
+    [client_id: <string> | default = ""]
+
+  writer_config:
+    # The Kafka backend address.
+    # CLI flag: -kafka.writer.address
+    [address: <string> | default = ""]
+
+    # The Kafka client ID.
+    # CLI flag: -kafka.writer.client-id
+    [client_id: <string> | default = ""]
+
   # The SASL username for authentication to Kafka using the PLAIN mechanism.
   # Both username and password must be set.
   # CLI flag: -kafka.sasl-username
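For orientation, a minimal before/after sketch of the configuration change documented above; the broker address, client IDs, and topic are placeholder values, not part of this PR:

# Before (deprecated flat settings):
kafka_config:
  address: kafka:9092
  client_id: loki
  topic: loki

# After (split reader/writer settings):
kafka_config:
  topic: loki
  reader_config:
    address: kafka:9092
    client_id: loki-reader
  writer_config:
    address: kafka:9092
    client_id: loki-writer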
pkg/kafka/client/reader_client.go (12 additions, 1 deletion)
@@ -25,7 +25,18 @@ func NewReaderClient(component string, kafkaCfg kafka.Config, logger log.Logger,
 	const fetchMaxBytes = 100_000_000
 
 	opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
-	opts = append(opts,
+
+	address := kafkaCfg.Address
+	clientID := kafkaCfg.ClientID
+	if kafkaCfg.ReaderConfig.Address != "" {
+		address = kafkaCfg.ReaderConfig.Address
+		clientID = kafkaCfg.ReaderConfig.ClientID
+	}
+
+	opts = append(
+		opts,
+		kgo.ClientID(clientID),
+		kgo.SeedBrokers(address),
 		kgo.FetchMinBytes(1),
 		kgo.FetchMaxBytes(fetchMaxBytes),
 		kgo.FetchMaxWait(5*time.Second),
pkg/kafka/client/reader_client_test.go (13 additions)
@@ -34,6 +34,19 @@ func TestNewReaderClient(t *testing.T) {
 		},
 		wantErr: false,
 	},
+	{
+		name: "valid config with reader config",
+		config: kafka.Config{
+			Topic:        "abcd",
+			SASLUsername: "user",
+			SASLPassword: flagext.SecretWithValue("password"),
+			ReaderConfig: kafka.ClientConfig{
+				Address:  addr,
+				ClientID: "reader",
+			},
+		},
+		wantErr: false,
+	},
 	{
 		name: "wrong password",
 		config: kafka.Config{
pkg/kafka/client/writer_client.go (9 additions, 2 deletions)
@@ -41,8 +41,17 @@ func NewWriterClient(component string, kafkaCfg kafka.Config, maxInflightProduce
 	// Do not export the client ID, because we use it to specify options to the backend.
 	metrics := NewClientMetrics(component, reg, kafkaCfg.EnableKafkaHistograms)
 
+	address := kafkaCfg.Address
+	clientID := kafkaCfg.ClientID
+	if kafkaCfg.WriterConfig.Address != "" {
+		address = kafkaCfg.WriterConfig.Address
+		clientID = kafkaCfg.WriterConfig.ClientID
+	}
+
 	opts := append(
 		commonKafkaClientOptions(kafkaCfg, metrics, logger),
+		kgo.ClientID(clientID),
+		kgo.SeedBrokers(address),
 		kgo.RequiredAcks(kgo.AllISRAcks()),
 		kgo.DefaultProduceTopic(kafkaCfg.Topic),
 
@@ -155,8 +164,6 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM

 func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
 	opts := []kgo.Opt{
-		kgo.ClientID(cfg.ClientID),
-		kgo.SeedBrokers(cfg.Address),
 		kgo.DialTimeout(cfg.DialTimeout),
 
 		// A cluster metadata update is a request sent to a broker and getting back the map of partitions and
pkg/kafka/client/writer_client_test.go (12 additions)
@@ -32,6 +32,18 @@ func TestNewWriterClient(t *testing.T) {
 		},
 		wantErr: false,
 	},
+	{
+		name: "valid config with writer config",
+		config: kafka.Config{
+			Topic:        "abcd",
+			WriteTimeout: time.Second,
+			WriterConfig: kafka.ClientConfig{
+				Address:  addr,
+				ClientID: "writer",
+			},
+		},
+		wantErr: false,
+	},
 	{
 		name: "wrong password",
 		config: kafka.Config{
pkg/kafka/config.go (48 additions, 5 deletions)
@@ -28,19 +28,25 @@ const (

 var (
 	ErrMissingKafkaAddress                 = errors.New("the Kafka address has not been configured")
+	ErrAmbiguousKafkaAddress               = errors.New("the Kafka address has been configured in both kafka.address and kafka.reader_config.address or kafka.writer_config.address")
+	ErrAmbiguousKafkaClientID              = errors.New("the Kafka client ID has been configured in both kafka.client_id and kafka.reader_config.client_id or kafka.writer_config.client_id")
+	ErrMixingOldAndNewClientConfig         = errors.New("mixing old and new client config is not allowed")
 	ErrMissingKafkaTopic                   = errors.New("the Kafka topic has not been configured")
 	ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
 	ErrInvalidProducerMaxRecordSizeBytes   = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, MaxProducerRecordDataBytesLimit)
 )
 
 // Config holds the generic config for the Kafka backend.
 type Config struct {
-	Address      string        `yaml:"address"`
+	Address      string        `yaml:"address" doc:"hidden|deprecated"`
 	Topic        string        `yaml:"topic"`
-	ClientID     string        `yaml:"client_id"`
+	ClientID     string        `yaml:"client_id" doc:"hidden|deprecated"`
 	DialTimeout  time.Duration `yaml:"dial_timeout"`
 	WriteTimeout time.Duration `yaml:"write_timeout"`
+
+	ReaderConfig ClientConfig `yaml:"reader_config"`
+	WriterConfig ClientConfig `yaml:"writer_config"`
 
 	SASLUsername string         `yaml:"sasl_username"`
 	SASLPassword flagext.Secret `yaml:"sasl_password"`

@@ -60,14 +66,27 @@ type Config struct {
 	EnableKafkaHistograms bool `yaml:"enable_kafka_histograms"`
 }
 
+type ClientConfig struct {
+	Address  string `yaml:"address"`
+	ClientID string `yaml:"client_id"`
+}
+
+func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	f.StringVar(&cfg.Address, prefix+".address", "", "The Kafka backend address.")
+	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
+}
+
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.RegisterFlagsWithPrefix("kafka", f)
 }
 
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
-	f.StringVar(&cfg.Address, prefix+".address", "localhost:9092", "The Kafka backend address.")
+	cfg.ReaderConfig.RegisterFlagsWithPrefix(prefix+".reader", f)
+	cfg.WriterConfig.RegisterFlagsWithPrefix(prefix+".writer", f)
+
+	f.StringVar(&cfg.Address, prefix+".address", "localhost:9092", "The Kafka backend address. This setting is deprecated and will be removed in the next minor release.")
 	f.StringVar(&cfg.Topic, prefix+".topic", "", "The Kafka topic name.")
-	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
+	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID. This setting is deprecated and will be removed in the next minor release.")
 	f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
 	f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")

@@ -92,9 +111,33 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 }
 
 func (cfg *Config) Validate() error {
-	if cfg.Address == "" {
+	if cfg.Address == "" && cfg.ReaderConfig.Address == "" && cfg.WriterConfig.Address == "" {
 		return ErrMissingKafkaAddress
 	}
+	if cfg.Address != "" && cfg.ReaderConfig.Address != "" {
+		return ErrAmbiguousKafkaAddress
+	}
+	if cfg.Address != "" && cfg.WriterConfig.Address != "" {
+		return ErrAmbiguousKafkaAddress
+	}
+	if cfg.ClientID != "" && cfg.ReaderConfig.ClientID != "" {
+		return ErrAmbiguousKafkaClientID
+	}
+	if cfg.ClientID != "" && cfg.WriterConfig.ClientID != "" {
+		return ErrAmbiguousKafkaClientID
+	}
+	if cfg.Address != "" && cfg.ReaderConfig.ClientID != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.Address != "" && cfg.WriterConfig.ClientID != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.ClientID != "" && cfg.ReaderConfig.Address != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.ClientID != "" && cfg.WriterConfig.Address != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
 	if cfg.Topic == "" {
 		return ErrMissingKafkaTopic
 	}
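To make the new validation rules concrete, an illustrative sketch of configurations that pass and fail Validate(); all addresses, client IDs, and topics below are placeholders:

# Valid: new-style only. Reader and writer each carry their own address and client ID.
kafka_config:
  topic: loki
  reader_config:
    address: kafka-read:9092
    client_id: loki-reader
  writer_config:
    address: kafka-write:9092
    client_id: loki-writer

# Invalid (ErrAmbiguousKafkaAddress): address set both at the top level and in reader_config.
kafka_config:
  topic: loki
  address: kafka:9092
  reader_config:
    address: kafka-read:9092

# Invalid (ErrMixingOldAndNewClientConfig): deprecated top-level client_id combined with a new-style address.
kafka_config:
  topic: loki
  client_id: loki
  writer_config:
    address: kafka-write:9092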
pkg/kafka/config_test.go (57 additions)
@@ -37,3 +37,60 @@ func TestBothSASLParamsMustBeSet(t *testing.T) {
 	err = cfg.Validate()
 	require.NoError(t, err)
 }
+
+func TestAmbiguousKafkaAddress(t *testing.T) {
+	cfg := Config{
+		Address:      "localhost:9092",
+		ReaderConfig: ClientConfig{Address: "localhost:9092"},
+		WriterConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrAmbiguousKafkaAddress)
+}
+
+func TestAmbiguousKafkaClientID(t *testing.T) {
+	// Disallow defining in both places
+	cfg := Config{
+		ClientID:     "abcd",
+		ReaderConfig: ClientConfig{Address: "reader:9092", ClientID: "abcd"},
+		WriterConfig: ClientConfig{Address: "writer:9092", ClientID: "abcd"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrAmbiguousKafkaClientID)
+}
+
+func TestMixingOldAndNewClientConfig(t *testing.T) {
+	cfg := Config{
+		Address:      "localhost:9092",
+		ReaderConfig: ClientConfig{ClientID: "reader"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		Address:      "localhost:9092",
+		WriterConfig: ClientConfig{ClientID: "reader"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		ClientID:     "abcd",
+		ReaderConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		ClientID:     "abcd",
+		WriterConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+}
tools/deprecated-config-checker/deprecated-config.yaml (4 additions)
@@ -43,6 +43,10 @@ storage_config:
 chunk_store_config:
   write_dedupe_cache_config: "Write dedupe cache is deprecated along with deprecated index types. Consider using TSDB index which does not require a write dedupe cache."
 
+kafka_config:
+  address: "Use reader_config.address or writer_config.address instead."
+  client_id: "Use reader_config.client_id or writer_config.client_id instead."
+
 ## NOTE: This will also be used to validate per-tenant overrides.
 limits_config:
   unordered_writes: "Will be eventually removed."