HBase笔记

Java
370
0
0
2024-02-26
标签   HBase

写作目的:当做自己的HBase笔记本

HBase表结构

Shell命令

进入hbase客户端命令行

./bin/hbase shell

查看当前库中有哪些表

list

创建表

create 表名称,列族名

create "student20190727","info"

插入数据

put 表名,rowkey,列族:列名,值

put "student20190727","1001","info:name","Tom"
put "student20190727","1001","info:age","18"
put "student20190727","1001","info:sex","male"

 查看指定行 / 指定列的数据

get "student20190727","1001"
get "student20190727","1001","info:name"

 删除数据

delete "student20190727","1001","info:sex"
deleteall "student20190727","1001"

 清空表数据

disable "student20190727"
truncate "student20190727"

删除表

disable "student20190727"
drop "student20190727"

API中常用的类介绍

HBaseAdmin(Admin):管理表(创建,删除)  HTableDescriptor:表描述器,用于创建表  HColumnDescriptor:列描述器(构建列族)

Table:用于表中数据的操作  Put:用于封装待存放的数据  Delete:用于封装待删除的数据  Get:用于得到某一个具体的数据

Scan:用于扫描表的配置信息 ResultScanner:通过配置的扫描器,得到一个扫描表的实例扫描器 Result:每一个该类型的实例化对象都对应了一个rowkey中的若干数据。 Cell:用于封装一个rowkey下某一个单元格的数据(rowKey,columnFamily,column,value)

Java操作HBase

pom.xml
<dependencies>
		<!-- HBase server module, version 1.3.1. -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>1.3.1</version>
		</dependency>
		<!-- HBase client module; version is kept identical to hbase-server above. -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.3.1</version>
		</dependency>

		<!-- System-scoped dependency that points Maven at ${JAVA_HOME}/lib/tools.jar.
		     Presumably a workaround for a transitive jdk.tools dependency that the build
		     cannot resolve on its own (TODO confirm). The build needs that jar to exist
		     under JAVA_HOME. -->
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.6</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
	</dependencies>
ZookeeperDemo 
package com.imooc.demo;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

/**
 * Smoke test that checks whether a session can be established with a ZooKeeper server.
 *
 * <p>{@code main} opens a client, waits (for a bounded time) until the watcher reports
 * {@code SyncConnected}, prints a marker and closes the client again. The commented-out
 * statements inside {@code main} are ready-to-use examples of the basic znode operations.
 */
public class ZookeeperDemo {

	/** ZooKeeper server address in {@code host:port} form. */
	static final String CONNECT_ADDR = "192.168.1.16:2181";
	/** Session timeout passed to the ZooKeeper client, in milliseconds. */
	static final int SESSION_OUTTIME = 2000;// ms
	/** Upper bound, in milliseconds, on how long {@code main} waits for the connection. */
	static final long CONNECT_WAIT_MS = 30000L;
	/** Latch released by the watcher once the connection is up; {@code main} blocks on it. */
	static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

	/**
	 * Connects to {@link #CONNECT_ADDR}, waits for the connection event and closes the client.
	 *
	 * @param args unused
	 * @throws IllegalStateException if no connection is established within
	 *                               {@link #CONNECT_WAIT_MS} milliseconds
	 * @throws Exception             any failure raised by the ZooKeeper client
	 */
	public static void main(String[] args) throws Exception {

		ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				KeeperState keeperState = event.getState();
				EventType eventType = event.getType();
				// A SyncConnected state combined with event type None is the
				// "session established" notification: release the waiting main thread.
				if (KeeperState.SyncConnected == keeperState && EventType.None == eventType) {
					System.out.println("zk 建立连接");
					connectedSemaphore.countDown();
				}
			}
		});

		try {
			// Wait for the watcher, but not forever: when the server is unreachable the
			// client keeps retrying in the background and the latch would never open.
			boolean connected = connectedSemaphore.await(CONNECT_WAIT_MS,
					java.util.concurrent.TimeUnit.MILLISECONDS);
			if (!connected) {
				throw new IllegalStateException("Timed out after " + CONNECT_WAIT_MS
						+ " ms waiting for ZooKeeper connection to " + CONNECT_ADDR);
			}

			System.out.println("..");

			// The examples below additionally need imports for
			// org.apache.zookeeper.ZooDefs.Ids and org.apache.zookeeper.CreateMode.

			// Create the parent node
			// zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE,
			// CreateMode.PERSISTENT);

			// Create a child node
			// zk.create("/testRoot/children", "children data".getBytes(),
			// Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

			// Read node data and list children
			// byte[] data = zk.getData("/testRoot", false, null);
			// System.out.println(new String(data));
			// System.out.println(zk.getChildren("/testRoot", false));

			// Update the node value
			// zk.setData("/testRoot", "modify data root".getBytes(), -1);
			// byte[] data = zk.getData("/testRoot", false, null);
			// System.out.println(new String(data));

			// Check whether a node exists
			// System.out.println(zk.exists("/testRoot/children", false));
			// Delete a node
			// zk.delete("/testRoot/children", -1);
			// System.out.println(zk.exists("/testRoot/children", false));
		} finally {
			// Always release the session, including on timeout or failure.
			zk.close();
		}

	}

}
HBaseDemo
package com.imooc.demo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Static helpers demonstrating basic HBase client operations: creating and dropping tables,
 * writing, deleting, reading single cells and scanning whole tables.
 *
 * <p>Each helper opens its own {@link Connection} and releases it, together with the table,
 * admin or scanner handle it obtained, through try-with-resources, so nothing stays open
 * after the call returns or throws.
 */
