写作目的:当做自己的HBase笔记本
HBase表结构
Shell 命令
进入 HBase 客户端命令行
./bin/hbase shell
查看当前库中有哪些表
list
创建表
create 表名称,列族名
create "student20190727","info"
插入数据
put 表名,rowkey,列族:列名,值
put "student20190727","1001","info:name","Tom"
put "student20190727","1001","info:age","18"
put "student20190727","1001","info:sex","male"
查看数据(只给 rowkey 时返回整行;再指定 "列族:列" 时只返回该列)
get "student20190727","1001"
get "student20190727","1001","info:name"
删除数据(delete 删除指定列,deleteall 删除整行)
delete "student20190727","1001","info:sex"
deleteall "student20190727","1001"
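清空表数据(注:truncate 本身会依次 disable、删除并重建表;在部分 HBase 1.x 版本中,对已 disable 的表执行 truncate 会报 "is not enabled" 错误,此时可省略下面的 disable,直接执行 truncate)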
disable "student20190727"
truncate "student20190727"
删除表(必须先 disable,再 drop)
disable "student20190727"
drop "student20190727"
API中常用的类介绍
HBaseAdmin(Admin):管理表(创建,删除) HTableDescriptor:表描述器,用于创建表 HColumnDescriptor:列描述器(构建列族)
Table:用于表中数据的操作 Put:用于封装待存放的数据 Delete:用于封装待删除的数据 Get:用于得到某一个具体的数据
Scan:用于配置扫描表的条件 ResultScanner:表根据 Scan 配置返回的扫描器,可逐条迭代得到 Result Result:每一个实例对应一个 rowkey 下的若干数据 Cell:封装一个单元格的数据(rowKey, columnFamily, column, value)
Java操作HBase
pom.xml
<dependencies>
<!-- HBase 1.3.1 artifacts used by the demos below; both versions must stay in sync. -->
<!-- NOTE(review): hbase-server is presumably needed for the org.apache.hadoop.hbase.mapreduce classes
     (TableReducer, TableMapReduceUtil) used by the HDFS-to-HBase job; confirm. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<!-- Client API: Connection, Table/HTable, Admin/HBaseAdmin, Put, Get, Delete, Scan. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<!-- System-scoped workaround that resolves jdk.tools to tools.jar of the local JDK.
     NOTE(review): presumably added because a transitive Hadoop/HBase dependency references jdk.tools;
     ${JAVA_HOME}/lib/tools.jar only exists in older JDKs, so this path may not resolve on newer ones. -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
ZookeeperDemo
package com.imooc.demo;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
/**
 * Smoke test: checks whether a ZooKeeper session can be established with {@link #CONNECT_ADDR}.
 */
public class ZookeeperDemo {
    /** ZooKeeper connect string (host:port). */
    static final String CONNECT_ADDR = "192.168.1.16:2181";
    /** Requested session timeout, in milliseconds. */
    static final int SESSION_OUTTIME = 2000;// ms
    /** How long main() waits for the connection before giving up, in milliseconds. */
    static final long CONNECT_WAIT_MS = 10000L;
    /** Latch released by the watcher once the client is connected, letting main() continue. */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                KeeperState keeperState = event.getState();
                EventType eventType = event.getType();
                // A connection-level event (type None) in state SyncConnected means the session is up:
                // release the latch so main() can proceed.
                if (KeeperState.SyncConnected == keeperState && EventType.None == eventType) {
                    System.out.println("zk 建立连接");
                    connectedSemaphore.countDown();
                }
            }
        });
        try {
            // Wait for the connection, but not forever: with an unreachable server a plain await()
            // would block main() indefinitely.
            if (!connectedSemaphore.await(CONNECT_WAIT_MS, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out after " + CONNECT_WAIT_MS
                        + " ms waiting for ZooKeeper connection to " + CONNECT_ADDR);
            }
            System.out.println("..");
            // Examples below need imports for org.apache.zookeeper.ZooDefs.Ids and org.apache.zookeeper.CreateMode.
            // Create the parent node
            // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE,
            // CreateMode.PERSISTENT);
            // Create a child node
            // zk.create("/testRoot/children", "children data".getBytes(),
            // Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            // Read node data and list children
            // byte[] data = zk.getData("/testRoot", false, null);
            // System.out.println(new String(data));
            // System.out.println(zk.getChildren("/testRoot", false));
            // Modify a node's value (version -1 matches any version)
            // zk.setData("/testRoot", "modify data root".getBytes(), -1);
            // byte[] data = zk.getData("/testRoot", false, null);
            // System.out.println(new String(data));
            // Check whether a node exists
            // System.out.println(zk.exists("/testRoot/children", false));
            // Delete a node
            // zk.delete("/testRoot/children", -1);
            // System.out.println(zk.exists("/testRoot/children", false));
        } finally {
            // Always release the session, even when waiting or one of the calls above fails.
            zk.close();
        }
    }
}
HBaseDemo
package com.imooc.demo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Small HBase client examples: table DDL (create / drop / exists) and row-level CRUD.
 *
 * <p>Each public helper opens its own {@link Connection} from {@link #conf}. Before returning, it
 * closes that connection together with any table, admin or scanner it opened.
 */
public class HBaseDemo {
    /** Client configuration shared by all helpers; filled in by the static initializer. */
    public static Configuration conf;

    static {
        // Build the HBase client configuration and point it at the ZooKeeper quorum.
        conf = HBaseConfiguration.create();
        // NOTE(review): "192.168.1" is not a complete IPv4 address and looks truncated
        // (ZookeeperDemo uses 192.168.1.16) - confirm the real quorum host before running.
        conf.set("hbase.zookeeper.quorum", "192.168.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /** Entry point for manual experiments: uncomment the call you want to try. */
    public static void main(String[] args) throws Exception {
        // createTable("student","info");
        // System.out.println(isTableExist("student"));
        // dropTable("student2019-5-9-9-15");
        // addRowData("student", "1001", "info", "name", "zhangsan");
        // addRowData("student", "1002", "info", "name", "lisi");
        // deleteOneRow("student", "1001");
        // deleteMultiRow("student", "1001","1002");
        // getAllRows("student");
        // getRowQualifier("student", "1001", "info", "name");
    }

    /**
     * Prints the cell stored at {@code family:qualifier} in row {@code rowKey}.
     * Prints nothing if the row or column does not exist.
     *
     * @param tableName table to read
     * @param rowKey    row key to fetch
     * @param family    column family
     * @param qualifier column qualifier within {@code family}
     * @throws IOException if connecting or reading fails
     */
    public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier)
            throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
                printCells(hTable.get(get));
            } finally {
                hTable.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Prints every cell of every row in the table.
     *
     * @param tableName table to scan
     * @throws IOException if connecting or scanning fails
     */
    public static void getAllRows(String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                // A Scan with no start/stop row or column restrictions reads the whole table.
                ResultScanner resultScanner = hTable.getScanner(new Scan());
                try {
                    for (Result result : resultScanner) {
                        printCells(result);
                    }
                } finally {
                    resultScanner.close();
                }
            } finally {
                hTable.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Deletes an entire row (all families and columns).
     *
     * @param tableName table to modify
     * @param rowKey    row to delete
     * @throws IOException if connecting or deleting fails
     */
    public static void deleteOneRow(String tableName, String rowKey) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                hTable.delete(new Delete(Bytes.toBytes(rowKey)));
            } finally {
                hTable.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Deletes several entire rows in a single batch call.
     *
     * @param tableName table to modify
     * @param rows      row keys to delete
     * @throws IOException if connecting or deleting fails
     */
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                List<Delete> deleteList = new ArrayList<Delete>(rows.length);
                for (String row : rows) {
                    deleteList.add(new Delete(Bytes.toBytes(row)));
                }
                hTable.delete(deleteList);
            } finally {
                hTable.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Writes one cell ({@code columnFamily:column = value}) into row {@code rowKey}.
     *
     * @param tableName    table to write to
     * @param rowKey       target row key
     * @param columnFamily column family (must already exist in the table)
     * @param column       column qualifier
     * @param value        value to store, encoded with {@link Bytes#toBytes(String)}
     * @throws Exception if connecting or writing fails
     */
    public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value)
            throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
            try {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
                hTable.put(put);
            } finally {
                hTable.close();
            }
        } finally {
            connection.close();
        }
        System.out.println("插入数据成功");
    }

    /**
     * Drops the table if it exists, disabling it first when necessary, and prints the outcome.
     *
     * @param tableName table to drop
     * @throws Exception if connecting or any admin operation fails
     */
    public static void dropTable(String tableName) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
            try {
                TableName name = TableName.valueOf(tableName);
                // Reuse this admin rather than isTableExist(), which would open a second connection.
                if (admin.tableExists(name)) {
                    // A table must be disabled before deleteTable succeeds; skip the disable if an
                    // earlier attempt already did it, since disabling twice fails.
                    if (admin.isTableEnabled(name)) {
                        admin.disableTable(name);
                    }
                    admin.deleteTable(name);
                    System.out.println("表" + tableName + "删除成功!");
                } else {
                    System.out.println("表" + tableName + "不存在!");
                }
            } finally {
                admin.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Creates the table with the given column families unless it already exists, and prints the outcome.
     *
     * @param tableName    table to create
     * @param columnFamily names of the column families to create
     * @throws Exception if connecting or creating the table fails
     */
    public static void createTable(String tableName, String... columnFamily) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
            try {
                TableName name = TableName.valueOf(tableName);
                // Reuse this admin rather than isTableExist(), which would open a second connection.
                if (admin.tableExists(name)) {
                    System.out.println("表" + tableName + "已存在");
                } else {
                    HTableDescriptor descriptor = new HTableDescriptor(name);
                    for (String cf : columnFamily) {
                        descriptor.addFamily(new HColumnDescriptor(cf));
                    }
                    admin.createTable(descriptor);
                    System.out.println("表" + tableName + "创建成功!");
                }
            } finally {
                admin.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Reports whether the table exists.
     *
     * @param tableName table to look up
     * @return {@code true} if the table exists
     * @throws Exception if connecting or the lookup fails
     */
    public static boolean isTableExist(String tableName) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
            try {
                // NOTE: the original author flagged this call as problematic. In the recorded stack trace
                // it timed out inside MetaTableAccessor.fullScan when HBase's ports were not reachable.
                return admin.tableExists(TableName.valueOf(tableName));
            } finally {
                admin.close();
            }
        } finally {
            connection.close();
        }
    }

    /** Prints row key, family, qualifier and value of each cell in {@code result}; prints nothing if it is empty. */
    private static void printCells(Result result) {
        // rawCells() may be null for a result with no cells, so check isEmpty() before iterating.
        if (result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}
实现将 HDFS 中的数据写入到 HBase 表中
mapper
package com.imooc.hdfstohbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Turns each tab-separated line read from HDFS into an HBase {@link Put}.
 *
 * <p>Expected line layout: {@code rowKey \t name \t color}. The name and color fields are stored in
 * column family {@code info}, under qualifiers {@code name} and {@code color}. Lines with fewer than
 * three fields, or with an empty row key, are skipped. They are counted under the
 * {@code HDFStoHBaseMapper/MALFORMED_LINES} counter instead of failing the task.
 */
public class HDFStoHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
    private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
    private static final byte[] QUALIFIER_COLOR = Bytes.toBytes("color");

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] values = value.toString().split("\t");
        // A blank or truncated line (e.g. a trailing empty line) would otherwise throw
        // ArrayIndexOutOfBoundsException, and an empty row key is rejected by Put; either way the task fails.
        if (values.length < 3 || values[0].isEmpty()) {
            context.getCounter("HDFStoHBaseMapper", "MALFORMED_LINES").increment(1);
            return;
        }
        String rowKey = values[0];
        String name = values[1];
        String color = values[2];

        byte[] rowKeyBytes = Bytes.toBytes(rowKey);
        // The map output key is the row key, so all Puts for one row reach the same reduce call.
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKeyBytes);
        Put put = new Put(rowKeyBytes);
        put.addColumn(FAMILY_INFO, QUALIFIER_NAME, Bytes.toBytes(name));
        put.addColumn(FAMILY_INFO, QUALIFIER_COLOR, Bytes.toBytes(color));
        context.write(rowKeyWritable, put);
    }
}
reducer
package com.imooc.hdfstohbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
/**
 * Pass-through reducer: forwards every {@link Put} grouped under a row key, unchanged, to the HBase
 * output table. The output table is the one HDFStoHBaseDriver configures via
 * {@code TableMapReduceUtil.initTableReducerJob}.
 */
public class HDFStoHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Only the Put carries data; the output key is a NullWritable placeholder.
        final NullWritable noKey = NullWritable.get();
        for (Put mutation : values) {
            context.write(noKey, mutation);
        }
    }
}
Driver
package com.imooc.hdfstohbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce driver that loads a tab-separated HDFS file into an HBase table. It uses
 * {@link HDFStoHBaseMapper} and {@link HDFStoHBaseReducer}.
 *
 * <p>Usage: {@code HDFStoHBaseDriver [inputPath [tableName]]}. Without arguments it reads
 * {@link #DEFAULT_INPUT_PATH} and writes to {@link #DEFAULT_TABLE_NAME}.
 */
public class HDFStoHBaseDriver implements Tool {
    /**
     * Input file used when no path is given.
     * NOTE(review): "192.168.1" is not a complete IPv4 address and looks truncated - fix before running.
     */
    private static final String DEFAULT_INPUT_PATH = "hdfs://192.168.1:8020/input_fruit/fruit.tsv";
    /** Output HBase table used when no table name is given. */
    private static final String DEFAULT_TABLE_NAME = "fruit_mr";

    private Configuration conf = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new HDFStoHBaseDriver(), args);
        System.exit(status);
    }

    /**
     * Configures the import job, submits it, and blocks until it finishes.
     *
     * @param args optional {@code [inputPath [tableName]]}, as forwarded by ToolRunner
     * @return 0 when the job succeeds
     * @throws IOException if the job finishes unsuccessfully
     */
    @Override
    public int run(String[] args) throws Exception {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String tableName = (args != null && args.length > 1) ? args[1] : DEFAULT_TABLE_NAME;

        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(HDFStoHBaseDriver.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));

        // Mapper: emits (row key, Put) pairs.
        job.setMapperClass(HDFStoHBaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer: wired by TableMapReduceUtil to write its Puts into tableName.
        TableMapReduceUtil.initTableReducerJob(tableName, HDFStoHBaseReducer.class, job);
        // At least one reduce task is needed for the reducer to run.
        job.setNumReduceTasks(1);

        if (!job.waitForCompletion(true)) {
            throw new IOException("Job running with error");
        }
        return 0;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration conf) {
        // Layer the HBase defaults/resources on top of the configuration supplied by ToolRunner.
        this.conf = HBaseConfiguration.create(conf);
    }
}
遇到的坑
问题1:
2019-05-08 21:53:21,253 INFO [org.apache.zookeeper.ZooKeeper] - Initiating client connection, connectString=47.105.132.96:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@5a4041cc
2019-05-08 21:53:22,077 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:53:43,081 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2019-05-08 21:53:44,311 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:54:05,314 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
解决办法:
1)ZooKeeper 服务没有启动
2)2181 端口没有开放
3)在 C:\Windows\System32\drivers\etc\hosts 文件中添加 Linux 服务器主机名与 IP 的映射
问题2:
2019-05-08 22:44:45,358 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established to 47.105.132.96/47.105.132.96:2181, initiating session
2019-05-08 22:44:45,418 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server 47.105.132.96/47.105.132.96:2181, sessionid = 0x16a97cb36b40008, negotiated timeout = 40000
Exception in thread "main" java.io.IOException: Failed to get result within timeout, timeout=60000ms
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:206)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:212)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:164)
at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:159)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:796)
at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602)
at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:408)
at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:418)
at com.imooc.demo.HBaseDemo.isTableExist(HBaseDemo.java:62)
at com.imooc.demo.HBaseDemo.main(HBaseDemo.java:23)
解决办法:
如果各项配置都没有问题,那就是 HBase 的以下 4 个端口没有开放(需要在防火墙/安全组中放行):
<!-- HBase port settings (presumably from hbase-site.xml). All four ports must be reachable from the
     client machine in addition to ZooKeeper's 2181; otherwise client calls such as tableExists time out.
     NOTE(review): confirm these values match the cluster's actual configuration before opening the ports. -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.master.info.port</name>
<value>16010</value>
</property>
<property>
<name>hbase.regionserver.port</name>
<value>16201</value>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>16301</value>
</property>