MapReduce工作原理及基础编程

概念介绍

MapReduce 是一种用于处理和生成大规模数据集的编程模型。

它将整个计算过程分为两个核心阶段:Map(映射)Reduce(归并)。在 Map 阶段,数据被分成小块并并行处理,提取出中间的键值对结果;在 Reduce 阶段,框架会将所有具有相同键的数据聚合起来,由用户定义的逻辑进行归并处理,最终输出结果。

MapReduce 的核心优势是:可扩展性强、适合大数据分布式处理、编程模型简单,框架自动负责数据的调度与容错。

通俗地说,它就像是一条流水线:先把原料(大数据)切碎并加工(Map),然后将相同类型的零件集中起来装配成品(Reduce)。

执行流程

实际上MapReduce有三个阶段:Map(映射)、Shuffle(派发)、Reduce(规约)。

Map 阶段,每条输入数据(通常是一行文本)被送入用户自定义的 Mapper 类中进行处理。开发者在这个阶段编写逻辑,将每条原始数据“映射”为一组中间键值对(key-value),例如统计词频时,把一句话拆分成每个词,并赋予每个词一个初始计数(如 key 为单词,value 为 1)。这些中间结果不会立即输出,而是会经过分区、排序和归组。

接下来是 Shuffle 阶段,这是 MapReduce 框架自动完成的核心过程。框架会将所有 Mapper 输出的 key 按照值进行聚合,也就是说,把所有具有相同 key 的记录集中在一起,并将它们分发给相应的 Reducer。这个阶段还涉及网络传输(跨节点数据移动)、排序(对 key 进行字典序排列)以及归组(将相同 key 的 value 组成集合),是整个 MapReduce 中性能和资源消耗最多的一部分。

最后是 Reduce 阶段,所有已经被聚合并排序好的数据会输入到开发者自定义的 Reducer 中。Reducer 接收的是某一个 key 以及与它相关联的所有 value 列表,开发者在这个阶段编写聚合逻辑,如累加、求平均、取最大值等。处理后的结果会以最终输出的形式写入 HDFS 或本地文件系统,形成最终的数据处理结果。

所以说,根据以上叙述我们可以得知:在实际的编程中,我们要做的就是编写Map和Reduce操作的代码。同时我们也可以理解到,Map的处理过程应当是(行号, 一行文本)(key, value);Reduce的处理过程应当是(key, values)(key, value)

编程思想

在编写相关Java代码时,我认为最重要的就是理清楚输入输出类型和输入到输出的处理过程,也即(行号, 一行文本) 是怎么变到 (key, value)的,(key, values) 又是怎么变到 (key, value)的。

标准Mapper类

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1. 读取一行文本
        String line = value.toString();

        // 2. 分割字段(假设是以逗号分隔的 CSV)
        String[] fields = line.split(",");

        // 3. 做简单校验
        if (fields.length >= 2) {
            String someKey = fields[0];         // 例如:用户ID
            int someValue = Integer.parseInt(fields[1]);  // 例如:交易金额

            // 4. 设置输出
            outKey.set(someKey);
            outValue.set(someValue);

            // 5. 输出 key-value 对
            context.write(outKey, outValue);
        }
    }
}

标准Reducer类

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();  // 累加所有值
        }

        result.set(sum);
        context.write(key, result);  // 输出结果
    }
}

标准Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class MyDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MyDriver <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "My MapReduce Job");

        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

实际上,按照以上标准代码,我们需要做的就是修改输入输出的Key/Value类型数据处理逻辑

常见的输入和输出 Key/Value 类型

类型说明用途示例
LongWritable长整型(64位)输入 key,通常表示文本行的字节偏移量(offset)
IntWritable整型(32位)常用于输出 value,如计数、数值统计
Text字符串类型,Hadoop自带字符串类型输入或输出的 key/value,多用于字符串数据
DoubleWritable双精度浮点型处理小数值,如评分、权重等
FloatWritable单精度浮点型同上,较少用
BooleanWritable布尔型标记型字段
NullWritable空值类型不输出 key 或 value 时用,比如只输出 key
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