Skip to content

Commit 8a283bd

Browse files
committed
HBASE-26863 fix incorrect rowkey pushdown
1 parent 036729d commit 8a283bd

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -593,11 +593,11 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
593593

594594
isLowerBoundEqualTo = if (lowerBoundCompare == 0)
595595
isLowerBoundEqualTo && other.isLowerBoundEqualTo
596-
else isLowerBoundEqualTo
596+
else if (lowerBoundCompare < 0) other.isLowerBoundEqualTo else isLowerBoundEqualTo
597597

598598
isUpperBoundEqualTo = if (upperBoundCompare == 0)
599599
isUpperBoundEqualTo && other.isUpperBoundEqualTo
600-
else isUpperBoundEqualTo
600+
else if (upperBoundCompare < 0) isUpperBoundEqualTo else other.isUpperBoundEqualTo
601601
}
602602

603603
/**
@@ -643,7 +643,6 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
643643
* @return True is overlap false is not overlap
644644
*/
645645
def getOverLapScanRange(other:ScanRange): ScanRange = {
646-
647646
var leftRange:ScanRange = null
648647
var rightRange:ScanRange = null
649648

@@ -659,14 +658,9 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
659658
}
660659

661660
if (hasOverlap(leftRange, rightRange)) {
662-
// Find the upper bound and lower bound
663-
if (compareRange(leftRange.upperBound, rightRange.upperBound) >= 0) {
664-
new ScanRange(rightRange.upperBound, rightRange.isUpperBoundEqualTo,
665-
rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
666-
} else {
667-
new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo,
668-
rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
669-
}
661+
val result = new ScanRange(upperBound, isUpperBoundEqualTo, lowerBound, isLowerBoundEqualTo)
662+
result.mergeIntersect(other)
663+
result
670664
} else {
671665
null
672666
}

spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,26 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
611611
assert(scanRange1.isUpperBoundEqualTo)
612612
}
613613

614+
test("Test Rowkey And with complex logic (HBASE-26863)") {
615+
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
616+
"WHERE " +
617+
"( KEY_FIELD >= 'get1' AND KEY_FIELD <= 'get3' ) AND (A_FIELD = 'foo1' OR B_FIELD = '8')").take(10)
618+
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
619+
assert(results.length == 2)
620+
621+
assert(executionRules.dynamicLogicExpression.toExpressionString
622+
== "( ( ( KEY_FIELD isNotNull AND KEY_FIELD >= 0 ) AND KEY_FIELD <= 1 ) AND ( A_FIELD == 2 OR B_FIELD == 3 ) )")
623+
624+
assert(executionRules.rowKeyFilter.points.size == 0)
625+
assert(executionRules.rowKeyFilter.ranges.size == 1)
626+
627+
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
628+
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
629+
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
630+
assert(scanRange1.isLowerBoundEqualTo)
631+
assert(scanRange1.isUpperBoundEqualTo)
632+
}
633+
614634
test("Test table that doesn't exist") {
615635
val catalog = s"""{
616636
|"table":{"namespace":"default", "name":"t1NotThere"},

0 commit comments

Comments
 (0)