public class HBaseDemo {
	/** Client configuration shared by every helper; filled in by the static initializer. */
	public static Configuration conf;
	static {
		// Build the configuration through the HBaseConfiguration factory, then point the
		// client at the ZooKeeper quorum and client port.
		conf = HBaseConfiguration.create();
		// NOTE(review): "192.168.1" does not look like a complete host address - confirm
		// the real quorum host before running.
		conf.set("hbase.zookeeper.quorum", "192.168.1");
		conf.set("hbase.zookeeper.property.clientPort", "2181");

	}

	/**
	 * Entry point; uncomment one of the sample calls to exercise a helper.
	 *
	 * @param args unused
	 * @throws Exception propagated from the selected helper
	 */
	public static void main(String[] args) throws Exception {

		// createTable("student","info");
		// System.out.println(isTableExist("student"));
		// dropTable("student2019-5-9-9-15");
		// addRowData("student", "1001", "info", "name", "zhangsan");
		// addRowData("student", "1002", "info", "name", "lisi");
		// deleteOneRow("student", "1001");
		// deleteMultiRow("student", "1001","1002");
		// getAllRows("student");
		//getRowQualifier("student", "1001", "info", "name");

	}

	/**
	 * Prints the row key, column family, qualifier and value of one cell to standard output.
	 *
	 * @param cell the cell to print
	 */
	private static void printCell(Cell cell) {
		System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
		System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
		System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
		System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
	}

	/**
	 * Reads one "family:qualifier" column of a single row and prints every returned cell.
	 *
	 * @param tableName table to read from
	 * @param rowKey    row key of the row to fetch
	 * @param family    column family name
	 * @param qualifier column qualifier within the family
	 * @throws IOException if the connection, the lookup or closing a resource fails
	 */
	public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier)
			throws IOException {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName))) {
			Get get = new Get(Bytes.toBytes(rowKey));
			get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
			Result result = hTable.get(get);
			for (Cell cell : result.rawCells()) {
				printCell(cell);
			}
		}
	}

	/**
	 * Scans a whole table and prints every cell of every row.
	 *
	 * @param tableName table to scan
	 * @throws IOException if the connection, the scan or closing a resource fails
	 */
	public static void getAllRows(String tableName) throws IOException {
		// An unrestricted Scan is used, so every row of the table is returned. The scanner
		// is closed by try-with-resources as well as the table and the connection.
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
				ResultScanner resultScanner = hTable.getScanner(new Scan())) {
			for (Result result : resultScanner) {
				for (Cell cell : result.rawCells()) {
					printCell(cell);
				}
			}
		}
	}

	/**
	 * Deletes one complete row.
	 *
	 * @param tableName table to delete from
	 * @param rowKey    row key of the row to remove
	 * @throws IOException if the connection, the delete or closing a resource fails
	 */
	public static void deleteOneRow(String tableName, String rowKey) throws IOException {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName))) {
			hTable.delete(new Delete(Bytes.toBytes(rowKey)));
		}
	}

	/**
	 * Deletes several complete rows with one batched call.
	 *
	 * @param tableName table to delete from
	 * @param rows      row keys of the rows to remove
	 * @throws IOException if the connection, the delete or closing a resource fails
	 */
	public static void deleteMultiRow(String tableName, String... rows) throws IOException {
		List<Delete> deleteList = new ArrayList<Delete>();
		for (String row : rows) {
			deleteList.add(new Delete(Bytes.toBytes(row)));
		}
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName))) {
			hTable.delete(deleteList);
		}
	}

	/**
	 * Writes a single cell value and prints a confirmation message.
	 *
	 * @param tableName    table to write to
	 * @param rowKey       row key of the target row
	 * @param columnFamily column family name
	 * @param column       column qualifier within the family
	 * @param value        value to store
	 * @throws Exception if the connection, the put or closing a resource fails
	 */
	public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value)
			throws Exception {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName))) {
			Put put = new Put(Bytes.toBytes(rowKey));
			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
			hTable.put(put);
		}
		System.out.println("插入数据成功");
	}

	/**
	 * Drops a table if it exists, otherwise prints that it does not exist.
	 *
	 * @param tableName table to drop
	 * @throws Exception if the connection or any admin operation fails
	 */
	public static void dropTable(String tableName) throws Exception {
		TableName name = TableName.valueOf(tableName);
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
			if (admin.tableExists(name)) {
				// A table must be disabled before it can be deleted; skip the disable
				// step when it is already disabled, since disabling again fails.
				if (admin.isTableEnabled(name)) {
					admin.disableTable(name);
				}
				admin.deleteTable(name);
				System.out.println("表" + tableName + "删除成功!");
			} else {
				System.out.println("表" + tableName + "不存在!");
			}
		}
	}

	/**
	 * Creates a table with the given column families, or prints that it already exists.
	 *
	 * @param tableName    name of the table to create
	 * @param columnFamily one or more column family names
	 * @throws IllegalArgumentException if the table has to be created but no column family
	 *                                  was supplied
	 * @throws Exception                if the connection or any admin operation fails
	 */
	public static void createTable(String tableName, String... columnFamily) throws Exception {
		TableName name = TableName.valueOf(tableName);
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
			if (admin.tableExists(name)) {
				System.out.println("表" + tableName + "已存在");
				return;
			}
			if (columnFamily == null || columnFamily.length == 0) {
				throw new IllegalArgumentException(
						"At least one column family is required to create table " + tableName);
			}
			// Describe the table and register each requested column family on it.
			HTableDescriptor descriptor = new HTableDescriptor(name);
			for (String cf : columnFamily) {
				descriptor.addFamily(new HColumnDescriptor(cf));
			}
			admin.createTable(descriptor);
			System.out.println("表" + tableName + "创建成功!");
		}
	}

	/**
	 * Reports whether a table exists.
	 *
	 * @param tableName table name to look up
	 * @return {@code true} if the table exists
	 * @throws Exception if the connection or the lookup fails
	 */
	public static boolean isTableExist(String tableName) throws Exception {
		try (Connection connection = ConnectionFactory.createConnection(conf);
				HBaseAdmin admin = (HBaseAdmin) connection.getAdmin()) {
			// The original author flagged this call as problematic: it is the frame that
			// failed with "Failed to get result within timeout" in the troubleshooting
			// notes of this document.
			return admin.tableExists(TableName.valueOf(tableName));
		}
	}

}

