diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 03a2b5e2b9e05..e8299783eaef0 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -748,18 +748,10 @@ compactor_grpc_client:
 [memberlist: <memberlist_config>]
 
 kafka_config:
-  # The Kafka backend address.
-  # CLI flag: -kafka.address
-  [address: <string> | default = "localhost:9092"]
-
   # The Kafka topic name.
   # CLI flag: -kafka.topic
   [topic: <string> | default = ""]
 
-  # The Kafka client ID.
-  # CLI flag: -kafka.client-id
-  [client_id: <string> | default = ""]
-
   # The maximum time allowed to open a connection to a Kafka broker.
   # CLI flag: -kafka.dial-timeout
   [dial_timeout: <duration> | default = 2s]
@@ -769,6 +761,24 @@ kafka_config:
   # CLI flag: -kafka.write-timeout
   [write_timeout: <duration> | default = 10s]
 
+  reader_config:
+    # The Kafka backend address.
+    # CLI flag: -kafka.reader.address
+    [address: <string> | default = ""]
+
+    # The Kafka client ID.
+    # CLI flag: -kafka.reader.client-id
+    [client_id: <string> | default = ""]
+
+  writer_config:
+    # The Kafka backend address.
+    # CLI flag: -kafka.writer.address
+    [address: <string> | default = ""]
+
+    # The Kafka client ID.
+    # CLI flag: -kafka.writer.client-id
+    [client_id: <string> | default = ""]
+
   # The SASL username for authentication to Kafka using the PLAIN mechanism.
   # Both username and password must be set.
   # CLI flag: -kafka.sasl-username
diff --git a/pkg/kafka/client/reader_client.go b/pkg/kafka/client/reader_client.go
index c3caa60d49655..905b354788352 100644
--- a/pkg/kafka/client/reader_client.go
+++ b/pkg/kafka/client/reader_client.go
@@ -25,7 +25,18 @@ func NewReaderClient(component string, kafkaCfg kafka.Config, logger log.Logger,
 	const fetchMaxBytes = 100_000_000
 
 	opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
-	opts = append(opts,
+
+	address := kafkaCfg.Address
+	clientID := kafkaCfg.ClientID
+	if kafkaCfg.ReaderConfig.Address != "" {
+		address = kafkaCfg.ReaderConfig.Address
+		clientID = kafkaCfg.ReaderConfig.ClientID
+	}
+
+	opts = append(
+		opts,
+		kgo.ClientID(clientID),
+		kgo.SeedBrokers(address),
 		kgo.FetchMinBytes(1),
 		kgo.FetchMaxBytes(fetchMaxBytes),
 		kgo.FetchMaxWait(5*time.Second),
diff --git a/pkg/kafka/client/reader_client_test.go b/pkg/kafka/client/reader_client_test.go
index 7a3b1cfc0c9a8..51bedda731c6f 100644
--- a/pkg/kafka/client/reader_client_test.go
+++ b/pkg/kafka/client/reader_client_test.go
@@ -34,6 +34,19 @@ func TestNewReaderClient(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "valid config with reader config",
+			config: kafka.Config{
+				Topic:        "abcd",
+				SASLUsername: "user",
+				SASLPassword: flagext.SecretWithValue("password"),
+				ReaderConfig: kafka.ClientConfig{
+					Address:  addr,
+					ClientID: "reader",
+				},
+			},
+			wantErr: false,
+		},
 		{
 			name: "wrong password",
 			config: kafka.Config{
diff --git a/pkg/kafka/client/writer_client.go b/pkg/kafka/client/writer_client.go
index 53967de2e0689..eafb9970aface 100644
--- a/pkg/kafka/client/writer_client.go
+++ b/pkg/kafka/client/writer_client.go
@@ -41,8 +41,17 @@ func NewWriterClient(component string, kafkaCfg kafka.Config, maxInflightProduce
 	// Do not export the client ID, because we use it to specify options to the backend.
 	metrics := NewClientMetrics(component, reg, kafkaCfg.EnableKafkaHistograms)
 
+	address := kafkaCfg.Address
+	clientID := kafkaCfg.ClientID
+	if kafkaCfg.WriterConfig.Address != "" {
+		address = kafkaCfg.WriterConfig.Address
+		clientID = kafkaCfg.WriterConfig.ClientID
+	}
+
 	opts := append(
 		commonKafkaClientOptions(kafkaCfg, metrics, logger),
+		kgo.ClientID(clientID),
+		kgo.SeedBrokers(address),
 		kgo.RequiredAcks(kgo.AllISRAcks()),
 		kgo.DefaultProduceTopic(kafkaCfg.Topic),
 
@@ -155,8 +164,6 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM
 
 func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
 	opts := []kgo.Opt{
-		kgo.ClientID(cfg.ClientID),
-		kgo.SeedBrokers(cfg.Address),
 		kgo.DialTimeout(cfg.DialTimeout),
 
 		// A cluster metadata update is a request sent to a broker and getting back the map of partitions and
diff --git a/pkg/kafka/client/writer_client_test.go b/pkg/kafka/client/writer_client_test.go
index 681e7a82d3904..40e6cca9e8571 100644
--- a/pkg/kafka/client/writer_client_test.go
+++ b/pkg/kafka/client/writer_client_test.go
@@ -32,6 +32,18 @@ func TestNewWriterClient(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "valid config with writer config",
+			config: kafka.Config{
+				Topic:        "abcd",
+				WriteTimeout: time.Second,
+				WriterConfig: kafka.ClientConfig{
+					Address:  addr,
+					ClientID: "writer",
+				},
+			},
+			wantErr: false,
+		},
 		{
 			name: "wrong password",
 			config: kafka.Config{
diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go
index f3bb037bc2c18..98735573ce7ec 100644
--- a/pkg/kafka/config.go
+++ b/pkg/kafka/config.go
@@ -28,6 +28,9 @@ const (
 
 var (
 	ErrMissingKafkaAddress                 = errors.New("the Kafka address has not been configured")
+	ErrAmbiguousKafkaAddress               = errors.New("the Kafka address has been configured in both kafka.address and kafka.reader_config.address or kafka.writer_config.address")
+	ErrAmbiguousKafkaClientID              = errors.New("the Kafka client ID has been configured in both kafka.client_id and kafka.reader_config.client_id or kafka.writer_config.client_id")
+	ErrMixingOldAndNewClientConfig         = errors.New("mixing old and new client config is not allowed")
 	ErrMissingKafkaTopic                   = errors.New("the Kafka topic has not been configured")
 	ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
 	ErrInvalidProducerMaxRecordSizeBytes   = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, MaxProducerRecordDataBytesLimit)
@@ -35,12 +38,15 @@ var (
 
 // Config holds the generic config for the Kafka backend.
 type Config struct {
-	Address      string        `yaml:"address"`
+	Address      string        `yaml:"address" doc:"hidden|deprecated"`
 	Topic        string        `yaml:"topic"`
-	ClientID     string        `yaml:"client_id"`
+	ClientID     string        `yaml:"client_id" doc:"hidden|deprecated"`
 	DialTimeout  time.Duration `yaml:"dial_timeout"`
 	WriteTimeout time.Duration `yaml:"write_timeout"`
 
+	ReaderConfig ClientConfig `yaml:"reader_config"`
+	WriterConfig ClientConfig `yaml:"writer_config"`
+
 	SASLUsername string         `yaml:"sasl_username"`
 	SASLPassword flagext.Secret `yaml:"sasl_password"`
@@ -60,14 +66,27 @@ type Config struct {
 	EnableKafkaHistograms bool `yaml:"enable_kafka_histograms"`
 }
 
+type ClientConfig struct {
+	Address  string `yaml:"address"`
+	ClientID string `yaml:"client_id"`
+}
+
+func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	f.StringVar(&cfg.Address, prefix+".address", "", "The Kafka backend address.")
+	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
+}
+
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.RegisterFlagsWithPrefix("kafka", f)
 }
 
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
-	f.StringVar(&cfg.Address, prefix+".address", "localhost:9092", "The Kafka backend address.")
+	cfg.ReaderConfig.RegisterFlagsWithPrefix(prefix+".reader", f)
+	cfg.WriterConfig.RegisterFlagsWithPrefix(prefix+".writer", f)
+
+	f.StringVar(&cfg.Address, prefix+".address", "localhost:9092", "The Kafka backend address. This setting is deprecated and will be removed in the next minor release.")
 	f.StringVar(&cfg.Topic, prefix+".topic", "", "The Kafka topic name.")
-	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
+	f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID. This setting is deprecated and will be removed in the next minor release.")
 	f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
 	f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
@@ -92,9 +111,33 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 }
 
 func (cfg *Config) Validate() error {
-	if cfg.Address == "" {
+	if cfg.Address == "" && cfg.ReaderConfig.Address == "" && cfg.WriterConfig.Address == "" {
 		return ErrMissingKafkaAddress
 	}
+	if cfg.Address != "" && cfg.ReaderConfig.Address != "" {
+		return ErrAmbiguousKafkaAddress
+	}
+	if cfg.Address != "" && cfg.WriterConfig.Address != "" {
+		return ErrAmbiguousKafkaAddress
+	}
+	if cfg.ClientID != "" && cfg.ReaderConfig.ClientID != "" {
+		return ErrAmbiguousKafkaClientID
+	}
+	if cfg.ClientID != "" && cfg.WriterConfig.ClientID != "" {
+		return ErrAmbiguousKafkaClientID
+	}
+	if cfg.Address != "" && cfg.ReaderConfig.ClientID != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.Address != "" && cfg.WriterConfig.ClientID != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.ClientID != "" && cfg.ReaderConfig.Address != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
+	if cfg.ClientID != "" && cfg.WriterConfig.Address != "" {
+		return ErrMixingOldAndNewClientConfig
+	}
 	if cfg.Topic == "" {
 		return ErrMissingKafkaTopic
 	}
diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go
index 87c456f42adc0..2266d360a0da6 100644
--- a/pkg/kafka/config_test.go
+++ b/pkg/kafka/config_test.go
@@ -37,3 +37,60 @@ func TestBothSASLParamsMustBeSet(t *testing.T) {
 	err = cfg.Validate()
 	require.NoError(t, err)
 }
+
+func TestAmbiguousKafkaAddress(t *testing.T) {
+	cfg := Config{
+		Address:      "localhost:9092",
+		ReaderConfig: ClientConfig{Address: "localhost:9092"},
+		WriterConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrAmbiguousKafkaAddress)
+}
+
+func TestAmbiguousKafkaClientID(t *testing.T) {
+	// Disallow defining in both places
+	cfg := Config{
+		ClientID:     "abcd",
+		ReaderConfig: ClientConfig{Address: "reader:9092", ClientID: "abcd"},
+		WriterConfig: ClientConfig{Address: "writer:9092", ClientID: "abcd"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrAmbiguousKafkaClientID)
+}
+
+func TestMixingOldAndNewClientConfig(t *testing.T) {
+	cfg := Config{
+		Address:      "localhost:9092",
+		ReaderConfig: ClientConfig{ClientID: "reader"},
+	}
+	err := cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		Address:      "localhost:9092",
+		WriterConfig: ClientConfig{ClientID: "reader"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		ClientID:     "abcd",
+		ReaderConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+
+	cfg = Config{
+		ClientID:     "abcd",
+		WriterConfig: ClientConfig{Address: "localhost:9092"},
+	}
+	err = cfg.Validate()
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrMixingOldAndNewClientConfig)
+}
diff --git a/tools/deprecated-config-checker/deprecated-config.yaml b/tools/deprecated-config-checker/deprecated-config.yaml
index 0e82f7f8b1f7c..00d2b22dd94d1 100644
--- a/tools/deprecated-config-checker/deprecated-config.yaml
+++ b/tools/deprecated-config-checker/deprecated-config.yaml
@@ -43,6 +43,10 @@ storage_config:
 chunk_store_config:
   write_dedupe_cache_config: "Write dedupe cache is deprecated along with deprecated index types. Consider using TSDB index which does not require a write dedupe cache."
 
+kafka_config:
+  address: "Use reader_config.address or writer_config.address instead."
+  client_id: "Use reader_config.client_id or writer_config.client_id instead."
+
 ## NOTE: This will also be used to validate per-tenant overrides.
 limits_config:
   unordered_writes: "Will be eventually removed."
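
Reviewer note: the sketch below summarizes the precedence rule that `NewReaderClient` and `NewWriterClient` now share. It is a minimal standalone illustration, not code from this change: `resolve` and `main` are hypothetical helpers written for this note, though the field names match the diff above. Because `Validate` rejects any mix of the deprecated top-level fields with the new per-role blocks, the fallback only ever sees one style at a time.

```go
package main

import "fmt"

// ClientConfig mirrors the new per-role block introduced by this change.
type ClientConfig struct {
	Address  string
	ClientID string
}

// Config mirrors only the fields involved in address/client-ID resolution.
type Config struct {
	Address      string // deprecated top-level address
	ClientID     string // deprecated top-level client ID
	ReaderConfig ClientConfig
	WriterConfig ClientConfig
}

// resolve applies the fallback used by both constructors: when the per-role
// address is set, the address and the client ID both come from the per-role
// block; otherwise the deprecated top-level fields are used.
func resolve(cfg Config, role ClientConfig) (address, clientID string) {
	if role.Address != "" {
		return role.Address, role.ClientID
	}
	return cfg.Address, cfg.ClientID
}

func main() {
	// Old-style config: both roles fall back to the top-level fields.
	old := Config{Address: "localhost:9092", ClientID: "loki"}
	fmt.Println(resolve(old, old.ReaderConfig)) // localhost:9092 loki

	// New-style config: reads and writes can target different brokers.
	split := Config{
		ReaderConfig: ClientConfig{Address: "reader:9092", ClientID: "reader"},
		WriterConfig: ClientConfig{Address: "writer:9092", ClientID: "writer"},
	}
	fmt.Println(resolve(split, split.WriterConfig)) // writer:9092 writer
}
```

Moving `kgo.ClientID` and `kgo.SeedBrokers` out of `commonKafkaClientOptions` and into the two constructors, as the diff does, is what lets the reader and writer diverge on broker address and client ID while still sharing the remaining client options.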