-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathHBase_Read_Write
More file actions
140 lines (111 loc) · 5.07 KB
/
HBase_Read_Write
File metadata and controls
140 lines (111 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// HBase: an open-source Apache project modeled on Google's Bigtable; a very large distributed table that sits above HDFS in the architecture
/* HBase component:
1. row key
2. column family (called "column group" in these notes) <contains one or more column names>
3. column name (column qualifier) <belongs to a column family; one table can have one or more column families>
4. cell (called "unit" in these notes) <row key -> column family -> column name locates a cell; a cell can hold multiple versions of a value>
5. timestamp <why is a timestamp needed? HBase is built on HDFS, and HDFS files are write-once, read-many. When a user modifies the value in a cell, HBase writes a new version with a newer timestamp and reads return the newest version, while the old value is still kept>
*/
// start HBase
./bin/start-hbase.sh
// start HBase shell
./bin/hbase shell
// If a 'student' table already exists in HBase, run the commands below to disable and remove it first
hbase > disable 'student'
hbase > drop 'student'
// Create a table in HBase
/*
1. create '<table name>', '<column group>'
2. put '<table name>', '<row key>', '<column group>:<column name>', '<column value>'
*/
hbase > create 'student', 'info'
hbase > put 'student', '1', 'info:name','Simon'
hbase > put 'student', '1', 'info:gender','M'
hbase > put 'student', '1', 'info:age','27'
hbase > put 'student', '2', 'info:name','Jane'
hbase > put 'student', '2', 'info:gender','F'
hbase > put 'student', '2', 'info:age','20'
// Read data from HBase
/*
Copy the following jars from HBase's lib directory into /usr/local/spark/jars/hbase (the directory passed to --driver-class-path below): every jar whose name starts with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar and protobuf-java-2.5.0.jar
*/
SparkOperateHBase.scala =>
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/** Reads the HBase table `student` through Spark and prints every row.
  *
  * The table is exposed as an RDD of (row key, Result) pairs via
  * `TableInputFormat`. The job prints the number of rows and then, for each
  * row, the row key and the `info:name`, `info:gender` and `info:age` cells
  * decoded as strings.
  */
object SparkOperateHBase {

  /** Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf())
    try {
      // Name of the table to scan.
      conf.set(TableInputFormat.INPUT_TABLE, "student")
      val stuRDD = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Mark the RDD as cached BEFORE the first action, so that count()
      // populates the cache and the foreach below reuses it instead of
      // scanning the HBase table a second time.
      stuRDD.cache()

      val count = stuRDD.count()
      println("Students RDD Count:" + count)

      // Column family shared by all three columns. Bytes.toBytes always
      // encodes as UTF-8, unlike String.getBytes which uses the platform
      // default charset.
      val family = Bytes.toBytes("info")

      // Print every row. NOTE: println runs where the task runs, so outside
      // local mode the output appears in the executors' stdout.
      stuRDD.foreach { case (_, result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue(family, Bytes.toBytes("name")))
        val gender = Bytes.toString(result.getValue(family, Bytes.toBytes("gender")))
        val age = Bytes.toString(result.getValue(family, Bytes.toBytes("age")))
        println("Row key:" + key + " Name:" + name + " Gender:" + gender + " Age:" + age)
      }
    } finally {
      // Release the Spark resources even if the job fails.
      sc.stop()
    }
  }
}
// Use sbt to package
simple.sbt =>
// sbt build definition used to package the Spark/HBase example programs.
name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

// Spark core (cross-built for the Scala version above) plus the HBase
// client, common and server artifacts the examples compile against.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.hbase" % "hbase-client" % "1.1.5",
  "org.apache.hbase" % "hbase-common" % "1.1.5",
  "org.apache.hbase" % "hbase-server" % "1.1.5"
)
// Run spark-submit command
$ /usr/local/spark/bin/spark-submit \
> --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf \
> --class "SparkOperateHBase" \
> /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
Note: you must use the "--driver-class-path" option to specify the path of the dependency JARs, and "/usr/local/hbase/conf" must also be added to that path.
// Result
Students RDD Count:2
Row key:1 Name:Simon Gender:M Age:27
Row key:2 Name:Jane Gender:F Age:20
// Write data into HBase
SparkWriteHBase.scala =>
/** Writes two new rows into the HBase table `student` through Spark.
  *
  * Each input record has the form "rowKey,name,gender,age". Every record is
  * turned into a `Put` on column family `info` and the resulting RDD is
  * saved with `TableOutputFormat`.
  */
object SparkWriteHBase {
  // Imports are kept local so the snippet is self-contained when saved as
  // its own file (SparkWriteHBase.scala).
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.{SparkConf, SparkContext}

  /** Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      val tablename = "student"
      sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

      // Job.getInstance replaces the deprecated `new Job(conf)` constructor.
      val job = Job.getInstance(sc.hadoopConfiguration)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      // The values written below are Put mutations, not Result objects.
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      // Build the two records to insert.
      val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26", "4,Guanhua,M,27"))

      val family = Bytes.toBytes("info")
      val rdd = indataRDD.map(_.split(',')).map { arr =>
        val put = new Put(Bytes.toBytes(arr(0))) // row key
        // addColumn replaces Put.add(family, qualifier, value), which is
        // deprecated in HBase 1.x.
        put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(arr(1)))   // info:name
        put.addColumn(family, Bytes.toBytes("gender"), Bytes.toBytes(arr(2))) // info:gender
        // Store age as a string, like the rows created from the HBase shell,
        // so that readers decoding the cell with Bytes.toString get "26"
        // rather than the raw 4 bytes of a binary Int.
        put.addColumn(family, Bytes.toBytes("age"), Bytes.toBytes(arr(3)))    // info:age
        (new ImmutableBytesWritable, put)
      }

      rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    } finally {
      // Release the Spark resources even if the write fails.
      sc.stop()
    }
  }
}
// Run spark-submit
$ /usr/local/spark/bin/spark-submit \
>--driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf \
>--class "SparkWriteHBase" \
>/usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
// check table info
hbase > scan 'student'