Hadoop_MapReduce working principle

The job runs in six stages. The mapper's input data comes as key/value (KV) pairs; the map() method is called once for every KV pair, and the output is also a set of KV pairs. The mapper obtains its input from the context and writes its results back into the context (context.write(text, iw);). The input types (LongWritable, Text) and output types (Text, IntWritable) are configured by the user. The context obtains the input data through a RecordReader and saves the mapper's output through a RecordWriter.
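The drivers later in this section reuse MyWordCountMapper and MyWordCountReducer from the Hadoop_WordCount project without listing them. As a concrete illustration of the pattern just described, here is a minimal sketch of what these two classes typically look like (the class and package names are taken from the drivers below; the bodies are an assumed standard word count, not necessarily the original implementation):

package com.blu.mywordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text text = new Text();
    private IntWritable iw = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line on spaces and emit (word, 1) for every word
        for (String word : value.toString().split(" ")) {
            text.set(word);
            context.write(text, iw);
        }
    }
}

package com.blu.mywordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the 1s emitted by the mapper for this word
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}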
InputFormat handles the input side of a MapReduce job. InputFormat is an abstract class with several subclasses; it declares two abstract methods, getSplits(), which divides the input into splits, and createRecordReader(), which returns the RecordReader that turns a split into KV pairs. Its subclass FileInputFormat is itself abstract and in turn has several concrete subclasses, among them TextInputFormat, NLineInputFormat, KeyValueTextInputFormat and CombineTextInputFormat, all covered below, as well as user-defined InputFormats.

TextInputFormat

TextInputFormat is the default InputFormat of MapReduce; it reads the input line by line, one record per line.
Key (LongWritable): the byte offset at which the line starts within the file.
Value (Text): the content of the line.
TextInputFormat splits files using the getSplits() method of its parent class, FileInputFormat: each file is split separately, and the default split size is 128M.
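For example, assuming Unix line endings (a one-byte "\n"), the first lines of the testdata.txt file used later in this section would be read as the following (key, value) records, each key being the byte offset at which the line starts:

(0,  "good morning")
(13, "good afternoon")
(28, "good evening")
...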
NLineInputFormat

Split strategy: every N lines of a file form one split; by default, one line per split.
KEY type: LongWritable
VALUE type: Text

Example: with 12 lines of input and 3 lines per split, the data is divided into 4 splits. Modify the Hadoop_WordCount word-count project as follows:
package com.blu.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWordCount {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyWordCount.class);
            job.setMapperClass(MyWordCountMapper.class);
            job.setReducerClass(MyWordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // number of lines per split
            NLineInputFormat.setNumLinesPerSplit(job, 3);
            // use NLineInputFormat as the input format
            job.setInputFormatClass(NLineInputFormat.class);
            boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Input data, D:\data\testdata.txt (12 lines):

good morning
good afternoon
good evening
zhangsan male
lisi female
wangwu male
good morning
good afternoon
good evening
zhangsan male
lisi female
wangwu male
Program arguments: D:\data\testdata.txt D:\data\output
Output:

afternoon 2
evening 2
female 2
good 6
lisi 2
male 4
morning 2
wangwu 2
zhangsan 2
The job log confirms that 4 splits were created:

[INFO ] 2020-04-26 17:12:41,643 method:org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:204) number of splits:4
The lines added to the driver for NLineInputFormat:

// number of lines per split
NLineInputFormat.setNumLinesPerSplit(job, 3);
// use NLineInputFormat as the input format
job.setInputFormatClass(NLineInputFormat.class);
KeyValueTextInputFormat

KEY type: Text — the data before the separator on each line becomes the key.
VALUE type: Text — the data after the separator becomes the value.

Example: use KeyValueTextInputFormat to count how many times each name appears in the following file, D:\data\money.txt (note that on each line the name and the data after it are separated by a Tab):

zhangsan	500 450 jan
lisi	200 150 jan
lilei	150 160 jan
zhangsan	500 500 feb
lisi	200 150 feb
lilei	150 160 feb
Mapper:

package com.blu.kvdemo;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Output format:
 * zhangsan 1
 * lisi 1
 * zhangsan 1
 *
 * @author BLU
 */
public class Kvmapper extends Mapper<Text, Text, Text, IntWritable> {

    /**
     * Input format:
     * zhangsan	500 450 jan
     * key:   zhangsan
     * value: 500 450 jan
     */
    private IntWritable iw = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // the name is already the key, so simply emit (name, 1)
        context.write(key, iw);
    }
}
Reducer:

package com.blu.kvdemo;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KvReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable iw = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // sum the 1s emitted for this name
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        iw.set(sum);
        context.write(key, iw);
    }
}
Driver:

package com.blu.kvdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KeyValueDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // use Tab as the separator between key and value
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
        // the configuration must be passed to the job for the separator setting to take effect
        Job job = Job.getInstance(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setJarByClass(KeyValueDemo.class);
        job.setMapperClass(com.blu.kvdemo.Kvmapper.class);
        job.setReducerClass(com.blu.kvdemo.KvReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
Program arguments: D:\data\money.txt D:\data\output
Output:

lilei 2
lisi 2
zhangsan 2
CombineTextInputFormat

TextInputFormat creates splits per file, so a large number of small files produces a large number of MapTasks and very low processing efficiency. CombineTextInputFormat can merge small files into one split before they are processed.

The split mechanism of CombineTextInputFormat, assuming setMaxInputSplitSize is 4M and the following four input files:
a.txt 1.7M
b.txt 5.1M
c.txt 3.4M
d.txt 6.8M
Each file is compared in turn against the setMaxInputSplitSize value of 4M. If the file is smaller than 4M, it logically becomes one block. If it is larger than 4M but smaller than 8M, the file is divided evenly into two blocks. If it is larger than 8M, a 4M block is cut off first and the remainder is compared again.
This gives the following blocks:

Block 1: 1.7M
Block 2: 2.55M
Block 3: 2.55M
Block 4: 3.4M
Block 5: 3.4M
Block 6: 3.4M
Each virtual block is then checked against the setMaxInputSplitSize value (4M): a block of 4M or more becomes a split on its own; a block smaller than 4M is merged with the next block to form one split.
Three splits are formed in the end:

Split 1: 1.7M + 2.55M
Split 2: 2.55M + 3.4M
Split 3: 3.4M + 3.4M
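As a standalone illustration of the two steps above (a sketch of the rule as described, using a hypothetical class name, not Hadoop's actual implementation), the following snippet computes the virtual blocks and the resulting splits for a list of file sizes and a 4M maximum split size; for the four files above it prints the six blocks and three splits just listed:

import java.util.ArrayList;
import java.util.List;

public class CombineSplitSketch {

    public static void main(String[] args) {
        double max = 4.0;                            // setMaxInputSplitSize = 4M
        double[] files = {1.7, 5.1, 3.4, 6.8};       // a.txt, b.txt, c.txt, d.txt

        // step 1: virtual storage - cut every file into blocks no larger than 2 * max
        List<Double> blocks = new ArrayList<>();
        for (double size : files) {
            double remaining = size;
            while (remaining > 2 * max) {            // larger than 8M: cut off a 4M block and continue
                blocks.add(max);
                remaining -= max;
            }
            if (remaining > max) {                   // between 4M and 8M: divide evenly into two blocks
                blocks.add(remaining / 2);
                blocks.add(remaining / 2);
            } else {                                 // at most 4M: keep as a single block
                blocks.add(remaining);
            }
        }
        System.out.println("blocks = " + blocks);    // [1.7, 2.55, 2.55, 3.4, 3.4, 3.4]

        // step 2: merge - a block of at least 4M becomes its own split,
        // a smaller block is merged with the following block(s)
        List<String> splits = new ArrayList<>();
        double current = 0;
        StringBuilder parts = new StringBuilder();
        for (double b : blocks) {
            current += b;
            parts.append(parts.length() == 0 ? "" : "+").append(b).append("M");
            if (current >= max) {
                splits.add(parts.toString());
                current = 0;
                parts.setLength(0);
            }
        }
        if (parts.length() > 0) {                    // whatever is left forms the last split
            splits.add(parts.toString());
        }
        System.out.println("splits = " + splits);    // [1.7M+2.55M, 2.55M+3.4M, 3.4M+3.4M]
    }
}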
CombineTextInputFormat demo

First run the word-count job with the default TextInputFormat (the unmodified Hadoop_WordCount driver) against the D:\data directory:

package com.blu.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWordCount {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyWordCount.class);
            job.setMapperClass(MyWordCountMapper.class);
            job.setReducerClass(MyWordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Program arguments: D:\data D:\data\output
Each of the six small files becomes its own split:

Total input files to process : 6
number of splits:6
Now modify the driver to use CombineTextInputFormat:

package com.blu.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWordCount {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyWordCount.class);
            job.setMapperClass(MyWordCountMapper.class);
            job.setReducerClass(MyWordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // use CombineTextInputFormat as the input format
            job.setInputFormatClass(CombineTextInputFormat.class);
            // set the maximum virtual split size to 1M
            CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The key code:

// use CombineTextInputFormat as the input format
job.setInputFormatClass(CombineTextInputFormat.class);
// set the maximum virtual split size to 1M
CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024);
This time the same six files are combined into a single split:

Total input files to process : 6
number of splits:1
Custom InputFormat

Example: filter out a specified word so that it is not counted.

Steps:

Define a class that extends FileInputFormat.
Implement a custom RecordReader.
MyInputFormat:

package com.blu.mywordcount.inputformat;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // hand the job configuration to the custom RecordReader so it can read the keyword to filter
        return new myRecordReader(context.getConfiguration());
    }
}
myRecordReader:

package com.blu.mywordcount.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class myRecordReader extends RecordReader<LongWritable, Text> {

    public static String CUSTOM_KEYWORD = "mapreduce.input.myRecordReader.line.keyword";

    private LineRecordReader lineRecordReader;
    // the word to filter out
    private String keyword;
    private LongWritable key;
    private Text value;

    public myRecordReader() {
        super();
    }

    public myRecordReader(Configuration conf) {
        lineRecordReader = new LineRecordReader();
        keyword = conf.get(CUSTOM_KEYWORD);
    }

    /**
     * Initialization
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        lineRecordReader.initialize(split, context);
    }

    /**
     * Main logic.
     * Returning true means keep reading the following data;
     * returning false means stop reading.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // stop when there is no more data
        if (!lineRecordReader.nextKeyValue()) {
            return false;
        }
        // read one line
        Text currentValue = lineRecordReader.getCurrentValue();
        // remove the filtered word from the line if it occurs
        String val = currentValue.toString();
        if (keyword != null) {
            if (val.contains(keyword)) {
                val = val.replace(keyword + " ", "");
                currentValue.set(val);
            }
        }
        key = lineRecordReader.getCurrentKey();
        value = currentValue;
        return true;
    }

    /**
     * Key of the current line
     */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * Value of the current line
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
Driver:

package com.blu.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.blu.mywordcount.inputformat.MyInputFormat;
import com.blu.mywordcount.inputformat.myRecordReader;

public class MyWordCount {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // the word to filter out
            conf.set(myRecordReader.CUSTOM_KEYWORD, "zhangsan");
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyWordCount.class);
            job.setMapperClass(MyWordCountMapper.class);
            job.setReducerClass(MyWordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // use the custom InputFormat
            job.setInputFormatClass(MyInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Input data, D:\data\testdata.txt:

good morning
good afternoon
good evening
zhangsan male
lisi female
wangwu male
good morning
good afternoon
good evening
zhangsan male
lisi female
wangwu male
Program arguments: D:\data\testdata.txt D:\data\output
Output ("zhangsan" has been filtered out and is not counted):

afternoon 2
evening 2
female 2
good 6
lisi 2
male 4
morning 2
wangwu 2