实现将 HDFS 中的数据写入到 HBase 表中

mapper
package com.imooc.hdfstohbase;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that converts each tab-separated input line into an HBase {@link Put}.
 *
 * <p>Field 0 is used as the row key, field 1 is stored in {@code info:name} and field 2 in
 * {@code info:color}. Lines with fewer than three fields or an empty row key are skipped
 * and counted instead of failing the whole job.
 */
public class HDFStoHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

	/** Column family and qualifiers, encoded once instead of once per record. */
	private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
	private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
	private static final byte[] QUALIFIER_COLOR = Bytes.toBytes("color");

	/**
	 * Emits one (row key, Put) pair per well-formed input line.
	 *
	 * @param key     input key supplied by the input format (not used)
	 * @param value   one line of the input file
	 * @param context task context used to emit output and update counters
	 * @throws IOException          if writing the output fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Split the line on tabs; the fields are addressed by position below.
		String[] values = value.toString().split("\t");
		// Blank or short lines would otherwise raise ArrayIndexOutOfBoundsException, and
		// an empty row key cannot be used to build a Put, so skip and count them.
		if (values.length < 3 || values[0].isEmpty()) {
			context.getCounter("HDFStoHBaseMapper", "MALFORMED_LINES").increment(1);
			return;
		}
		byte[] rowKey = Bytes.toBytes(values[0]);
		String name = values[1];
		String color = values[2];

		// The same row key bytes serve as the map output key and as the Put's row.
		ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
		Put put = new Put(rowKey);
		// Arguments are: column family, column qualifier, value.
		put.addColumn(FAMILY_INFO, QUALIFIER_NAME, Bytes.toBytes(name));
		put.addColumn(FAMILY_INFO, QUALIFIER_COLOR, Bytes.toBytes(color));

		context.write(rowKeyWritable, put);
	}
}
reducer
package com.imooc.hdfstohbase;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

/**
 * Reducer that forwards every {@link Put} it receives unchanged, using a
 * {@link NullWritable} output key. The target table is not chosen here; it is set by
 * whoever configures the job.
 */
