Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bulk_load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type BulkLoad interface {

type LoadRunner struct {
DbName string
User string
Password string
BasicAuthentication string
Workers int
ItemLimit int64
BatchSize int
Expand Down Expand Up @@ -110,6 +113,9 @@ func (r *LoadRunner) notifyHandler(arg int) (int, error) {

func (r *LoadRunner) Init(defaultBatchSize int) {
flag.StringVar(&r.DbName, "db", "benchmark_db", "Database name.")
flag.StringVar(&r.User, "user", "", "User name, credentials as query parameters.")
flag.StringVar(&r.Password, "password", "", "User password, credentials as query parameters.")
flag.StringVar(&r.BasicAuthentication, "basic-authentication", "", "Authenticate with basic authentication. format [user:password].")
flag.IntVar(&r.BatchSize, "batch-size", defaultBatchSize, "Batch size (1 line of input = 1 item).")
flag.IntVar(&r.Workers, "workers", 1, "Number of parallel requests to make.")
flag.Int64Var(&r.ItemLimit, "item-limit", -1, "Number of items to read from stdin before quitting. (1 item per 1 line of input.)")
Expand Down
2 changes: 2 additions & 0 deletions bulk_query_gen/database_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ type DatabaseConfig map[string]string

const (
DatabaseName = "database-name"
UserName = "user-name"
Password = "password"
)
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_bareagg_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewInfluxBareAggregateQuery(agg Aggregate, lang Language, dbConfig bulkQuer
}

return &InfluxBareAggregateQuery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
aggregate: agg,
}
}
Expand Down
12 changes: 10 additions & 2 deletions bulk_query_gen/influxdb/influx_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ type InfluxCommon struct {
bulkQuerygen.CommonParams
language Language
DatabaseName string
UserName string
Password string
}

func newInfluxCommon(lang Language, dbName string, interval bulkQuerygen.TimeInterval, scaleVar int) *InfluxCommon {
func newInfluxCommon(lang Language, dbName string, userName string, password string, interval bulkQuerygen.TimeInterval, scaleVar int) *InfluxCommon {
return &InfluxCommon{
CommonParams: *bulkQuerygen.NewCommonParams(interval, scaleVar),
language: lang,
DatabaseName: dbName}
DatabaseName: dbName,
UserName: userName,
Password: password}
}

// getHttpQuery gets the right kind of http request based on the language being used
Expand All @@ -64,6 +68,10 @@ func (d *InfluxCommon) getHttpQuery(humanLabel, intervalStart, query string, q *
getValues := url.Values{}
getValues.Set("db", d.DatabaseName)
getValues.Set("q", query)
if len(d.UserName) != 0 && len(d.Password) != 0 {
getValues.Set("u", d.UserName)
getValues.Set("p", d.Password)
}
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", getValues.Encode()))
q.Body = nil
Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_dashboard_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func newInfluxDashboard(lang Language, dbConfig bulkQuerygen.DatabaseConfig, int
clustersCount = 1
}
return &InfluxDashboard{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], interval, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], interval, scaleVar),
ClustersCount: clustersCount,
TimeWindow: bulkQuerygen.TimeWindow{interval.Start, duration},
}
Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_devops_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func newInfluxDevopsCommon(lang Language, dbConfig bulkQuerygen.DatabaseConfig,
}

return &InfluxDevops{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], interval, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], interval, scaleVar),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewInfluxGroupWindowTransposeQuery(agg Aggregate, card Cardinality, lang La
}

return &InfluxGroupWindowTransposeQuery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
aggregate: agg,
interval: queryInterval,
cardinality: card,
Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_groupagg_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewInfluxGroupAggregateQuery(agg Aggregate, lang Language, dbConfig bulkQue
}

return &InfluxGroupAggregateQuery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
aggregate: agg,
}
}
Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_iot_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewInfluxIotCommon(lang Language, dbConfig bulkQuerygen.DatabaseConfig, que
}

return &InfluxIot{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
}
}

Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_metaquery_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewInfluxMetaqueryCommon(lang Language, dbConfig bulkQuerygen.DatabaseConfi
}

