HBase的相关操作-JavaAPI方式
一、需求说明
某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
二、准备工作
1、创建IDEA Maven 项目
2、导入相关pom依赖
<repositories><!-- 代码库 --> | |
<repository> | |
<id>aliyun</id> | |
<url>http://maven.aliyun.com/nexus/content/groups/public/</url> | |
<releases> | |
<enabled>true</enabled> | |
</releases> | |
<snapshots> | |
<enabled>false</enabled> | |
<updatePolicy>never</updatePolicy> | |
</snapshots> | |
</repository> | |
</repositories> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.hbase</groupId> | |
<artifactId>hbase-client</artifactId> | |
<version>2.1.0</version> | |
</dependency> | |
<dependency> | |
<groupId>commons-io</groupId> | |
<artifactId>commons-io</artifactId> | |
<version>2.6</version> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.12</version> | |
</dependency> | |
<dependency> | |
<groupId>org.testng</groupId> | |
<artifactId>testng</artifactId> | |
<version>6.14.3</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>3.1</version> | |
<configuration> | |
<target>1.8</target> | |
<source>1.8</source> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> |
3、创建包结构和类
在test目录创建 cn.it.hbase.admin.api_test 包结构
创建TableAmdinTest类
需求一: 使用java代码创建表
创建一个名为WATER_BILL的表,包含一个列蔟C1
//1 如何创建hbase中表 : WATER_BILL | |
public void test01() throws Exception{ | |
//1. 创建 java 连接Hbase的 连接对象 | |
//Configuration conf = new Configuration(); | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息 | |
Connection hbConn = ConnectionFactory.createConnection(conf); | |
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作) | |
Admin admin = hbConn.getAdmin(); | |
//3. 执行相关的操作 | |
//3.1: 首先判断表是否存在 | |
boolean flag = admin.tableExists(TableName.valueOf("WATER_BILL")); // 存在 返回true | |
if(!flag){ // 说明表不存在 | |
//3.2: 通过表构建器构建表信息对象 : 指定表名 | |
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("WATER_BILL")); | |
//3.2: 在表构建表中, 添加列族 | |
ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes()).build(); | |
tableDescriptorBuilder.setColumnFamily(familyDescriptor); | |
//3.3: 构建表的信息对象 | |
TableDescriptor tableDescriptor = tableDescriptorBuilder.build(); | |
//3.4: 执行创建表 | |
admin.createTable(tableDescriptor); | |
} | |
//4. 释放资源 | |
admin.close(); | |
hbConn.close(); | |
} |
需求二: 往表中插入一条数据
// 需求2: 添加一条数据 | |
public void test02() throws Exception{ | |
//1. 创建 java 连接Hbase的 连接对象 | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息 | |
Connection hbConn = ConnectionFactory.createConnection(conf); | |
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作) | |
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL")); | |
//3. 执行相关的操作 : 添加数据命令 put操作 | |
Put put = new Put("4944191".getBytes()); | |
put.addColumn("C1".getBytes(),"name".getBytes(),"登卫红".getBytes()); | |
put.addColumn("C1".getBytes(),"address".getBytes(),"贵州省铜仁市德江县".getBytes()); | |
put.addColumn("C1".getBytes(),"sex".getBytes(),"男".getBytes()); | |
table.put(put); | |
//4. 释放资源 | |
table.close(); | |
hbConn.close(); | |
} |
优化操作: 抽取公共方法
private Connection hBaseConn; private Admin admin; private Table table; private String tableName = "WATER_BILL" ; | |
public void before() throws Exception { | |
//1. 根据连接工厂构建hbase的连接对象 | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181"); | |
hBaseConn = ConnectionFactory.createConnection(conf); | |
//2. 根据连接对象, 获取相关的管理对象: admin table | |
admin = hBaseConn.getAdmin(); | |
table = hBaseConn.getTable(TableName.valueOf(tableName)); | |
} | |
public void after() throws Exception { | |
//4. 释放资源 | |
table.close(); admin.close();hBaseConn.close(); | |
} |
需求三: 查看一条数据
// 需求三: 查询某一条数据: id为 4944191 | |
public void test03() throws Exception{ | |
//1. 创建 java 连接Hbase的 连接对象 | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息 | |
Connection hbConn = ConnectionFactory.createConnection(conf); | |
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作) | |
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL")); | |
//3. 执行相关的操作: 查询某一条数据 get | |
Get get = new Get("4944191".getBytes()); | |
Result result = table.get(get); // 一个 Result 对象 表示 一行数据 | |
//4. 处理结果集(查询) | |
List<Cell> listCells = result.listCells(); | |
for (Cell cell : listCells) { // 从单元格中能获取到哪些内容: rowkey + 列族 + 列名 + 列值 | |
/*byte[] familyArray = cell.getFamilyArray(); | |
byte familyLength = cell.getFamilyLength(); | |
int familyOffset = cell.getFamilyOffset(); | |
String family = new String(familyArray,familyOffset,familyLength);*/ | |
byte[] rowBytes = CellUtil.cloneRow(cell); | |
byte[] familyBytes = CellUtil.cloneFamily(cell); | |
byte[] qualifierBytes = CellUtil.cloneQualifier(cell); | |
byte[] valueBytes = CellUtil.cloneValue(cell); | |
String row = Bytes.toString(rowBytes); | |
String family = Bytes.toString(familyBytes); | |
String qualifier = Bytes.toString(qualifierBytes); | |
String value = Bytes.toString(valueBytes); | |
System.out.println("rowkey:"+row +";列族为:"+family +";列名为:"+qualifier+"; 列值:"+value); | |
} | |
//5. 释放资源: | |
table.close(); | |
hbConn.close(); | |
} |
需求四: 删除一条数据
@Test | |
public void test03() throws Exception { | |
//3. 执行相关操作: | |
Delete delete = new Delete("4944191".getBytes()); | |
delete.addFamily("C2".getBytes()); | |
delete.addColumn("C1".getBytes(),"name".getBytes()); | |
table.delete(delete); | |
} |
需求五: 删除表
@Test | |
public void test04() throws Exception { | |
//3. 执行相关操作 | |
if(admin.isTableEnabled(TableName.valueOf(tableName))){ | |
admin.disableTable(TableName.valueOf(tableName)); | |
} | |
admin.deleteTable(TableName.valueOf(tableName)); | |
} |
需求六: 导入数据
- 在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中
- 说明:
- 在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中
- 用法:
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
- 开始导入:
- 将资料中数据文件上传到Linux中
- 再将文件上传到hdfs中
hadoop fs -mkdir -p /water_bill/output_ept_10W | |
hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W |
执行命令: hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W
注意: 一定要启动yarn集群
hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export
需求七: 查询数据 查询2020年6月份所有用户的用水量 : C1:RECORD_DATE
//需求七: 查询 2020年 6月份所有用户的用户量: C1:RECORD_DATE | |
public void test06() throws Exception{ | |
//1. 创建 java 连接Hbase的 连接对象 | |
Configuration conf = HBaseConfiguration.create(); | |
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181"); // 如果告知hbase: 只需要设置zookeeper的地址即可, 因为zookeeper记录了hbase的各种元数据信息 | |
Connection hbConn = ConnectionFactory.createConnection(conf); | |
//2. 根据连接获取管理对象: admin(执行对表操作) table(执行对表数据的操作) | |
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL")); | |
//3. 执行相关的操作 | |
Scan scan = new Scan(); | |
// LATEST_DATE >= 2020-06-01 | |
// 列值过滤器 | |
SingleColumnValueFilter start_filter = new SingleColumnValueFilter("C1".getBytes(), "LATEST_DATE".getBytes(), | |
CompareOperator.GREATER_OR_EQUAL, new BinaryComparator("2020-06-01".getBytes())); | |
// LATEST_DATE < 2020-07-01 | |
SingleColumnValueFilter end_filter = new SingleColumnValueFilter("C1".getBytes(), "LATEST_DATE".getBytes(), | |
CompareOperator.LESS, new BinaryComparator("2020-07-01".getBytes())); | |
// 将两个条件合并在一起 | |
FilterList filterList = new FilterList(); | |
filterList.addFilter(start_filter); | |
filterList.addFilter(end_filter); | |
// 将条件封装到查询中 | |
scan.setFilter(filterList); | |
scan.setLimit(10); | |
// 执行查询操作 | |
ResultScanner results = table.getScanner(scan); | |
//4. 处理结果集(查询) | |
for (Result result : results) { | |
List<Cell> listCells = result.listCells(); | |
for (Cell cell : listCells) { | |
byte[] rowBytes = CellUtil.cloneRow(cell); | |
byte[] familyBytes = CellUtil.cloneFamily(cell); | |
byte[] qualifierBytes = CellUtil.cloneQualifier(cell); | |
byte[] valueBytes = CellUtil.cloneValue(cell); | |
String row = Bytes.toString(rowBytes); | |
String family = Bytes.toString(familyBytes); | |
String qualifier = Bytes.toString(qualifierBytes); | |
if("NUM_CURRENT".equalsIgnoreCase(qualifier) || "NUM_PREVIOUS".equalsIgnoreCase(qualifier) | |
|| "NUM_USAGE".equalsIgnoreCase(qualifier) || "TOTAL_MONEY".equalsIgnoreCase(qualifier)){ | |
Double value = Bytes.toDouble(valueBytes); | |
System.out.println("rowkey:"+row +";列族为:"+family +";列名为:"+qualifier+"; 列值:"+value); | |
}else { | |
String value = Bytes.toString(valueBytes); | |
System.out.println("rowkey:"+row +";列族为:"+family +";列名为:"+qualifier+"; 列值:"+value); | |
} | |
} | |
System.out.println("------------------------"); | |
} | |
//5. 释放资源: @After | |
} |
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