diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 890e67f8..a29ea34d 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience
 import org.apache.hadoop.hbase.fs.HFileSystem
 import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.io.compress.Compression
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
 import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.util.{Bytes, RegionSplitter}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -38,17 +38,20 @@ import org.apache.spark.rdd.RDD
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
 import org.apache.hadoop.hbase.client._
+
 import scala.reflect.ClassTag
 import org.apache.spark.{SerializableWritable, SparkContext}
 import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
+TableInputFormat, IdentityTableMapper, TableSnapshotInputFormat}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.streaming.dstream.DStream
 import java.io._
+
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}
+
 import scala.collection.mutable
 
 /**
@@ -463,6 +466,81 @@ class HBaseContext(@transient val sc: SparkContext,
       (r: (ImmutableBytesWritable, Result)) => r)
   }
 
+  /**
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @return New RDD with results from scan
+   */
+  def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String):
+  RDD[(ImmutableBytesWritable, Result)]
+  = hbaseRDDForSnapshot(snapshotName, scans, restoreDir, null, 1)
+
+  /**
+   * This function will use the native HBase TableSnapshotInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
+   * @param numSplitsPerRegion how many input splits to generate per one region
+   * @return New RDD with results from scan
+   */
+  def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String,
+    splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int):
+  RDD[(ImmutableBytesWritable, Result)] = {
+    hbaseRDDForSnapshot[(ImmutableBytesWritable, Result)](
+      snapshotName,
+      scans,
+      restoreDir,
+      splitAlgo,
+      numSplitsPerRegion,
+      (r: (ImmutableBytesWritable, Result)) => r)
+  }
+
+  /**
+   * This function will use the native HBase TableSnapshotInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
+   * @param numSplitsPerRegion how many input splits to generate per one region
+   * @param f function to convert a Result object from HBase into
+   *          what the user wants in the final generated RDD
+   * @return new RDD with results from scan
+   */
+  def hbaseRDDForSnapshot[U: ClassTag](snapshotName: String, scans: Scan, restoreDir: String,
+    splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int,
+    f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+    val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+    TableMapReduceUtil.initCredentials(job)
+    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scans,
+      classOf[IdentityTableMapper], null, null, job, true, new Path(restoreDir), splitAlgo, numSplitsPerRegion)
+
+    val jconf = new JobConf(job.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jconf)
+    new NewHBaseRDD(sc,
+      classOf[TableSnapshotInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result],
+      job.getConfiguration,
+      this).map(f)
+  }
+
   /**
    * underlining wrapper all foreach functions in HBaseContext
    */
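Below is a minimal usage sketch of the new hbaseRDDForSnapshot overloads. The snapshot name "snap1", column family "cf1", restore directory "/tmp/snapshot_restore", and the choice of RegionSplitter.UniformSplit with 4 splits per region are placeholder values for illustration, not something the patch itself prescribes.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.{Bytes, RegionSplitter}
import org.apache.spark.{SparkConf, SparkContext}

object SnapshotScanExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SnapshotScanExample"))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // The scan is defined exactly as for a live-table read; here it is limited
    // to a single (placeholder) column family.
    val scan = new Scan().addFamily(Bytes.toBytes("cf1"))

    // Simple overload: restore the snapshot under the given scratch directory
    // and read it with one input split per region.
    val rdd = hbaseContext.hbaseRDDForSnapshot("snap1", scan, "/tmp/snapshot_restore")

    // Split-aware overload: divide each region into 4 input splits using
    // RegionSplitter.UniformSplit for finer-grained Spark tasks.
    val splitRdd = hbaseContext.hbaseRDDForSnapshot("snap1", scan, "/tmp/snapshot_restore",
      new RegionSplitter.UniformSplit(), 4)

    println(s"rows: ${rdd.count()}, rows with finer splits: ${splitRdd.count()}")
    sc.stop()
  }
}

Because TableSnapshotInputFormat reads the snapshot's HFiles directly from the underlying filesystem rather than going through region servers, raising numSplitsPerRegion only increases Spark-side parallelism; it does not add load on the live cluster.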