HBase入门


目录

  1. HBase简介
  2. HBase架构
  3. HBase特性
  4. HBase安装
  5. HBase常用命令
  6. Springboot整合HBase

参考

HBase简介

HBase是一个分布式的、面向列的开源数据库。HBase是BigTable对应的NoSQL系统。

Google发表GFS、MapReduce、BigTable三篇论文,号称“三驾马车”,开启了大数据的时代。那和这“三驾马车”对应的有哪些开源产品呢?我们前面已经讲过了GFS对应的Hadoop分布式文件系统HDFS,以及MapReduce对应的Hadoop分布式计算框架MapReduce,BigTable对应的NoSQL系统HBase。

HBase架构

  • Zookeeper

    • 实现了 HMaster 的高可用,保存了 HBase 的元数据信息,是所有 HBase 表的寻址入口
    • 对 HMaster 和 HRegionServer 实现了监控
  • HMaster(Master)

    • 为 HRegionServer 分配 Region
    • 维护整个集群的负载均衡
    • 发现失效的 Region,并将失效的 Region 分配到正常的 HRegionServer 上
  • HRegionServer(RegionServer)

    • 负责管理 Region
    • 接受客户端的读写数据请求
    • 切分在运行过程中变大的 Region
  • Region

    • 每个 HRegion 由多个 Store 构成
    • 每个 Store 保存一个列族,表有几个列族,则有几个 Store
    • 每个 Store 由一个 MemStore 和多个 StoreFile 组成,MemStore 是 Store 在内存中的内容,写到文件后就是 StoreFile。StoreFile 底层就是以 HFile 的格式保存。

数据请求过程:

可伸缩

HBase为可伸缩海量数据储存而设计,实现面向在线业务的实时数据访问延迟。HBase的伸缩性主要依赖其可分裂的HRegion可伸缩的分布式文件系统HDFS实现。

HRegion是HBase负责数据存储的主要进程,应用程序对数据的读写操作都是通过和HRegion通信完成。上面是HBase架构图,我们可以看到在HBase中,数据以HRegion为单位进行管理,也就是说应用程序如果想要访问一个数据,必须先找到HRegion,然后将数据读写操作提交给HRegion,由 HRegion完成存储层面的数据操作。

HRegionServer是物理服务器,每个HRegionServer上可以启动多个HRegion实例。当一个 HRegion中写入的数据太多,达到配置的阈值时,一个HRegion会分裂成两个HRegion,并将HRegion在整个集群中进行迁移,以使HRegionServer的负载均衡。

每个HRegion中存储一段Key值区间[key1, key2)的数据,所有HRegion的信息,包括存储的Key值区间、所在HRegionServer地址、访问端口号等,都记录在HMaster服务器上。为了保证HMaster的高可用,HBase会启动多个HMaster,并通过ZooKeeper选举出一个主服务器。

可扩展数据模型

用 Mysql 存储是这样的:

id name age salary job
1 小明 23 学生
2 小红 1000 律师

如果是 HBase 的话,存储是类似这样列式存储的:

rowkey名称 列存储
rowkey1 name:小明
rowkey1 age:23
rowkey1 job:学生
rowkey2 name:小红
rowkey2 salary:1000
rowkey2 job:律师
  • Table:Hbase的table由多个行组成
# 创建一个test的表,创建一个列簇为cf
hbase(main):003:0> create 'test', 'cf' 
  • Row:一个行在Hbase中由一个或多个有值的列组成。Row按照字母进行排序,因此行健的设计非常重要。
写数据
  • Column:列由列簇加上列的标识组成,一般是“列簇:列标识”,创建表的时候不用指定列标识

  • Column Family:列簇在物理上包含了许多的列与列的值,每个列簇都有一些存储的属性可配置。例如是否使用缓存,压缩类型,存储版本数等。也是存储的基本单元

  • Column Qualifier:列簇的限定词,理解为列的唯一标识。但是列标识是可以改变的,因此每一行可能有不同的列标识

查看表中数据
  • Cell:Cell是{RowKey, ColumnFamily, Version} 唯一确定的单元,一般表达某个值的版本

  • Timestamp:时间戳一般写在value的旁边,代表某个值的版本号,默认的时间戳是你写入数据的那一刻,但是你也可以在写入数据的时候指定不同的时间戳

