目录
- HBase简介
- HBase架构
- HBase特性
- HBase安装
- HBase常用命令
- Springboot整合HBase
参考
- https://geek-docs.com/hbase/hbase-tutorial/introduction-of-hbase.html
- https://juejin.cn/post/6844903777347043336
- https://geek-docs.com/hbase/hbase-tutorial/introduction-of-hbase.html
- https://zhuanlan.zhihu.com/p/560898786
- https://www.jianshu.com/p/4d94478d3578
- https://blog.csdn.net/m0_74433188/article/details/129698415
- https://github1s.com/xiaoxiamo/SpringBoot_HBase/blob/HEAD/pom.xml#L30-L36
HBase简介
HBase是一个分布式的、面向列的开源数据库。HBase是BigTable对应的NoSQL系统。
Google发表GFS、MapReduce、BigTable三篇论文,号称“三驾马车”,开启了大数据的时代。那和这“三驾马车”对应的有哪些开源产品呢?我们前面已经讲过了GFS对应的Hadoop分布式文件系统HDFS,以及MapReduce对应的Hadoop分布式计算框架MapReduce,BigTable对应的NoSQL系统HBase。
HBase架构
Zookeeper
- 实现了 HMaster 的高可用,保存了 HBase 的元数据信息,是所有 HBase 表的寻址入口
- 对 HMaster 和 HRegionServer 实现了监控
HMaster(Master)
- 为 HRegionServer 分配 Region
- 维护整个集群的负载均衡
- 发现失效的 Region,并将失效的 Region 分配到正常的 HRegionServer 上
HRegionServer(RegionServer)
- 负责管理 Region
- 接受客户端的读写数据请求
- 切分在运行过程中变大的 Region
Region
- 每个 HRegion 由多个 Store 构成
- 每个 Store 保存一个列族,表有几个列族,则有几个 Store
- 每个 Store 由一个 MemStore 和多个 StoreFile 组成,MemStore 是 Store 在内存中的内容,写到文件后就是 StoreFile。StoreFile 底层就是以 HFile 的格式保存。
数据请求过程:
可伸缩
HBase为可伸缩海量数据储存而设计,实现面向在线业务的实时数据访问延迟。HBase的伸缩性主要依赖其可分裂的HRegion及可伸缩的分布式文件系统HDFS实现。
HRegion是HBase负责数据存储的主要进程,应用程序对数据的读写操作都是通过和HRegion通信完成。上面是HBase架构图,我们可以看到在HBase中,数据以HRegion为单位进行管理,也就是说应用程序如果想要访问一个数据,必须先找到HRegion,然后将数据读写操作提交给HRegion,由 HRegion完成存储层面的数据操作。
HRegionServer是物理服务器,每个HRegionServer上可以启动多个HRegion实例。当一个 HRegion中写入的数据太多,达到配置的阈值时,一个HRegion会分裂成两个HRegion,并将HRegion在整个集群中进行迁移,以使HRegionServer的负载均衡。
每个HRegion中存储一段Key值区间[key1, key2)
的数据,所有HRegion的信息,包括存储的Key值区间、所在HRegionServer地址、访问端口号等,都记录在HMaster服务器上。为了保证HMaster的高可用,HBase会启动多个HMaster,并通过ZooKeeper选举出一个主服务器。
可扩展数据模型
用 Mysql 存储是这样的:
id | name | age | salary | job |
---|---|---|---|---|
1 | 小明 | 23 | 学生 | |
2 | 小红 | 1000 | 律师 |
如果是 HBase 的话,存储是类似这样列式存储的:
rowkey名称 | 列存储 |
---|---|
rowkey1 | name:小明 |
rowkey1 | age:23 |
rowkey1 | job:学生 |
rowkey2 | name:小红 |
rowkey2 | salary:1000 |
rowkey2 | job:律师 |
- Table:Hbase的table由多个行组成
# 创建一个test的表,创建一个列簇为cf
hbase(main):003:0> create 'test', 'cf'
- Row:一个行在Hbase中由一个或多个有值的列组成。Row按照字母进行排序,因此行健的设计非常重要。
Column:列由列簇加上列的标识组成,一般是“列簇:列标识”,创建表的时候不用指定列标识
Column Family:列簇在物理上包含了许多的列与列的值,每个列簇都有一些存储的属性可配置。例如是否使用缓存,压缩类型,存储版本数等。也是存储的基本单元。
Column Qualifier:列簇的限定词,理解为列的唯一标识。但是列标识是可以改变的,因此每一行可能有不同的列标识
Cell:Cell是
{RowKey, ColumnFamily, Version}
唯一确定的单元,一般表达某个值的版本Timestamp:时间戳一般写在value的旁边,代表某个值的版本号,默认的时间戳是你写入数据的那一刻,但是你也可以在写入数据的时候指定不同的时间戳
Hbase在存储数据的时候,有两个SortedMap,首先按照rowkey进行字典排序,然后再对Column进行字典排序。
高性能存储
传统的机械式磁盘的访问特性是连续读写很快,随机读写很慢。这是因为机械磁盘靠电机驱动访问磁盘上的数据,电机要将磁头落到数据所在的磁道上,这个过程需要较长的寻址时间。如果数据不连续存储,磁头就要不停的移动,浪费了大量的时间。
为了提高数据写入速度,HBase使用了一种叫作LSM树的数据结构进行数据存储。LSM树的全名是Log Structed Merge Tree,翻译过来就是Log结构合并树。数据写入的时候以Log方式连续写入,然后异步对磁盘上的多个LSM树进行合并。
LSM树可以看作是一个N阶合并树。
数据写操作(包括插入、修改、删除)都在内存中进行,并且都会创建一个新记录(修改会记录新的数据值,而删除会记录一个删除标志)。这些数据在内存中仍然还是一棵排序树,当数据量超过设定的内存阈值后,会将这棵排序树和磁盘上最新的排序树合并。当这棵排序树的数据量也超过设定阈值后,会和磁盘上下一级的排序树合并。合并过程中,会用最新更新的数据覆盖旧的数据(或者记录为不同版本)。
在需要进行读操作时,总是从内存中的排序树开始搜索,如果没有找到,就从磁盘 上的排序树顺序查找。
在LSM树上进行一次数据更新不需要磁盘访问,在内存即可完成。当数据访问以写操作为主,而读操作则集中在最近写入的数据上时,使用LSM树可以极大程度地减少磁盘的访问次数,加快访问速度。
HBase特性
强读写一致,但是不是“最终一致性”的数据存储,这使得它非常适合高速的计算聚合
自动分片,通过Region分散在集群中,当行数增长的时候,Region也会自动的切分和再分配
自动的故障转移
Hadoop/HDFS集成,和HDFS开箱即用,不用太麻烦的衔接
丰富的“简洁,高效”API,Thrift/REST API,Java API
块缓存,布隆过滤器,可以高效的列查询优化
操作管理,Hbase提供了内置的web界面来操作,还可以监控JMX指标
HBase安装
HBase有3种部署模式
- 单机模式。单机安装部署HBase包即可,使用本地文件目录方式存储数据,内置ZooKeeper启动。
- 伪分布式模式。单机需要安装部署HBase、ZooKeeper和Hadoop,利用Hadoop的HDFS方式存储数据,外置ZooKeeper启动。
- 分布式模式。多机部署,生产环境。
这里使用MacOS安装单机模式的HBase,用作测试。
下载HBase安装包,解压
配置HBase环境变量
在
.bash_profile
文件添加环境变量export HBASE_HOME=/Users/qianhao/Applications/HBase/hbase-2.4.17 export PATH=$PATH:$HBASE_HOME/bin
当前shell下配置生效
source ~/.bash_profile
查看当前HBase版本
hbase version
配置
hbase-env.sh
文件# 指定java_home目录 export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_333.jdk/Contents/Home
配置
hbase-site.xml
文件<property> <name>hbase.cluster.distributed</name> <value>false</value> </property> <property> <name>hbase.tmp.dir</name> <value>./tmp</value> </property> <property> <name>hbase.unsafe.stream.capability.enforce</name> <value>false</value> </property> <!--数据存储目录--> <property> <name>hbase.rootdir</name> <value>file:///Users/qianhao/Applications/HBase/data</value> </property>
启动HBase
# 启动进程 qianhao@qianhaodeMacBook-Pro.local:bin $ sh start-hbase.sh running master, logging to /Users/qianhao/Applications/HBase/hbase-2.4.17/logs/hbase-qianhao-master-qianhaodeMacBook-Pro.local.out # 查看进程 有HMaster表示启动成功 qianhao@qianhaodeMacBook-Pro.local:bin $ jps 35993 HMaster 36075 Jps qianhao@qianhaodeMacBook-Pro.local:bin $
停止HBase
sh stop-hbase.sh
HBase常用命令
$ hbase shell #启动HBase Shell
#创建表
> create 'student', 'description', 'course' #创建表名为student的表, 指明两个列名, 分别为description和course
#信息明细
> list 'student' #列出list表信息
#插入数据
> put 'student', 'row1', 'description:age', '18' #意思为在student表row1处插入description:age的数据为18
> put 'student', 'row1', 'description:name', 'liu'
> put 'student', 'row1', 'course:chinese', '100'
#读取一行数据
> get 'student', 'row1'
#一次扫描所有数据
> scan 'student
#使表失效 / 有效
> disable 'student'
> enable 'student'
#删除表(要先disable)
> drop 'student'
#退出shell
> quit
Springboot整合HBase
依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- spring boot test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
</dependencies>
HBase封装类
public class HBaseService {
private Logger log = LoggerFactory.getLogger(HBaseService.class);
/**
* 管理员可以做表以及数据的增删改查功能
*/
private Admin admin = null;
private Connection connection = null;
public HBaseService() {
}
public HBaseService(Configuration conf) {
try {
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
} catch (IOException e) {
log.error("获取HBase连接失败!");
}
}
/**
* 创建表 create <table>, {NAME => <column family>, VERSIONS => <VERSIONS>}
*
* @param tableName
* @param columnFamily
* @return 是否创建成功
*/
public boolean creatTable(String tableName, List<String> columnFamily) {
try {
//列族column family
List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
columnFamily.forEach(cf -> {
cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
Bytes.toBytes(cf)).build());
});
//表 table
TableDescriptor tableDesc = TableDescriptorBuilder
.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(cfDesc).build();
if (admin.tableExists(TableName.valueOf(tableName))) {
log.debug("table Exists!");
} else {
admin.createTable(tableDesc);
log.debug("create table Success!");
}
} catch (IOException e) {
log.error(MessageFormat.format("创建表{0}失败", tableName), e);
return false;
} finally {
close(admin, null, null);
}
return true;
}
/**
* 查询所有表的表名
*
* @return 表名集合
*/
public List<String> getAllTableNames() {
List<String> result = new ArrayList<>();
try {
TableName[] tableNames = admin.listTableNames();
for (TableName tableName : tableNames) {
result.add(tableName.getNameAsString());
}
} catch (IOException e) {
log.error("获取所有表的表名失败", e);
} finally {
close(admin, null, null);
}
return result;
}
/**
* 根据rowKey获取值
* @param tableName
* @param rowKey
* @return
*/
public Map<String, Map<String, String>> getResult(String tableName, String rowKey){
if(StringUtils.isBlank(tableName) || StringUtils.isBlank(rowKey)){
return null;
}
Map<String, Map<String, String>> map = new HashMap<>();
Result result = null;
Table table = null;
try{
table = getTable(tableName);
Get get = new Get(Bytes.toBytes(rowKey));
// get.addColumn(Bytes.toBytes("test_colums"), Bytes.toBytes("age"));
result = table.get(get);
Map<String, String> columnMap = new HashMap<>();
for(Cell cell : result.listCells()){
columnMap.put(
//列限定符
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
//列族
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
);
}
map.put(rowKey, columnMap);
}catch (Exception e){
log.error(MessageFormat.format("查询指定表中的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey), e);
}finally {
close(null, table);
}
return map;
}
/**
* 遍历查询指定表中的所有数据
*
* @param tableName
* @return
*/
public Map<String, Map<String, String>> getResultScanner(String tableName) {
Scan scan = new Scan();
return this.queryData(tableName, scan);
}
/**
* 通过表名及过滤条件查询数据
*
* @param tableName
* @param scan
* @return
*/
private Map<String, Map<String, String>> queryData(String tableName, Scan scan) {
// <rowKey,对应的行数据>
Map<String, Map<String, String>> result = new HashMap<>();
ResultScanner rs = null;
//获取表
Table table = null;
try {
table = getTable(tableName);
rs = table.getScanner(scan);
for (Result r : rs) {
// 每一行数据
Map<String, String> columnMap = new HashMap<>();
String rowKey = null;
// 行键,列族和列限定符一起确定一个单元(Cell)
for (Cell cell : r.listCells()) {
if (rowKey == null) {
rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
columnMap.put(
//列限定符
Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
//列族
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
if (rowKey != null) {
result.put(rowKey, columnMap);
}
}
} catch (IOException e) {
log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}", tableName), e);
} finally {
close(null, rs, table);
}
return result;
}
/**
* 为表添加或者更新数据
*
* @param tableName
* @param rowKey
* @param familyName
* @param columns
* @param values
*/
public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
Table table = null;
try {
table = getTable(tableName);
putData(table, rowKey, tableName, familyName, columns, values);
} catch (Exception e) {
log.error(MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}", tableName, rowKey, familyName), e);
} finally {
close(null, null, table);
}
}
private void putData(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) {
try {
//设置rowkey
Put put = new Put(Bytes.toBytes(rowKey));
if (columns != null && values != null && columns.length == values.length) {
for (int i = 0; i < columns.length; i++) {
if (columns[i] != null && values[i] != null) {
put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
} else {
throw new NullPointerException(MessageFormat.format(
"列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
}
}
}
table.put(put);
log.debug("putData add or update data Success,rowKey:" + rowKey);
table.close();
} catch (Exception e) {
log.error(MessageFormat.format(
"为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
tableName, rowKey, familyName), e);
}
}
/**
* 根据表名获取table
*
* @param tableName
* @return
* @throws IOException
*/
private Table getTable(String tableName) throws IOException {
return connection.getTable(TableName.valueOf(tableName));
}
/**
* 关闭流
*
* @param admin
* @param rs
* @param table
*/
private void close(Admin admin, ResultScanner rs, Table table) {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
log.error("关闭Admin失败", e);
}
if (rs != null) {
rs.close();
}
if (table != null) {
rs.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
log.error("关闭Table失败", e);
}
}
}
}
/**
* 关闭流
*
* @param admin
* @param table
*/
private void close(Admin admin, Table table) {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
log.error("关闭Admin失败", e);
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
log.error("关闭Table失败", e);
}
}
}
}
}
配置类
@Configuration
public class HBaseConfig {
@Bean
public HBaseService getHBaseService(){
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.cluster.distributed", "false");
conf.set("hbase.rootdir", "/Users/qianhao/Applications/HBase/data");
return new HBaseService(conf);
}
}
Demo
@SpringBootTest
public class HBaseTest {
@Autowired
private HBaseService hbaseService;
//测试创建表
@Test
public void testCreateTable() {
hbaseService.creatTable("test_base", Arrays.asList("a", "back"));
}
/**
* 查询hbase中的所有表
*/
@Test
public void test_getAllTable() {
List<String> allTableNames = hbaseService.getAllTableNames();
System.out.println(allTableNames);
}
//测试加入数据
@Test
public void testPutData() {
hbaseService.putData("test_base", "000001", "a", new String[]{"project_id",
"varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "mob_3", "0.9416",
"0.0000", "12.2293", "null"});
hbaseService.putData("test_base", "000002", "a", new String[]{"project_id",
"varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
"0.0000", "9.8679", "null"});
hbaseService.putData("test_base", "000003", "a", new String[]{"project_id",
"varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "education", "0.8984",
"0.0000", "25.5649", "null"});
}
/**
* 测试安装rowKey查询数据
*/
@Test
public void test_getData() {
Map<String, Map<String, String>> result = hbaseService.getResult("test_table", "row1");
System.out.println(result);
}
//测试遍历全表
@Test
public void testGetResultScanner() {
Map<String, Map<String, String>> result2 = hbaseService.getResultScanner("test_base");
System.out.println("-----遍历查询全表内容-----");
result2.forEach((k, value) -> {
System.out.println(k + "--->" + value);
});
}
}