public class HDFStoHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
	/**
	 * Writes each Put grouped under {@code key} to the job output.
	 *
	 * @param key     row key shared by the grouped Puts (not used for the output)
	 * @param values  the Puts emitted by the mapper for this key
	 * @param context task context used to emit the Puts
	 * @throws IOException          if writing fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
			throws IOException, InterruptedException {
		// The output key is never the row key here: every Put is written with NullWritable.
		final NullWritable noKey = NullWritable.get();
		for (Put mutation : values) {
			context.write(noKey, mutation);
		}
	}
}
Driver
package com.imooc.hdfstohbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the MapReduce job that loads a tab-separated HDFS file into an HBase table.
 *
 * <p>Usage: {@code HDFStoHBaseDriver [inputPath [tableName]]}. Both arguments are optional;
 * when omitted, {@link #DEFAULT_INPUT_PATH} and {@link #DEFAULT_TABLE_NAME} are used.
 */
public class HDFStoHBaseDriver implements Tool {

	/** Input file used when no path is given on the command line. */
	private static final String DEFAULT_INPUT_PATH = "hdfs://192.168.1:8020/input_fruit/fruit.tsv";
	/** Target HBase table used when no table name is given on the command line. */
	private static final String DEFAULT_TABLE_NAME = "fruit_mr";

	/** Configuration injected through {@link #setConf}; created lazily if never injected. */
	private Configuration conf = null;

	/**
	 * Runs the job through {@link ToolRunner} and exits with the returned status.
	 *
	 * @param args optional input path and table name, see the class description
	 * @throws Exception if the job cannot be set up or fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		int status = ToolRunner.run(conf, new HDFStoHBaseDriver(), args);
		System.exit(status);
	}

	/**
	 * Configures and runs the job, blocking until it completes.
	 *
	 * @param args {@code args[0]} optional input path, {@code args[1]} optional table name
	 * @return {@code 0} when the job succeeds
	 * @throws IOException if the job finishes unsuccessfully
	 * @throws Exception   if the job cannot be configured or submitted
	 */
	@Override
	public int run(String[] args) throws Exception {
		String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
		String tableName = (args != null && args.length > 1) ? args[1] : DEFAULT_TABLE_NAME;

		Configuration conf = this.getConf();
		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(HDFStoHBaseDriver.class);
		FileInputFormat.addInputPath(job, new Path(inputPath));

		// Mapper and its output types.
		job.setMapperClass(HDFStoHBaseMapper.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		// Reducer writing into the target HBase table.
		TableMapReduceUtil.initTableReducerJob(tableName, HDFStoHBaseReducer.class, job);
		// At least one reduce task is required.
		job.setNumReduceTasks(1);

		if (!job.waitForCompletion(true)) {
			throw new IOException("Job running with error");
		}
		// A failed job has already thrown above, so reaching this point means success.
		return 0;
	}

	/**
	 * Returns the configuration used by this tool, creating a default HBase configuration
	 * if none has been injected yet so that {@link #run} never sees {@code null}.
	 */
	@Override
	public Configuration getConf() {
		if (this.conf == null) {
			this.conf = HBaseConfiguration.create();
		}
		return this.conf;
	}

	/**
	 * Stores a configuration derived from {@code conf} via {@code HBaseConfiguration.create(conf)}.
	 *
	 * @param conf base configuration handed in by {@link ToolRunner}
	 */
	@Override
	public void setConf(Configuration conf) {
		this.conf = HBaseConfiguration.create(conf);

	}
}

 遇到的坑

问题1:
2019-05-08 21:53:21,253 INFO [org.apache.zookeeper.ZooKeeper] - Initiating client connection, connectString=47.105.132.96:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@5a4041cc
2019-05-08 21:53:22,077 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:53:43,081 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2019-05-08 21:53:44,311 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:54:05,314 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

解决办法:

1)zookeeper服务没开

2)2181 端口没开

3)将  C:\Windows\System32\drivers\etc 下的 hosts 中 添加 linux映射

问题2:
2019-05-08 22:44:45,358 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established to 47.105.132.96/47.105.132.96:2181, initiating session
2019-05-08 22:44:45,418 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server 47.105.132.96/47.105.132.96:2181, sessionid = 0x16a97cb36b40008, negotiated timeout = 40000
Exception in thread "main" java.io.IOException: Failed to get result within timeout, timeout=60000ms
	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:206)
	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:212)
	at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
	at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
	at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:164)
	at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:159)
	at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:796)
	at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602)
	at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
	at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:408)
	at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:418)
	at com.imooc.demo.HBaseDemo.isTableExist(HBaseDemo.java:62)
	at com.imooc.demo.HBaseDemo.main(HBaseDemo.java:23)

解决办法:

如果各种配置都没有错的话,那就是hbase下面的4个端口没开放

<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>

<property>
<name>hbase.master.info.port</name>
<value>16010</value>
</property>

<property>
<name>hbase.regionserver.port</name>
<value>16201</value>
</property>

<property>
<name>hbase.regionserver.info.port</name>
<value>16301</value>
</property>