Commit 672f3de

HBASE-27630: hbase-spark bulkload stage directory limited to hdfs only (#108)
Signed-off-by: Peter Somogyi <[email protected]>
Reviewed-by: Istvan Toth <[email protected]>
1 parent c221896 commit 672f3de
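
Why the change: FileSystem.get(conf) always resolves to the cluster's default filesystem (fs.defaultFS, typically HDFS), so the bulkload staging directory could only live on the default filesystem. Resolving the filesystem from the staging path itself honors the path's own scheme. A minimal sketch of the difference (the configuration value and staging URI below are illustrative, not taken from the commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://namenode:8020") // illustrative default FS

    // Before the fix: ignores where the staging directory lives and
    // always returns the default filesystem (HDFS here).
    val defaultFs: FileSystem = FileSystem.get(conf)

    // After the fix: resolves the filesystem from the path's scheme, so a
    // file://, s3a://, abfs://, ... staging directory works as well.
    val stagingDir = "file:///tmp/bulkload-staging" // illustrative path
    val stagingFs: FileSystem = new Path(stagingDir).getFileSystem(conf)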

File tree

2 files changed: +64 -5 lines changed

spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala

Lines changed: 3 additions & 3 deletions

@@ -656,7 +656,7 @@ class HBaseContext(@transient val sc: SparkContext,
     hbaseForeachPartition(this, (it, conn) => {

       val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
+      val fs = new Path(stagingDir).getFileSystem(conf)
       val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
       var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
       var rollOverRequested = false
@@ -791,7 +791,7 @@ class HBaseContext(@transient val sc: SparkContext,
     hbaseForeachPartition(this, (it, conn) => {

       val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
+      val fs = new Path(stagingDir).getFileSystem(conf)
       val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
       var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
       var rollOverRequested = false
@@ -973,7 +973,7 @@ class HBaseContext(@transient val sc: SparkContext,
       val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
         val familyDir = new Path(stagingDir, Bytes.toString(family))

-        fs.mkdirs(familyDir)
+        familyDir.getFileSystem(conf).mkdirs(familyDir);
       
         val loc:HRegionLocation = {
           try {
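
The third hunk applies the same pattern when creating the per-family subdirectories under the staging directory: the filesystem is resolved from familyDir itself rather than from fs.defaultFS. A small sketch of that pattern in isolation (the values here are illustrative, not from the commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.util.Bytes

    val conf = new Configuration()
    val stagingDir = "file:///tmp/bulkload-staging" // illustrative staging URI
    val family: Array[Byte] = Bytes.toBytes("f1")   // illustrative column family

    // Each column family gets its own subdirectory under the staging dir;
    // resolving the filesystem from familyDir keeps the mkdirs on the
    // staging filesystem instead of the cluster default.
    val familyDir = new Path(stagingDir, Bytes.toString(family))
    familyDir.getFileSystem(conf).mkdirs(familyDir)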

spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala

Lines changed: 61 additions & 2 deletions

@@ -18,16 +18,20 @@
 package org.apache.hadoop.hbase.spark

 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
 import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
-import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.{CellUtil, HBaseTestingUtility, HConstants, TableName}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
 import org.apache.spark.SparkContext
 import org.junit.rules.TemporaryFolder
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

+import java.io.File
+import java.net.URI
+import java.nio.file.Files
+
 class BulkLoadSuite extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {
   @transient var sc: SparkContext = null
@@ -65,6 +69,61 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     sc.stop()
   }

+  test ("Staging dir: Test usage of staging dir on a separate filesystem") {
+    val config = TEST_UTIL.getConfiguration
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+    // The test creates an RDD with 2 column families and writes it to
+    // HFiles on the local filesystem using the bulkLoad functionality.
+    // We don't check the load functionality itself due to the limitations
+    // of the HBase minicluster.
+
+    val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
+      (Bytes.toBytes("2"),
+        (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+    val uri = Files.createTempDirectory("tmpDirPrefix").toUri
+    val stagingUri = new URI(uri + "staging_dir")
+    val stagingFolder = new File(stagingUri)
+    val fs = new Path(stagingUri.toString).getFileSystem(config)
+    try {
+      hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd,
+        TableName.valueOf(tableName),
+        t => {
+          val rowKey = t._1
+          val family: Array[Byte] = t._2._1
+          val qualifier = t._2._2
+          val value: Array[Byte] = t._2._3
+
+          val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+          Seq((keyFamilyQualifier, value)).iterator
+        },
+        stagingUri.toString)
+
+      assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    } finally {
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+    }
+  }
+
   test("Wide Row Bulk Load: Test multi family and multi column tests " +
     "with all default HFile Configs.") {
     val config = TEST_UTIL.getConfiguration
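
For completeness: once bulkLoad has written the HFiles into the staging directory, a real job would finish the load with LoadIncrementalHFiles (already imported by this suite). The new test deliberately skips that step because of the minicluster limitation noted in its comment. A hedged sketch, reusing config, stagingUri, and tableName from the test above; the argument order follows the tool's "<hfile dir> <table name>" usage:

    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles

    // Sketch only: complete the bulk load by moving the staged HFiles
    // into the regions of the target table.
    val loader = new LoadIncrementalHFiles(config)
    loader.run(Array(stagingUri.toString, tableName))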
