I. Filters
【1】Overview
Queries through the basic API fall short when you face large volumes of data, so HBase provides a more advanced query mechanism: filters. A Filter can screen data by column family, column, version, and other conditions. Because HBase itself keeps data in three-dimensional sorted order (row keys, columns, and versions are all ordered), these filters can complete the query filtering efficiently. An RPC query request that carries Filter conditions ships the Filter to each RegionServer and applies it there, making it a server-side filter; this also reduces the pressure on network transfer.

A filter operation needs at least two parameters. The first is an abstract operator; HBase provides an enum to represent these abstract operators (comparison operators):

| Operator | Meaning |
| --- | --- |
| LESS | < |
| LESS_OR_EQUAL | <= |
| EQUAL | = |
| NOT_EQUAL | <> |
| GREATER_OR_EQUAL | >= |
| GREATER | > |
| NO_OP | excludes everything |

The second is the concrete comparator, which supplies the comparison logic, for example byte-level or string-level comparison. With these two parameters we can define the filtering condition precisely and filter the data. HBase's built-in `dedicated comparators` (which specify the comparison mechanism):

- BinaryComparator: compares against the given byte array in byte-lexicographic order, using `Bytes.compareTo(byte[])`
- BinaryPrefixComparator: same as `BinaryComparator`, but only checks whether the left-hand prefix of the data matches
- NullComparator: checks whether the given value is null
- BitComparator: bitwise comparison
- RegexStringComparator: a regular-expression comparator; supports only `EQUAL` and `NOT_EQUAL`
- SubstringComparator: checks whether a `string` occurs within the `value`
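None of the demos below exercise RegexStringComparator, so here is a minimal hedged sketch of how an operator and a comparator combine into a filter (not from the original; imports are omitted to match the demos that follow, and the table and ZooKeeper names reuse the ones used throughout):

```java
public class RegexFilterSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("student"))) {
            Scan scan = new Scan();
            // Operator + comparator: keep only cells whose value matches the regex.
            // RegexStringComparator supports only EQUAL / NOT_EQUAL.
            scan.setFilter(new ValueFilter(
                    CompareFilter.CompareOp.EQUAL,
                    new RegexStringComparator("^zhang.*")));
            for (Result result : table.getScanner(scan)) {
                System.out.println(result);
            }
        }
    }
}
```

Swapping in `new SubstringComparator("wang")` would give the same behaviour as the ValueFilter demo below.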
【2】API in Practice

There are many types of filters, but they fall into two broad categories: comparison filters and dedicated filters.

1. Comparison filters

1) Row-key filter: RowFilter
2) Column-family filter: FamilyFilter
3) Column (qualifier) filter: QualifierFilter
4) Value filter: ValueFilter
5) Timestamp filter: TimestampsFilter
```java
public class RowFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only rows whose row key is greater than "1003" (byte order)
        Filter rowFilter = new RowFilter(CompareFilter.CompareOp.GREATER,
                new BinaryComparator("1003".getBytes()));
        scan.setFilter(rowFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class FamilyFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only cells belonging to the "info" column family
        Filter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator("info".getBytes()));
        scan.setFilter(familyFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class QualifierFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only cells whose column qualifier equals "name"
        Filter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator("name".getBytes()));
        scan.setFilter(qualifierFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class ValueFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only cells whose value contains the substring "wang"
        Filter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator("wang"));
        scan.setFilter(valueFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class TimestampsFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only cells whose timestamp appears in the given list
        List<Long> list = new ArrayList<>();
        list.add(1587266217618L);
        Filter timestampsFilter = new TimestampsFilter(list);
        scan.setFilter(timestampsFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
2. Dedicated filters

1) Single-column value filter: SingleColumnValueFilter (returns the whole row when the condition matches)
2) Single-column value excluder: SingleColumnValueExcludeFilter
3) Prefix filter: PrefixFilter (matches against the row key)
4) Column-prefix filter: ColumnPrefixFilter
5) Combined queries with FilterList
```java
public class SingleColumnValueFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Return entire rows whose info:name value contains "zhangsan"
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "info".getBytes(), "name".getBytes(),
                CompareFilter.CompareOp.EQUAL, new SubstringComparator("zhangsan"));
        // Also drop rows that do not contain the tested column at all
        singleColumnValueFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class SingleColumnValueExcludeFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Like SingleColumnValueFilter, but the tested column (info:name)
        // itself is excluded from the returned rows
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter =
                new SingleColumnValueExcludeFilter(
                        "info".getBytes(), "name".getBytes(),
                        CompareFilter.CompareOp.EQUAL, new SubstringComparator("z"));
        singleColumnValueExcludeFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueExcludeFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class PrefixFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only rows whose row key starts with "1001"
        Filter prefixFilter = new PrefixFilter("1001".getBytes());
        scan.setFilter(prefixFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class ColumnPrefixFilterDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // Keep only columns whose qualifier starts with "name".
        // Note: Scan.setFilter replaces any previously set filter (the original
        // code set a FamilyFilter first, which was silently overridden);
        // to combine several filters, use FilterList as shown below.
        ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter("name".getBytes());
        scan.setFilter(columnPrefixFilter);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
```java
public class FilterListDemo {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("student"));

        Scan scan = new Scan();
        // FilterList defaults to MUST_PASS_ALL: a row must satisfy every filter
        FilterList filterList = new FilterList();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("info"), Bytes.toBytes("name"),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes("wangwu"));
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("100"));
        filterList.addFilter(singleColumnValueFilter);
        filterList.addFilter(prefixFilter);
        scan.setFilter(filterList);

        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                        + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                        + cell.getTimestamp());
            }
        }
        table.close();
        connection.close();
    }
}
```
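One detail the demo leaves implicit: `FilterList` defaults to `Operator.MUST_PASS_ALL`, a logical AND of all added filters. To OR filters together instead, pass `MUST_PASS_ONE` to the constructor. A hedged fragment (it assumes an existing `scan`, as in the demos above):

```java
// OR semantics: a row passes if ANY filter in the list accepts it
FilterList orList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
orList.addFilter(new PrefixFilter(Bytes.toBytes("100")));
orList.addFilter(new PrefixFilter(Bytes.toBytes("95")));
scan.setFilter(orList);
```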
II. Integrating HBase with MapReduce

Suppose HDFS contains a file student.txt in the following format. Requirement: write the data in this file into HBase.
```text
95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA
```
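The MapReduce job below assumes the target table already exists in HBase. A minimal sketch for creating it with the Java Admin API (not part of the original; the table name `student` and column family `info` match the job's code, and the HBase 1.x descriptor classes match the 1.3.1 jars used later; imports omitted as in the other listings):

```java
public class CreateStudentTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("student");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                // single column family used by the MR job below
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}
```

The HBase shell equivalent would be `create 'student','info'`.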
Place core-site.xml, hbase-site.xml, and hdfs-site.xml in the resources directory, then run the following job:

```java
public class ReadHDFSDataToHbaseMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHDFSDataToHbaseMR.class);

        // Mapper and its output key/value types
        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Wire the reducer to the HBase table "student"
        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job);

        // Input path in HDFS
        Path inputPath = new Path("/datas/input/student.txt");
        FileInputFormat.addInputPath(job, inputPath);

        // Submit
        job.waitForCompletion(true);
    }

    public static class HDFSToHbaseMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line: rowkey,name,sex,age,department
            String[] datas = value.toString().split(",");
            byte[] rowKey = Bytes.toBytes(datas[0]);

            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(datas[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(datas[2]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(datas[3]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("department"), Bytes.toBytes(datas[4]));

            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }

    public static class HDFSToHbaseReducer
            extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(NullWritable.get(), put);
            }
        }
    }
}
```
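One hedged way to submit the job (the jar name is a placeholder; `hbase mapredcp` prints the HBase jars a MapReduce job needs on its classpath):

```bash
export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar hbase-mr-demo.jar ReadHDFSDataToHbaseMR
```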
Summary of the classes and methods each role needs:

- Driver: TableMapReduceUtil.initTableMapperJob / TableMapReduceUtil.initTableReducerJob
- Mapper: TableMapper<KEYOUT, VALUEOUT>
- Reducer: TableReducer<KEYIN, VALUEIN, KEYOUT>
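The summary mentions TableMapReduceUtil.initTableMapperJob and TableMapper, which the example above does not use: they serve the reverse direction, reading an HBase table as a job's input. A hedged sketch under the same assumptions as above (the `student` table and `info:name` column; the output path and class names are illustrative, and imports are omitted as in the other listings):

```java
public class ReadHbaseDataToHDFSMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHbaseDataToHDFSMR.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch more rows per RPC during the scan
        scan.setCacheBlocks(false);  // a full MR scan should not pollute the block cache

        // Wire the HBase table in as the job's input
        TableMapReduceUtil.initTableMapperJob(
                "student", scan, HbaseToHDFSMapper.class,
                Text.class, Text.class, job);

        job.setNumReduceTasks(0);    // map-only job
        FileOutputFormat.setOutputPath(job, new Path("/datas/output/student"));
        job.waitForCompletion(true);
    }

    public static class HbaseToHDFSMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowKey = Bytes.toString(key.get());
            byte[] name = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
            context.write(new Text(rowKey), new Text(name == null ? "" : Bytes.toString(name)));
        }
    }
}
```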
III. Integrating HBase with Hive
【1】HBase vs. Hive
1) Hive
(1) Data warehouse: in essence, Hive maintains a bijective mapping between files already stored in HDFS and metadata kept in MySQL, so that the data can be managed and queried with HQL.
(2) For data analysis and cleansing: Hive suits offline analysis and cleansing; its latency is high.
(3) Built on HDFS and MapReduce: the data Hive stores still lives on DataNodes, and HQL statements are ultimately compiled into MapReduce jobs for execution.
2) HBase
(1) Database: a column-oriented, non-relational database.
(2) Stores structured and unstructured data: suited to single-table, non-relational storage; not designed for relational operations such as JOINs.
(3) Built on HDFS: data is persisted as HFiles on DataNodes and is managed by RegionServers in units of regions.
(4) Low latency, suitable for online services: faced with large volumes of enterprise data, HBase can support storing huge amounts of data in a single table while still providing fast data access.

【2】Using HBase and Hive Together
Environment preparation
1) Symlink the HBase jars into Hive. Because later operations in Hive may also affect HBase, Hive must hold the jars needed to operate HBase. Copy the jars Hive depends on, or use symlinks (this assumes HBASE_HOME and HIVE_HOME are set):

```bash
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib
```
2) Edit hive-site.xml (`vim hive-site.xml`) and add the ZooKeeper address:

```xml
<property>
  <name>hive.zookeeper.quorum</name>
  <value>node01:2181,node02:2181,node03:2181</value>
</property>
```
Case 1

Requirement: create a Hive table associated with an HBase table, such that inserting data into the Hive table also affects the HBase table.
(1) Create the table in Hive, associating it with HBase at the same time:

```sql
create table hive_hbase_emp_table(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
  ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
```

Hint: once the table is created, you can look in both Hive and HBase; a corresponding table has been generated in each.
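A quick hedged way to check (the prompts are illustrative):

```text
hive> show tables;        # hive_hbase_emp_table should appear
hbase> list               # hbase_emp_table should appear
```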
(2) Create a temporary staging table in Hive for loading the file data. Hint: data cannot be loaded directly into the Hive table that is associated with HBase:

```sql
create table emp(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int)
row format delimited fields terminated by '\t';
```
(3) Load data into the Hive staging table:

```sql
load data local inpath '/opt/modules/hive/datas/emp.txt' into table emp;
```
(4) Use an INSERT statement to copy the staging table's data into the Hive table associated with HBase:

```sql
insert into table hive_hbase_emp_table select * from emp;
```
(5) Check whether the data was successfully synchronized into both Hive and the associated HBase table:

```text
hive> select * from hive_hbase_emp_table;
hbase> scan 'hbase_emp_table'
```
Case 2

Requirement: a table named hbase_emp_table is already stored in HBase. Create an external table in Hive to associate with hbase_emp_table, so that Hive can be used to analyze the data in that HBase table.
Note: case 2 follows directly on from case 1, so complete case 1 before attempting this one.

(1) Create the external table in Hive, associated with the HBase table:

```sql
create external table relevance_hbase_emp(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
  ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
```
(2) Once the association is in place, you can use Hive functions to analyze the data:

```sql
select * from relevance_hbase_emp;
```
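For example, a simple aggregation (a hedged illustration; the column names come from the table definition above):

```sql
-- Head-count and average salary per department, computed by Hive
-- but read live from the underlying HBase table
select deptno, count(*) as emp_cnt, avg(sal) as avg_sal
from relevance_hbase_emp
group by deptno;
```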
If you've read this far, leave a comment!!!
Click through for the next part 👇
【HBase】HBase Getting Started in Detail (Part 4)