return &InfluxMetaquery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
}
}

Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_multi_measurement_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func InfluxMultiMeasurementCommon(lang Language, dbConfig bulkQuerygen.DatabaseC
}

return &InfluxMultiMeasurement{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
}
}

Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_ungroupedagg_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewInfluxUngroupedAggregateQuery(agg Aggregate, lang Language, dbConfig bul
}

return &InfluxUngroupedAggregateQuery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
aggregate: agg,
}
}
Expand Down
2 changes: 1 addition & 1 deletion bulk_query_gen/influxdb/influx_windowagg_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewInfluxWindowAggregateQuery(agg Aggregate, lang Language, dbConfig bulkQu
}

return &InfluxWindowAggregateQuery{
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], queriesFullRange, scaleVar),
InfluxCommon: *newInfluxCommon(lang, dbConfig[bulkQuerygen.DatabaseName], dbConfig[bulkQuerygen.UserName], dbConfig[bulkQuerygen.Password], queriesFullRange, scaleVar),
aggregate: agg,
interval: queryInterval,
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/bulk_load_influx/http_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"time"
"encoding/base64"

"github.com/valyala/fasthttp"
)
Expand All @@ -32,6 +33,10 @@ type HTTPWriterConfig struct {
// Name of the target database into which points will be written.
Database string

User string
Password string
BasicAuthentication string

// Id of the target bucket into which points will be written. (InfluxDB v2)
BucketId string

Expand Down Expand Up @@ -86,6 +91,9 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error)
if w.c.AuthToken != "" {
req.Header.Add("Authorization", fmt.Sprintf("Token %s", w.c.AuthToken))
}
if w.c.BasicAuthentication != "" {
req.Header.Add("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.c.BasicAuthentication))))
}
req.SetBody(body)

resp := fasthttp.AcquireResponse()
Expand Down
6 changes: 6 additions & 0 deletions cmd/bulk_load_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,16 @@ func (l *InfluxBulkLoad) PrepareProcess(i int) {
DebugInfo: fmt.Sprintf("worker #%d, dest url: %s", i, l.configs[i].url),
Host: l.configs[i].url,
Database: bulk_load.Runner.DbName,
User: bulk_load.Runner.User,
Password: bulk_load.Runner.Password,
BasicAuthentication: bulk_load.Runner.BasicAuthentication,
BackingOffChan: l.configs[i].backingOffChan,
BackingOffDone: l.configs[i].backingOffDone,
}
url = c.Host + "/write?consistency=" + l.consistency + "&db=" + neturl.QueryEscape(c.Database)
if len(c.User) != 0 && len(c.Password) != 0 {
url = url + "&u=" + neturl.QueryEscape(c.User) + "&p=" + neturl.QueryEscape(c.Password)
}
}
l.configs[i].writer = NewHTTPWriter(*c, url)
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/bulk_query_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ var (
queryCount int

dbName string // TODO(rw): make this a map[string]string -> DatabaseConfig
userName string
password string

timestampStartStr string
timestampEndStr string
Expand Down Expand Up @@ -475,6 +477,8 @@ func init() {
flag.IntVar(&scaleVar, "scale-var", 1, "Scaling variable (must be the equal to the scale-var used for data generation).")
flag.IntVar(&queryCount, "queries", 1000, "Number of queries to generate.")
flag.StringVar(&dbName, "db", "benchmark_db", "Database to use (ignored for ElasticSearch).")
flag.StringVar(&userName, "user", "", "User name, credentials as query parameters.")
flag.StringVar(&password, "password", "", "User password, credentials as query parameters.")

flag.StringVar(&timestampStartStr, "timestamp-start", common.DefaultDateTimeStart, "Beginning timestamp (RFC3339).")
flag.StringVar(&timestampEndStr, "timestamp-end", common.DefaultDateTimeEnd, "Ending timestamp (RFC3339).")
Expand Down Expand Up @@ -587,6 +591,8 @@ func main() {

dbConfig := bulkQueryGen.DatabaseConfig{
bulkQueryGen.DatabaseName: dbName,
bulkQueryGen.UserName: userName,
bulkQueryGen.Password: password,
}

// Make the query generator:
Expand Down