单行数据

Hbase在存储数据的时候,有两个SortedMap,首先按照rowkey进行字典排序,然后再对Column进行字典排序。

存储排序

高性能存储

传统的机械式磁盘的访问特性是连续读写很快,随机读写很慢。这是因为机械磁盘靠电机驱动访问磁盘上的数据,电机要将磁头落到数据所在的磁道上,这个过程需要较长的寻址时间。如果数据不连续存储,磁头就要不停的移动,浪费了大量的时间。

为了提高数据写入速度,HBase使用了一种叫作LSM树的数据结构进行数据存储。LSM树的全名是Log Structed Merge Tree,翻译过来就是Log结构合并树。数据写入的时候以Log方式连续写入,然后异步对磁盘上的多个LSM树进行合并

HBase 简介

LSM树可以看作是一个N阶合并树。

数据写操作(包括插入、修改、删除)都在内存中进行,并且都会创建一个新记录(修改会记录新的数据值,而删除会记录一个删除标志)。这些数据在内存中仍然还是一棵排序树,当数据量超过设定的内存阈值后,会将这棵排序树和磁盘上最新的排序树合并。当这棵排序树的数据量也超过设定阈值后,会和磁盘上下一级的排序树合并。合并过程中,会用最新更新的数据覆盖旧的数据(或者记录为不同版本)。

在需要进行读操作时,总是从内存中的排序树开始搜索,如果没有找到,就从磁盘 上的排序树顺序查找。

在LSM树上进行一次数据更新不需要磁盘访问,在内存即可完成。当数据访问以写操作为主,而读操作则集中在最近写入的数据上时,使用LSM树可以极大程度地减少磁盘的访问次数,加快访问速度。

HBase特性

  • 强读写一致,但是不是“最终一致性”的数据存储,这使得它非常适合高速的计算聚合

  • 自动分片,通过Region分散在集群中,当行数增长的时候,Region也会自动的切分和再分配

  • 自动的故障转移

  • Hadoop/HDFS集成,和HDFS开箱即用,不用太麻烦的衔接

  • 丰富的“简洁,高效”API,Thrift/REST API,Java API

  • 块缓存,布隆过滤器,可以高效的列查询优化

  • 操作管理,Hbase提供了内置的web界面来操作,还可以监控JMX指标

HBase安装

HBase有3种部署模式

  • 单机模式。单机安装部署HBase包即可,使用本地文件目录方式存储数据,内置ZooKeeper启动。
  • 伪分布式模式。单机需要安装部署HBase、ZooKeeper和Hadoop,利用Hadoop的HDFS方式存储数据,外置ZooKeeper启动。
  • 分布式模式。多机部署,生产环境。

这里使用MacOS安装单机模式的HBase,用作测试。

  • 下载HBase安装包,解压

    https://hbase.apache.org/downloads.html

  • 配置HBase环境变量

    .bash_profile文件添加环境变量

    export HBASE_HOME=/Users/qianhao/Applications/HBase/hbase-2.4.17
    export PATH=$PATH:$HBASE_HOME/bin

    当前shell下配置生效

    source ~/.bash_profile

    查看当前HBase版本

    hbase version

  • 配置hbase-env.sh文件

    # 指定java_home目录
    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_333.jdk/Contents/Home
  • 配置hbase-site.xml文件

    <property>
      <name>hbase.cluster.distributed</name>
      <value>false</value>
    </property>
    <property>
      <name>hbase.tmp.dir</name>
      <value>./tmp</value>
    </property>
    <property>
      <name>hbase.unsafe.stream.capability.enforce</name>
      <value>false</value>
    </property>
    <!--数据存储目录-->
    <property>
      <name>hbase.rootdir</name>
      <value>file:///Users/qianhao/Applications/HBase/data</value>
    </property>
  • 启动HBase

    # 启动进程
    qianhao@qianhaodeMacBook-Pro.local:bin $ sh start-hbase.sh
    running master, logging to /Users/qianhao/Applications/HBase/hbase-2.4.17/logs/hbase-qianhao-master-qianhaodeMacBook-Pro.local.out
    # 查看进程 有HMaster表示启动成功
    qianhao@qianhaodeMacBook-Pro.local:bin $ jps
    35993 HMaster
    36075 Jps
    qianhao@qianhaodeMacBook-Pro.local:bin $
  • 停止HBase

    sh stop-hbase.sh

