Skip to content

Commit 16ccf9b

Browse files
authored
chore(engine): Filter by unknown column (#18131)
The new engine fails to execute a query when an `arrow.Record` (batch) does not contain the column on which a label filter is used, e.g. `| foo="bar"` where `foo` is neither a label nor a structured metadata. This change also fixed an incorrect conversion of the label filter operator `=`, which was converted to `MATCH_STR`, but requires to be `EQ`. The new engine reflects current behaviour of a label filter `| foo=""` where `foo` is undefined. Signed-off-by: Christian Haudum <[email protected]>
1 parent ba9b93b commit 16ccf9b

File tree

8 files changed

+50
-41
lines changed

8 files changed

+50
-41
lines changed

pkg/engine/executor/dataobjscan.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,6 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
406406
return nil, fmt.Errorf("invalid column expression type %T", column)
407407
}
408408

409-
md := arrow.MetadataFrom(map[string]string{
410-
types.MetadataKeyColumnType: columnExpr.Ref.Type.String(),
411-
})
412-
413409
switch columnExpr.Ref.Type {
414410
case types.ColumnTypeLabel:
415411
// TODO(rfratto): Switch to dictionary encoding for labels.
@@ -424,9 +420,10 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
424420
//
425421
// We skipped dictionary encoding for now to get the initial prototype
426422
// working.
423+
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
427424
addField(arrow.Field{
428425
Name: columnExpr.Ref.Column,
429-
Type: arrow.BinaryTypes.String,
426+
Type: ty,
430427
Nullable: true,
431428
Metadata: md,
432429
})
@@ -436,15 +433,16 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
436433
// has unconstrained cardinality. Using dictionary encoding would require
437434
// tracking every encoded value in the record, which is likely to be too
438435
// expensive.
436+
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
439437
addField(arrow.Field{
440438
Name: columnExpr.Ref.Column,
441-
Type: arrow.BinaryTypes.String,
439+
Type: ty,
442440
Nullable: true,
443441
Metadata: md,
444442
})
445443

446444
case types.ColumnTypeBuiltin:
447-
ty, md := builtinColumnType(columnExpr.Ref)
445+
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
448446
addField(arrow.Field{
449447
Name: columnExpr.Ref.Column,
450448
Type: ty,
@@ -472,36 +470,36 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
472470
Name: columnExpr.Ref.Column,
473471
Type: arrow.BinaryTypes.String,
474472
Nullable: true,
475-
Metadata: arrow.MetadataFrom(map[string]string{types.MetadataKeyColumnType: types.ColumnTypeLabel.String()}),
473+
Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String),
476474
})
477475
addField(arrow.Field{
478476
Name: columnExpr.Ref.Column,
479477
Type: arrow.BinaryTypes.String,
480478
Nullable: true,
481-
Metadata: arrow.MetadataFrom(map[string]string{types.MetadataKeyColumnType: types.ColumnTypeMetadata.String()}),
479+
Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String),
482480
})
483481

484-
case types.ColumnTypeParsed:
482+
case types.ColumnTypeParsed, types.ColumnTypeGenerated:
485483
return nil, fmt.Errorf("parsed column type not supported: %s", columnExpr.Ref.Type)
486484
}
487485
}
488486

489487
return arrow.NewSchema(fields, nil), nil
490488
}
491489

492-
func builtinColumnType(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
493-
if ref.Type != types.ColumnTypeBuiltin {
494-
panic("builtinColumnType called with a non-builtin column")
490+
func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
491+
if ref.Type == types.ColumnTypeBuiltin {
492+
switch ref.Column {
493+
case types.ColumnNameBuiltinTimestamp:
494+
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
495+
case types.ColumnNameBuiltinMessage:
496+
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
497+
default:
498+
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
499+
}
495500
}
496501

497-
switch ref.Column {
498-
case types.ColumnNameBuiltinTimestamp:
499-
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
500-
case types.ColumnNameBuiltinMessage:
501-
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
502-
default:
503-
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
504-
}
502+
return arrow.BinaryTypes.String, datatype.ColumnMetadata(ref.Type, datatype.String)
505503
}
506504

507505
// appendToBuilder appends a the provided field from record into the given

pkg/engine/executor/dataobjscan_test.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,10 @@ import (
2020
)
2121

2222
var (
23-
labelMD = buildMetadata(types.ColumnTypeLabel)
24-
metadataMD = buildMetadata(types.ColumnTypeMetadata)
23+
labelMD = datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
24+
metadataMD = datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)
2525
)
2626

