Do not sort data that is already sorted #7162

@NGA-TRAN

Is your feature request related to a problem or challenge?

I have data that is already sorted on the ORDER BY columns, but when I run EXPLAIN, the plan still contains a sort operator.

Reproducer:

This is the content of the CSV file, named your_data.csv:

1,50,1000
2,30,1000
3,100,500
4,120,400

Create the table:

CREATE EXTERNAL TABLE test5 (
    "key"  VARCHAR NOT NULL,
    "value"  INT NOT NULL,
    "time"  BIGINT NOT NULL
)
STORED AS CSV
WITH ORDER ("key" ASC, "time" ASC)
LOCATION 'path/to/your_data.csv';

This query orders by the sorted columns key and time, but there is still a sort operator in the plan:

-- Q1
explain
select "key", "time", "value"
from test5
where "key" = '2'
order by "key" ASC, "time" ASC
limit 1;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                        |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=1                                                                                                                                                                                                                                      |
|               |   Sort: test5.key ASC NULLS LAST, test5.time ASC NULLS LAST, fetch=1                                                                                                                                                                                        |
|               |     Projection: test5.key, test5.time, test5.value                                                                                                                                                                                                          |
|               |       Filter: test5.key = Utf8("2")                                                                                                                                                                                                                         |
|               |         TableScan: test5 projection=[key, value, time], partial_filters=[test5.key = Utf8("2")]                                                                                                                                                             |
| physical_plan | GlobalLimitExec: skip=0, fetch=1                                                                                                                                                                                                                            |
|               |   SortPreservingMergeExec: [key@0 ASC NULLS LAST,time@1 ASC NULLS LAST], fetch=1                                                                                                                                                                            |
|               |     ProjectionExec: expr=[key@0 as key, time@2 as time, value@1 as value]                                                                                                                                                                                   |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                           |
|               |         FilterExec: key@0 = 2                                                                                                                                                                                                                               |
|               |           RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1                                                                                                                                                                             |
|               |             CsvExec: file_groups={1 group: [[Users/hoabinhngatran/Documents/git/ngaPlaying/commands/arrow-datafusion/sort_order/data.csv]]}, projection=[key, value, time], output_ordering=[key@0 ASC NULLS LAST, time@2 ASC NULLS LAST], has_header=false |
|               |                                                                                                                                                                                                                                                             |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

As a more advanced case, the query below does not include the key column in its ORDER BY, but it does filter key to a single value. I think this can also be optimized to eliminate the sort operator:

-- Q2
explain
select "time", "value"
from test5
where "key" = '2'
order by "time" ASC
limit 1;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=1                                                                                                                                                                                                                                        |
|               |   Sort: test5.time ASC NULLS LAST, fetch=1                                                                                                                                                                                                                    |
|               |     Projection: test5.time, test5.value                                                                                                                                                                                                                       |
|               |       Filter: test5.key = Utf8("2")                                                                                                                                                                                                                           |
|               |         TableScan: test5 projection=[key, value, time], partial_filters=[test5.key = Utf8("2")]                                                                                                                                                               |
| physical_plan | GlobalLimitExec: skip=0, fetch=1                                                                                                                                                                                                                              |
|               |   SortPreservingMergeExec: [time@0 ASC NULLS LAST], fetch=1                                                                                                                                                                                                   |
|               |     SortExec: fetch=1, expr=[time@0 ASC NULLS LAST]                                                                                                                                                                                                           |
|               |       ProjectionExec: expr=[time@2 as time, value@1 as value]                                                                                                                                                                                                 |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                           |
|               |           FilterExec: key@0 = 2                                                                                                                                                                                                                               |
|               |             RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1                                                                                                                                                                             |
|               |               CsvExec: file_groups={1 group: [[Users/hoabinhngatran/Documents/git/ngaPlaying/commands/arrow-datafusion/sort_order/data.csv]]}, projection=[key, value, time], output_ordering=[key@0 ASC NULLS LAST, time@2 ASC NULLS LAST], has_header=false |
|               |                                                                                                                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I think Q2's plan could look like Q3 below (without the sort):

-- Q3
explain
select "time", "value"
from test5
where "key" = '2'
order by "time" ASC
limit 1;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=1                                                                                                                                                                                                                                        |
|               |   Sort: test5.time ASC NULLS LAST, fetch=1                                                                                                                                                                                                                    |
|               |     Projection: test5.time, test5.value                                                                                                                                                                                                                       |
|               |       Filter: test5.key = Utf8("2")                                                                                                                                                                                                                           |
|               |         TableScan: test5 projection=[key, value, time], partial_filters=[test5.key = Utf8("2")]                                                                                                                                                               |
| physical_plan | GlobalLimitExec: skip=0, fetch=1                                                                                                                                                                                                                              |
|               |   SortPreservingMergeExec: [time@0 ASC NULLS LAST], fetch=1                                                                                                                                                                                                   |
|               |     SortExec: fetch=1, expr=[time@0 ASC NULLS LAST]                                                                                                                                                                                                           |
|               |       ProjectionExec: expr=[time@2 as time, value@1 as value]                                                                                                                                                                                                 |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                           |
|               |           FilterExec: key@0 = 2                                                                                                                                                                                                                               |
|               |             RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1                                                                                                                                                                             |
|               |               CsvExec: file_groups={1 group: [[Users/hoabinhngatran/Documents/git/ngaPlaying/commands/arrow-datafusion/sort_order/data.csv]]}, projection=[key, value, time], output_ordering=[key@0 ASC NULLS LAST, time@2 ASC NULLS LAST], has_header=false |
|               |                                                                                                                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Describe the solution you'd like

Q1 and Q2 should not need a sort operator, because the plan already knows the data is sorted.
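
The ordering argument behind this request can be checked with a small sketch. It is plain Python, not DataFusion code, and the rows are hypothetical: they are not the contents of your_data.csv, and they repeat keys so that the filter keeps more than one row. The helper names `is_sorted` and `filter_key` are also made up for illustration. The sketch assumes the filter preserves input order. Under that assumption, rows sorted by (key, time) and filtered to a single key are still sorted by (key, time), which is the Q1 case, and also by time alone, which is the Q2 case.

```python
# Hypothetical rows as (key, value, time), already sorted by (key, time),
# matching the table's declared WITH ORDER ("key" ASC, "time" ASC).
rows = [
    ("1", 10, 300),
    ("1", 20, 700),
    ("2", 30, 100),
    ("2", 40, 900),
    ("2", 5, 1000),
    ("3", 60, 50),
]


def is_sorted(items, sort_key):
    """Return True if items are in non-decreasing order under sort_key."""
    keys = [sort_key(item) for item in items]
    return all(a <= b for a, b in zip(keys, keys[1:]))


def filter_key(items, key_value):
    """Order-preserving stand-in for WHERE "key" = key_value."""
    return [item for item in items if item[0] == key_value]


# The input is ordered by (key, time) but not by time alone.
input_sorted_by_key_time = is_sorted(rows, lambda r: (r[0], r[2]))
input_sorted_by_time = is_sorted(rows, lambda r: r[2])

filtered = filter_key(rows, "2")

# Q1: ORDER BY "key", "time" is already satisfied after the filter.
q1_sorted = is_sorted(filtered, lambda r: (r[0], r[2]))

# Q2: ORDER BY "time" is also satisfied, because "key" is constant.
q2_sorted = is_sorted(filtered, lambda r: r[2])

# LIMIT 1 over already-sorted rows is just the first row; project (time, value).
q2_result = [(r[2], r[1]) for r in filtered[:1]]

print(input_sorted_by_key_time, input_sorted_by_time, q1_sorted, q2_sorted)
print(q2_result)
```

The input as a whole is not sorted by time, so the Q2 shortcut depends on the equality filter on key. Without that filter, ORDER BY "time" would still need a sort.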

Describe alternatives you've considered

No response

Additional context

No response

Labels

enhancement