HBase常用命令

$ hbase shell  #启动HBase Shell

#创建表
> create 'student', 'description', 'course'  #创建表名为student的表, 指明两个列名, 分别为description和course

#信息明细
> list 'student'  #列出list表信息

#插入数据
> put 'student', 'row1', 'description:age', '18'  #意思为在student表row1处插入description:age的数据为18
> put 'student', 'row1', 'description:name', 'liu'
> put 'student', 'row1', 'course:chinese', '100'

#读取一行数据
> get 'student', 'row1'


#一次扫描所有数据
> scan 'student

#使表失效 / 有效
> disable 'student'
> enable 'student'

#删除表(要先disable)
>  drop 'student'

#退出shell
> quit

Springboot整合HBase

依赖

<parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.5.14</version>
  </parent>

  <dependencies>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <!-- spring boot test -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
          <exclusions>
              <exclusion>
                  <groupId>org.junit.vintage</groupId>
                  <artifactId>junit-vintage-engine</artifactId>
              </exclusion>
          </exclusions>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.4.17</version>
      </dependency>
      <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.10</version>
      </dependency>
  </dependencies>

HBase封装类

public class HBaseService {
    private Logger log = LoggerFactory.getLogger(HBaseService.class);

    /**
     * 管理员可以做表以及数据的增删改查功能
     */
    private Admin admin = null;
    private Connection connection = null;

    public HBaseService() {
    }