27-
func buildMetadata(ty types.ColumnType) arrow.Metadata {
28-
return arrow.MetadataFrom(map[string]string{
29-
types.MetadataKeyColumnType: ty.String(),
30-
})
31-
}
32-
3327
func Test_dataobjScan(t *testing.T) {
3428
obj := buildDataobj(t, []logproto.Stream{
3529
{

pkg/engine/executor/expressions.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/apache/arrow-go/v18/arrow/memory"
99

1010
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
11-
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
1211
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1312
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
1413
)
@@ -46,7 +45,13 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
4645
}, nil
4746
}
4847
}
49-
return nil, fmt.Errorf("unknown column %s: %w", expr.Ref.String(), errors.ErrKey)
48+
// A non-existent column is represented as a string scalar with zero-value.
49+
// This reflects current behaviour, where a label filter `| foo=""` would match all if `foo` is not defined.
50+
return &Scalar{
51+
value: datatype.NewStringLiteral(""),
52+
rows: input.NumRows(),
53+
ct: types.ColumnTypeGenerated,
54+
}, nil
5055

5156
case *physical.UnaryExpr:
5257
lhr, err := e.eval(expr.Left, input)

pkg/engine/executor/expressions_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/stretchr/testify/require"
1111

1212
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
13-
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
1413
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1514
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
1615
)
@@ -102,7 +101,7 @@ func TestEvaluateLiteralExpression(t *testing.T) {
102101
func TestEvaluateColumnExpression(t *testing.T) {
103102
e := expressionEvaluator{}
104103

105-
t.Run("invalid", func(t *testing.T) {
104+
t.Run("unknown column", func(t *testing.T) {
106105
colExpr := &physical.ColumnExpr{
107106
Ref: types.ColumnRef{
108107
Column: "does_not_exist",
@@ -112,8 +111,12 @@ func TestEvaluateColumnExpression(t *testing.T) {
112111

113112
n := len(words)
114113
rec := batch(n, time.Now())
115-
_, err := e.eval(colExpr, rec)
116-
require.ErrorContains(t, err, errors.ErrKey.Error())
114+
colVec, err := e.eval(colExpr, rec)
115+
require.NoError(t, err)
116+
117+
_, ok := colVec.(*Scalar)
118+
require.True(t, ok, "expected column vector to be a *Scalar, got %T", colVec)
119+
require.Equal(t, arrow.STRING, colVec.Type().ArrowType().ID())
117120
})
118121

119122
t.Run("string(message)", func(t *testing.T) {

pkg/engine/executor/filter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
8686
additions := make([]func(int), len(fields))
8787

8888
for i, field := range fields {
89+
8990
switch field.Type.ID() {
9091
case arrow.BOOL:
9192
builder := array.NewBooleanBuilder(mem)
@@ -127,6 +128,14 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
127128
builder.Append(src.Value(offset))
128129
}
129130

131+
case arrow.TIMESTAMP:
132+
builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
133+
builders[i] = builder
134+
additions[i] = func(offset int) {
135+
src := batch.Column(i).(*array.Timestamp)
136+
builder.Append(src.Value(offset))
137+
}
138+
130139
default:
131140
panic(fmt.Sprintf("unimplemented type in filterBatch: %s", field.Type.Name()))
132141
}

pkg/engine/executor/sortmerge_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestSortMerge(t *testing.T) {
2424
merge := &physical.SortMerge{
2525
Column: &physical.ColumnExpr{
2626
Ref: types.ColumnRef{
27-
Column: "invalid",
27+
Column: "not_a_timestamp_column",
2828
Type: types.ColumnTypeBuiltin,
2929
},
3030
},
@@ -40,7 +40,7 @@ func TestSortMerge(t *testing.T) {
4040
require.NoError(t, err)
4141

4242
err = pipeline.Read()
43-
require.ErrorContains(t, err, "key error")
43+
require.ErrorContains(t, err, "column is not a timestamp column")
4444
})
4545

4646
t.Run("ascending timestamp", func(t *testing.T) {

pkg/engine/planner/logical/planner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,9 @@ func lineColumnRef() *ColumnRef {
182182
func convertLabelMatchType(op labels.MatchType) types.BinaryOp {
183183
switch op {
184184
case labels.MatchEqual:
185-
return types.BinaryOpMatchSubstr
185+
return types.BinaryOpEq
186186
case labels.MatchNotEqual:
187-
return types.BinaryOpNotMatchSubstr
187+
return types.BinaryOpNeq
188188
case labels.MatchRegexp:
189189
return types.BinaryOpMatchRe
190190
case labels.MatchNotRegexp:

pkg/engine/planner/logical/planner_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ func TestConvertAST_Success(t *testing.T) {
9898
%7 = SELECT %5 [predicate=%6]
9999
%8 = LT builtin.timestamp 1970-01-01T02:00:00Z
100100
%9 = SELECT %7 [predicate=%8]
101-
%10 = MATCH_STR ambiguous.foo "bar"
102-
%11 = MATCH_STR ambiguous.bar "baz"
101+
%10 = EQ ambiguous.foo "bar"
102+
%11 = EQ ambiguous.bar "baz"
103103
%12 = OR %10 %11
104104
%13 = SELECT %9 [predicate=%12]
105105
%14 = MATCH_STR builtin.message "metric.go"

0 commit comments

Comments
 (0)