    public HBaseService(Configuration conf) {
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("获取HBase连接失败!");
        }
    }

    /**
     * 创建表 create <table>, {NAME => <column family>, VERSIONS => <VERSIONS>}
     *
     * @param tableName
     * @param columnFamily
     * @return 是否创建成功
     */
    public boolean creatTable(String tableName, List<String> columnFamily) {
        try {
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                        Bytes.toBytes(cf)).build());
            });
            //表 table
            TableDescriptor tableDesc = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf(tableName))
                    .setColumnFamilies(cfDesc).build();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                log.debug("table Exists!");
            } else {
                admin.createTable(tableDesc);
                log.debug("create table Success!");
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("创建表{0}失败", tableName), e);
            return false;
        } finally {
            close(admin, null, null);
        }
        return true;
    }

    /**
     * 查询所有表的表名
     *
     * @return 表名集合
     */
    public List<String> getAllTableNames() {
        List<String> result = new ArrayList<>();
        try {
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                result.add(tableName.getNameAsString());
            }
        } catch (IOException e) {
            log.error("获取所有表的表名失败", e);
        } finally {
            close(admin, null, null);
        }
        return result;
    }

    /**
     * 根据rowKey获取值
     * @param tableName
     * @param rowKey
     * @return
     */
    public Map<String, Map<String, String>> getResult(String tableName, String rowKey){
        if(StringUtils.isBlank(tableName) || StringUtils.isBlank(rowKey)){
            return null;
        }
        Map<String, Map<String, String>> map = new HashMap<>();
        Result result = null;
        Table table = null;
        try{
            table = getTable(tableName);
            Get get = new Get(Bytes.toBytes(rowKey));
//            get.addColumn(Bytes.toBytes("test_colums"), Bytes.toBytes("age"));
            result = table.get(get);
            Map<String, String> columnMap = new HashMap<>();
            for(Cell cell : result.listCells()){
                columnMap.put(
                        //列限定符
                        Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                        //列族
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
                );
            }
            map.put(rowKey, columnMap);
        }catch (Exception e){
            log.error(MessageFormat.format("查询指定表中的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey), e);
        }finally {
            close(null, table);
        }
        return map;
    }

    /**
     * 遍历查询指定表中的所有数据
     *
     * @param tableName
     * @return
     */
    public Map<String, Map<String, String>> getResultScanner(String tableName) {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }

    /**
     * 通过表名及过滤条件查询数据
     *
     * @param tableName
     * @param scan
     * @return
     */
    private Map<String, Map<String, String>> queryData(String tableName, Scan scan) {
        // <rowKey,对应的行数据>
        Map<String, Map<String, String>> result = new HashMap<>();
        ResultScanner rs = null;
        //获取表
        Table table = null;
        try {
            table = getTable(tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                // 每一行数据
                Map<String, String> columnMap = new HashMap<>();
                String rowKey = null;
                // 行键,列族和列限定符一起确定一个单元(Cell)
                for (Cell cell : r.listCells()) {
                    if (rowKey == null) {
                        rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    }
                    columnMap.put(
                            //列限定符
                            Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                            //列族
                            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                if (rowKey != null) {
                    result.put(rowKey, columnMap);
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}", tableName), e);
        } finally {
            close(null, rs, table);
        }
        return result;
    }

    /**
     * 为表添加或者更新数据
     *
     * @param tableName
     * @param rowKey
     * @param familyName
     * @param columns
     * @param values
     */
    public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
        Table table = null;
        try {
            table = getTable(tableName);
            putData(table, rowKey, tableName, familyName, columns, values);
        } catch (Exception e) {
            log.error(MessageFormat.format("为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}", tableName, rowKey, familyName), e);
        } finally {
            close(null, null, table);
        }
    }

    private void putData(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) {
        try {
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));
            if (columns != null && values != null && columns.length == values.length) {
                for (int i = 0; i < columns.length; i++) {
                    if (columns[i] != null && values[i] != null) {
                        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                    } else {
                        throw new NullPointerException(MessageFormat.format(
                                "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                    }
                }
            }
            table.put(put);
            log.debug("putData add or update data Success,rowKey:" + rowKey);
            table.close();
        } catch (Exception e) {
            log.error(MessageFormat.format(
                    "为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
                    tableName, rowKey, familyName), e);
        }
    }

    /**
     * 根据表名获取table
     *
     * @param tableName
     * @return
     * @throws IOException
     */
    private Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * 关闭流
     *
     * @param admin
     * @param rs
     * @param table
     */
    private void close(Admin admin, ResultScanner rs, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败", e);
            }

            if (rs != null) {
                rs.close();
            }

            if (table != null) {
                rs.close();
            }

            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    log.error("关闭Table失败", e);
                }
            }
        }
    }

    /**
     * 关闭流
     *
     * @param admin
     * @param table
     */
    private void close(Admin admin, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败", e);
            }

            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    log.error("关闭Table失败", e);
                }
            }
        }
    }
}

配置类

@Configuration
public class HBaseConfig {

    @Bean
    public HBaseService getHBaseService(){
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.cluster.distributed", "false");
        conf.set("hbase.rootdir", "/Users/qianhao/Applications/HBase/data");
        return new HBaseService(conf);
    }
}

Demo

@SpringBootTest
public class HBaseTest {
    @Autowired
    private HBaseService hbaseService;

    //测试创建表
    @Test
    public void testCreateTable() {
        hbaseService.creatTable("test_base", Arrays.asList("a", "back"));
    }

    /**
     * 查询hbase中的所有表
     */
    @Test
    public void test_getAllTable() {
        List<String> allTableNames = hbaseService.getAllTableNames();
        System.out.println(allTableNames);
    }

    //测试加入数据
    @Test
    public void testPutData() {
        hbaseService.putData("test_base", "000001", "a", new String[]{"project_id",
                "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.putData("test_base", "000002", "a", new String[]{"project_id",
                "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.putData("test_base", "000003", "a", new String[]{"project_id",
                "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});
    }

    /**
     * 测试安装rowKey查询数据
     */
    @Test
    public void test_getData() {
        Map<String, Map<String, String>> result = hbaseService.getResult("test_table", "row1");
        System.out.println(result);
    }

    //测试遍历全表
    @Test
    public void testGetResultScanner() {
        Map<String, Map<String, String>> result2 = hbaseService.getResultScanner("test_base");
        System.out.println("-----遍历查询全表内容-----");
        result2.forEach((k, value) -> {
            System.out.println(k + "--->" + value);
        });
    }
}

文章作者: 小小千千
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小小千千 !
评论
  目录