
MapReduce Explained and Reproduced: Completing MIT 6.824 (6.5840) Lab 1


Background: MapReduce is an algorithmic model proposed by Google in 2004 for processing massive datasets in parallel. MapReduce, GFS, and Bigtable are known as the "troika" of Google's distributed systems; together they ushered in the era of industrial-scale big data. Although Google published the MapReduce paper, some of its figures and implementation details remain vague and frustrating, and detailed articles on how to reproduce it are still scarce. This article aims to give the most detailed possible explanation and reproduction of MapReduce. After reading it you should be able to complete MIT 6.824 (6.5840) Lab 1 on your own. Ideally you will have read the MapReduce paper and MIT 6.824 (6.5840) Lab 1 beforehand, which will make everything easier to follow.

MapReduce Overview

According to the abstract of the MapReduce paper, MapReduce is a programming model for processing and generating large datasets. For the input data, the user specifies a Map function that produces a set of key-value pairs (called the intermediate); this step is called Map. For the intermediate, the user specifies a Reduce function that merges and groups the intermediate pairs by their keys; this step is called Reduce.

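To make the Map/Reduce contract concrete, here is a minimal word-count sketch in Go, close in spirit to the lab's mrapps/wc.go plugin. The KeyValue type mirrors the one defined in rpc.go later in this article; the demo strings in main are made up for illustration.

package main

import (
   "fmt"
   "strconv"
   "strings"
   "unicode"
)

// KeyValue is one intermediate pair; it mirrors the type defined in rpc.go below.
type KeyValue struct {
   Key   string
   Value string
}

// Map splits the file content into words and emits one {word, "1"} pair per occurrence.
func Map(filename string, contents string) []KeyValue {
   words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
   kva := make([]KeyValue, 0, len(words))
   for _, w := range words {
      kva = append(kva, KeyValue{Key: w, Value: "1"})
   }
   return kva
}

// Reduce receives all values that share one key and returns their count as a string.
func Reduce(key string, values []string) string {
   return strconv.Itoa(len(values))
}

func main() {
   kva := Map("demo.txt", "the quick fox jumps over the lazy dog")
   fmt.Println(kva)                               // [{the 1} {quick 1} ... {dog 1}]
   fmt.Println(Reduce("the", []string{"1", "1"})) // "2"
}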

Programs written to the MapReduce model can naturally run in parallel on different machines (the paper calls this a cluster of commodity servers). MapReduce offers solutions to a series of key problems in parallel processing of large datasets: how to handle the input data, how to split an overall job into subtasks, how to distribute those tasks to the machines in the cluster, and how to tolerate machine failures. Even programmers with no prior experience in distributed systems can harness a cluster for large-scale data processing once they implement the MapReduce model.

A MapReduce Application Example

The most common example application of MapReduce is word count: given a set of text files, count the occurrences of every word. To explain things in as much detail as possible, I will work directly from a diagram of the distributed implementation rather than a single-machine version. Simply put, the MIT 6.824 Lab 1 implementation is a translation of this diagram into code.

[Figure: distributed word count with MapReduce, showing the communication between master, mappers, and reducers]

An idealized, complete run of a word count job with MapReduce proceeds in the following order:

  1. Initialize the coordinator (also called the master) and read the input files
  2. The master turns the input files into map tasks
  3. The master assigns the prepared map tasks to the mappers (on receiving a mapper's request for a map task)
  4. Each mapper splits the text of its input file into key-value pairs, where the key is a word and the val is always 1; the list of pairs is called the intermediate
  5. The mapper partitions the intermediate into NReduce (the number of reducers) pieces, by hashing the key and taking it modulo NReduce (see the sketch after this list)
  6. The mapper writes the partitioned intermediate to temporary paths and reports those paths to the master, signaling that the map task is done
  7. The master validates the mapper's request; if it is legitimate, it writes the intermediate files from the temporary paths to the final paths
  8. Once the master confirms that all map tasks are done, it generates reduce tasks from the final intermediate paths
  9. The master assigns the prepared reduce tasks to the reducers (on receiving a reducer's request for a reduce task)
  10. On receiving a reduce task, the reducer first collects the intermediate produced by the various mappers
  11. The reducer then sorts the collected intermediate by key
  12. The reducer runs the reduce operation on the sorted intermediate, i.e. counts the pairs that share a key and uses the count as the new val
  13. The reducer writes the reduce result to a temporary path and reports that path to the master, signaling that the reduce task is done
  14. The master validates the reducer's request; if it is legitimate, it writes the reduce task's file from the temporary path to the final path
  15. Once the master confirms that all reduce tasks are done, the program terminates
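Step 5's partition rule is small enough to show directly. Below is a minimal, self-contained sketch; ihash is the same helper that appears in worker.go later in this article, while nReduce and the sample keys are made up for illustration.

package main

import (
   "fmt"
   "hash/fnv"
)

// ihash maps a key to a non-negative int; identical to the helper in worker.go below.
func ihash(key string) int {
   h := fnv.New32a()
   h.Write([]byte(key))
   return int(h.Sum32() & 0x7fffffff)
}

func main() {
   nReduce := 3
   for _, key := range []string{"apple", "banana", "cherry"} {
      // All pairs with the same key land in the same reducer bucket,
      // no matter which mapper produced them.
      fmt.Printf("%s -> reducer %d\n", key, ihash(key)%nReduce)
   }
}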

This differs slightly from the figure in the paper. The main difference is that the paper focuses on the high-level MapReduce idea: generate the intermediate from the input, have the mapper tell the master where the intermediate is, have the master tell the reducer where the intermediate is, and let the reducer fetch and process the intermediate itself. My diagram above is specific to the word count job; it omits some unimportant details such as the user program (which initializes the workers and the master, specifies the job type, and so on) and fills in many details, chiefly the following:

  1. The intermediate and reduce result files are handled in two stages, split into a temporary path and a final path, analogous to git add and git commit (see the rename sketch after this list). Only after the master decides that a task has truly completed does it write the intermediate or the reducer's result file to the final path.
  2. The intermediate is explicitly partitioned by hash(key) mod NReduce (the number of concurrent reducers), with the goal of spreading pairs across the reducers as randomly and fairly as possible; you could just as well hash the val instead, it does not really matter.
  3. Each input file explicitly corresponds to one mapper, to keep input handling simple. If you do not mind the extra trouble, you could also read every word out of all input files, pool them, and then find a way to split the pool evenly and randomly into N pieces (N being the number of concurrent mappers).
  4. The map operation is made explicit: split the text into intermediate pairs whose key is a word and whose val is always 1, i.e. one key-val pair per word occurrence. The reduce operation is also made explicit: for intermediate pairs that share a key, use their count as the val. If there is only one reducer, that reducer's output is the word frequency count of all the input files.
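Below is a minimal sketch of the two-stage write from point 1, assuming the same pattern the real code uses later (os.CreateTemp followed by os.Rename); the directory and file names here are made up for illustration.

package main

import (
   "log"
   "os"
)

func main() {
   dir := os.TempDir() // stand-in for TmpMapFilePath / TmpReduceFilePath

   // "git add": write into a temp file first, so a crashed or slow worker
   // never leaves a half-written file under the final name.
   f, err := os.CreateTemp(dir, "mr-*")
   if err != nil {
      log.Fatal(err)
   }
   if _, err := f.WriteString("apple 1\n"); err != nil {
      log.Fatal(err)
   }
   f.Close()

   // "git commit": rename is atomic on the same filesystem, so readers see
   // either nothing or the complete file, never a partial one.
   final := dir + "/mr-0-0"
   if err := os.Rename(f.Name(), final); err != nil {
      log.Fatal(err)
   }
   log.Println("committed", final)
}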

For the sake of brevity, the diagram above only shows the communication between mapper 1, reducer 1, and the master. In reality, mapper 2 and reducer 2 go through the same ReqTask, MapTask Done, and ReduceTask Done exchanges; they are simply hard to show in the picture, so they are omitted.

Key Problems in Implementing MapReduce

Why do I call the order above idealized? Because it leaves several critical problems unaddressed, which I group into the categories below.

How to Tolerate Faults

  1. If a worker (mapper or reducer) hits an error partway through a task, how should it react: retry the same task, or abandon it and request a fresh task to execute instead? Should it proactively notify the master? And what happens to the intermediate files produced along the way?
  2. If the master finds that a task has timed out, what should it do?
  3. If the master finds that a task has been executed multiple times, what should it do?

How to Control Worker Parallelism

  1. The diagram above distinguishes mappers from reducers, which matches the intuitive logic. On real machines, however, this is inefficient. The diagram has two mappers and three reducers: would I really start two processes (or two machines) to run the map tasks and three processes (or three machines) to run the reduce tasks? And if the job happens to be reduce-heavy and I want to scale up to four reducers, would I manually add four processes or four machines? That is clearly unacceptable.

Data Races

  1. In a MapReduce deployment the master has a one-to-many relationship with the workers (mappers or reducers). When multiple workers call the master at the same time, reads and writes to the master's internal data may conflict. How should this be handled?

MapReduce Implementation Approach

Here is my approach to each of these key problems.

Fault-Tolerance Handling

  1. As long as a worker has not received the all-tasks-done signal, it loops forever: request a task from the master, execute it, repeat. If the worker catches an error during execution, it need not report it to the master; it simply moves on to the next loop iteration and requests a new task. The worker could report errors proactively, but that costs an extra request, and since the master already monitors task state anyway (see below), it would be redundant.
  2. The master continuously monitors task state. If it finds a timed-out task, it reassigns it to a worker. This calls for a task maintainer that tracks task state (task type, start time, task id, and so on).
  3. To prevent a task from being completed more than once, mark each assignment with a taskId: an increasing counter that is bumped every time the task is handed to a worker. The taskId is recorded by the master's task maintainer and also sent to the worker. When the worker later asks the master to mark the task done, the master validates the taskId the worker sends back; if it cannot be found in the task maintainer, the task must have timed out and been reassigned (reassignment increments the taskId, so the old one no longer matches). The master therefore rejects the completion request as illegitimate, because the taskId does not match. A worker whose completion request fails does nothing special: it moves on to the next iteration, requesting, executing, and reporting tasks as usual. (A minimal sketch of this validation follows this list.)
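Here is a minimal, single-threaded sketch of the taskId validation in point 3. The TaskMaintainer, Assign, and Done names are hypothetical; in the real coordinator.go later, this logic is folded into AssignTask and MapTaskDone/ReduceTaskDone.

package main

import "fmt"

// TaskMaintainer is a hypothetical sketch of the master's bookkeeping:
// a live map from taskId to task, plus an increasing counter.
type TaskMaintainer struct {
   nextId int
   live   map[int]string // taskId -> task description
}

// Assign hands the task out under a fresh taskId; reassigning the same task
// after a timeout produces a new id, invalidating the old one.
func (m *TaskMaintainer) Assign(task string) int {
   id := m.nextId
   m.nextId++
   m.live[id] = task
   return id
}

// Done validates a completion request: only a currently live taskId counts.
func (m *TaskMaintainer) Done(id int) bool {
   if _, ok := m.live[id]; !ok {
      return false // stale id: the task timed out and was reassigned
   }
   delete(m.live, id)
   return true
}

func main() {
   m := &TaskMaintainer{live: map[int]string{}}
   id1 := m.Assign("map pg-1.txt")
   // Suppose id1 times out and the task is handed out again under a new id.
   delete(m.live, id1)
   id2 := m.Assign("map pg-1.txt")
   fmt.Println(m.Done(id1)) // false: the stale worker's completion is rejected
   fmt.Println(m.Done(id2)) // true: the reassigned worker's completion counts
}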

Worker Parallelism Control

  1. Do not statically distinguish workers; whether a worker is a mapper or a reducer is decided by the task it currently holds. If the master returns a map task, the worker is a mapper; if the master returns a reduce task, it is a reducer. The master only generates reduce tasks after all map tasks are done. This is also the approach MIT 6.824 Lab 1 recommends: its test-mr.sh simply starts several worker processes, without separating mappers and reducers. (See the sketch after this list.)
  2. The numbers of mappers and reducers are set by the user program (the user program from the paper). The actual working parallelism of the mappers or reducers is bounded by the physical machine. For example, you may specify 10 mappers and 10 reducers in the user program and really start 10 processes on your machine, but if the machine only has 4 CPU cores, then at most 4 workers run simultaneously even at full throttle, while the rest wait.
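A minimal sketch of point 1: the worker's role is nothing more than a switch on the type of the task it was handed. Task and TaskType mirror the definitions in rpc.go below; handle is a made-up stand-in for the worker's real dispatch loop.

package main

import "fmt"

type TaskType string

const (
   Map    TaskType = "MapTask"
   Reduce TaskType = "ReduceTask"
)

type Task struct{ TaskType TaskType }

// The same worker binary acts as a mapper or a reducer depending purely on
// what the master hands it; there is no fixed role.
func handle(task Task) {
   switch task.TaskType {
   case Map:
      fmt.Println("acting as a mapper") // stand-in for doMap()
   case Reduce:
      fmt.Println("acting as a reducer") // stand-in for doReduce()
   }
}

func main() {
   handle(Task{TaskType: Map})
   handle(Task{TaskType: Reduce})
}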

Data Races

  1. Protect all of the master's member variables with a lock, and release the lock promptly after each access to them. Keep the locked region as small as possible and avoid operations that hold the lock for a long time (more on this later). A minimal sketch follows.
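The sketch below shows that locking discipline in the same style as the coordinator.go code later; the counter field is a made-up stand-in for the master's real shared state such as MapWorkerId.

package main

import (
   "fmt"
   "sync"
)

type master struct {
   lock    sync.Mutex
   counter int // stand-in for real master state such as MapWorkerId
}

// nextId keeps the critical section minimal: lock, touch shared state,
// unlock, and only then do any slow work (RPC replies, file IO, logging).
func (m *master) nextId() int {
   m.lock.Lock()
   id := m.counter
   m.counter++
   m.lock.Unlock()
   return id
}

func main() {
   m := &master{}
   var wg sync.WaitGroup
   for i := 0; i < 4; i++ { // concurrent workers requesting ids
      wg.Add(1)
      go func() { defer wg.Done(); fmt.Println(m.nextId()) }()
   }
   wg.Wait()
}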

Implementation Code

rpc.go: Code and Comments

In rpc.go I define constants shared by the master (the lab calls it the coordinator, but I am used to calling it the master) and the workers, mostly related to task state. One caveat: the three directories TmpMapFilePath, TmpReduceFilePath, and FinalMapFilePath must be created in advance, or the program will fail at runtime; a small sketch for creating them follows. FinalReduceFilePath must live under 6.824/src/main/mr-tmp, which is dictated by the test script test-mr.sh.
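As a convenience, a sketch like the following could create those directories up front; this is my suggestion, not part of the lab or the code below, and the path literals simply mirror the constants in rpc.go.

package main

import (
   "log"
   "os"
)

func main() {
   // These literals mirror the constants in rpc.go; adjust FinalReduceFilePath
   // to wherever your 6.824/src/main/mr-tmp actually lives.
   dirs := []string{
      "/tmp/tmp_map_file/",
      "/tmp/tmp_reduce_file/",
      "/tmp/final_map_file/",
      "/home/6.824/src/main/mr-tmp/",
   }
   for _, d := range dirs {
      // MkdirAll is a no-op if the directory already exists.
      if err := os.MkdirAll(d, 0o755); err != nil {
         log.Fatalf("mkdir %s: %v", d, err)
      }
   }
}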

The source code is as follows:

package mr

import "time"

type TaskType string
type TaskStatus string
type AssignPhase string

const (
   // Define task type
   Map    TaskType = "MapTask"
   Reduce TaskType = "ReduceTask"

   // Define AssignPhase
   MapPhase    AssignPhase = "MapPhase"
   ReducePhase AssignPhase = "ReducePhase"

   // Define tmp files and final files directories
   TmpMapFilePath      = "/tmp/tmp_map_file/"
   TmpReduceFilePath   = "/tmp/tmp_reduce_file/"
   FinalMapFilePath    = "/tmp/final_map_file/"
   FinalReduceFilePath = "/home/6.824/src/main/mr-tmp/"

   // Define task expired time
   TaskExpiredTime = 10

   // Define task status
   Ready    TaskStatus = "Ready"
   Running  TaskStatus = "Running"
   Finished TaskStatus = "Finished"
)

type Task struct {
   TaskType       TaskType   // the task type, map or reduce
   MapWorkerId    int        // worker id if in map phase, given by master
   ReduceWorkerId int        // worker id if in reduce phase, given by master
   InputFile      string     // if in map phase it should be a single file, if in reduce phase it should be a file pattern
   BeginTime      time.Time  // task begin time, given by master
   TaskStatus     TaskStatus // task status, ready, running or finished, for worker it should always be running
}

// worker request master for task
type TaskArgs struct {
   WorkerId int
}

// master replies to the worker with a task (the task might be nil if no task is available)
type TaskReply struct {
   Task       *Task
   ReducerNum int  // the number of reducers, so the mapper can separate the intermediate for different reducers
   AllDone    bool // true if all task done then the worker will exit, otherwise loop request master for task
}

// mapper reports to master that map task should be done
type MapTaskDoneArgs struct {
   MapWorkerId int
   Files       []string // the intermediate files for different reducer
}

// master reply for mapper's map task done request
type MapTaskDoneReply struct {
   Err error // nil if the task done is confirmed by master
}

// reducer reports to master that reduce task should be done
type ReduceTaskDoneArgs struct {
   ReduceWorkerId int
   File           string
}

// master reply for reducer's reduce task done request
type ReduceTaskDoneReply struct {
   Err error
}

// key-val pair for intermediate
type KeyValue struct {
   Key   string
   Value string
}

// intermediate
type ByKey []KeyValue

// for sorting the intermediate
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

coordinator.go: Code and Comments

The roles of the main functions, explained in detail:

  1. MakeCoordinator initializes the master. Two channels maintain the map tasks and the reduce tasks. The number of mappers equals the number of input files; the number of reducers is set by the user program, i.e. 6.824/src/main/mrcoordinator.go (hardcoded to 10). It generates the map tasks and spawns a goroutine dedicated to monitoring task state.
  2. periodicallyRmExpTasks keeps pulling tasks from the map and reduce task channels to check whether they have timed out; a timed-out task is reset to the Ready state so it can be handed out again. Because iterating over a channel requires holding the lock the whole time, each iteration of the outer loop sleeps for a while to give the lock back.
  3. AssignTask answers a worker's request for a task. On every assignment the master's workerId counter increments, so even if the same task is handed out several times (say the first assignment timed out and it must go out again), the master does not confuse them. When a worker reports completion, comparing the workerId in the request against the workerId stored in the master's channel gives an exact match.
  4. MapTaskDone and ReduceTaskDone answer the mapper's and reducer's task-completion requests: they walk the tasks in the channel looking for one that qualifies to be marked Finished, and opportunistically check which tasks have timed out.
  5. genMapTasks and genReduceTasks generate the map tasks and the reduce tasks, respectively. Note that reduce tasks are only generated after all map tasks are done, and their input file is a file pattern, really the set of files produced by the various mappers.
  6. genMapFinalFiles and genReduceFinalFiles confirm the files produced by the mappers and reducers and write them to a new path, analogous to git's commit.
  7. rmMapTmpFiles and rmReduceTmpFiles remove the files under the temporary paths once all map tasks and all reduce tasks are done, respectively.
  8. ifAllMapDone and ifAllReduceDone are helpers that report whether all map tasks and all reduce tasks are done, respectively.
  9. Done() reports whether every task is done; the user program uses it to decide when to exit.

The source code is as follows:

package mr

import (
   "errors"
   "fmt"
   "io"
   "log"
   "net"
   "net/http"
   "net/rpc"
   "os"
   "path/filepath"
   "strconv"
   "sync"
   "time"
)

type Coordinator struct {
   MapChannel     chan *Task  // the channel with buffer to maintain map tasks
   ReduceChannel  chan *Task  // the channel with buffer to maintain reduce tasks
   Files          []string    // the input file list, each file corresponds to a map worker
   MapperNum      int         // the number of map worker, determined by the length of input file list
   ReducerNum     int         // the number of reduce worker, determined by the main function
   AssignPhase    AssignPhase // the variable that indicates the working phase: MapPhase or ReducePhase
   MapWorkerId    int         // the counter for map tasks, starting from 0
   ReduceWorkerId int         // the counter for reduce tasks, starting from 0
   AllMapDone     bool        // the variable that indicates if all map tasks are done
   AllReduceDone  bool        // the variable that indicates if all reduce tasks are done
   Lock           sync.Mutex  // the lock protecting the coordinator's member variables (MapChannel, ReduceChannel, etc.) from read-write conflicts

}

// Initialize the coordinator
func MakeCoordinator(files []string, nReduce int) *Coordinator {
   log.Println("Make MakeCoordinato")
   c := Coordinator{
      MapChannel:     make(chan *Task, len(files)), // buffer size is determined by the length of Files
      ReduceChannel:  make(chan *Task, nReduce),    // buffer size is determined by the main function
      Files:          files,                        // input file list
      MapperNum:      len(files),                   // the number of map worker, determined by the length of input file list
      ReducerNum:     nReduce,                      // the number of reduce worker, determined by the user program
      AssignPhase:    MapPhase,                     // start with map phase
      MapWorkerId:    0,                            // starts from 0
      ReduceWorkerId: 0,                            // starts from 0
   }

   // generate map task given input file list
   c.genMapTasks(files)

   // periodically reset expired tasks back to ready
   go c.periodicallyRmExpTasks()

   // listen to rpc call
   err := c.server()
   if err != nil {
      log.Println("MakeCoordinator.c.server() err = ", err)
      return nil
   }

   return &c
}

// Generate map tasks and send them to the map channel
func (c *Coordinator) genMapTasks(files []string) {
   c.Lock.Lock()
   for _, file := range files {
      task := Task{
         TaskType:   Map,
         InputFile:  file,
         TaskStatus: Ready,
      }
      c.MapChannel <- &task
      log.Println("Finish generating map task : ", task)
   }
   c.Lock.Unlock()
   log.Println("Finish generating all map tasks")
}

// Generate reduce tasks and send them to the reduce channel
func (c *Coordinator) genReduceTasks() {
   c.Lock.Lock()
   for i := 0; i < c.ReducerNum; i++ {
      task := Task{
         TaskType:   Reduce,
         InputFile:  fmt.Sprintf("%vmr-*-%v", FinalMapFilePath, i),
         TaskStatus: Ready,
      }
      log.Println("Finish generating map task : ", task)
      c.ReduceChannel <- &task
   }
   c.Lock.Unlock()
   log.Println("Finish generating all reduce tasks")
}

// Periodically remove expired tasks and reset them into ready status
func (c *Coordinator) periodicallyRmExpTasks() {
   for !c.Done() {
      time.Sleep(time.Second)
      c.Lock.Lock()
      if c.AssignPhase == MapPhase {
         for i := 0; i < c.MapperNum; i++ {
            task := <-c.MapChannel
            c.MapChannel <- task
            if task.TaskStatus == Running && (time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second) {
               task.TaskStatus = Ready
               log.Printf("Task with MapWorker id = %d is expired", task.MapWorkerId)
            }
         }
      } else {
         for i := 0; i < c.ReducerNum; i++ {
            task := <-c.ReduceChannel
            c.ReduceChannel <- task
            if task.TaskStatus == Running && (time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second) {
               task.TaskStatus = Ready
               log.Printf("Task with ReduceWorker id = %d is expired", task.ReduceWorkerId)
            }
         }
      }
      c.Lock.Unlock()
   }
}

// Assign available task from map channel or reduce channel to worker by rpc call
func (c *Coordinator) AssignTask(args *TaskArgs, reply *TaskReply) error {

   c.Lock.Lock()
   if c.AllMapDone && c.AllReduceDone {
      reply.AllDone = true
      c.Lock.Unlock()
      return nil
   }

   if c.AssignPhase == MapPhase {
      for i := 0; i < c.MapperNum; i++ {
         task := <-c.MapChannel
         c.MapChannel <- task
         if task.TaskStatus == Ready {
            task.MapWorkerId = c.MapWorkerId
            c.MapWorkerId++
            task.TaskStatus = Running
            task.BeginTime = time.Now()
            reply.Task = task
            reply.ReducerNum = c.ReducerNum
            reply.AllDone = false
            log.Println("Distribute map task, task = ", task)
            c.Lock.Unlock()
            return nil
         }
      }
      c.Lock.Unlock()
      if c.ifAllMapDone() {
         c.genReduceTasks()
         c.Lock.Lock()
         c.AssignPhase = ReducePhase
         c.Lock.Unlock()
         err := c.rmMapTmpFiles()
         if err != nil {
            log.Println("AssignTask.rmMapTmpFiles err = ", err)
         }
      }
      log.Println("No map task available")
      return errors.New("No map task available")
   } else {
      for i := 0; i < c.ReducerNum; i++ {
         task := <-c.ReduceChannel
         c.ReduceChannel <- task
         if task.TaskStatus == Ready {
            task.ReduceWorkerId = c.ReduceWorkerId
            c.ReduceWorkerId++
            task.TaskStatus = Running
            task.TaskType = Reduce
            task.BeginTime = time.Now()
            reply.Task = task
            reply.AllDone = false
            log.Println("Distribute reduce task = ", task)
            c.Lock.Unlock()
            return nil
         }
      }
      c.Lock.Unlock()
      if c.ifAllReduceDone() {
         reply.AllDone = true
         err := c.rmMapFinalFiles()
         if err != nil {
            log.Println("AssignTask.rmMapFinalFiles err = ", err)
         }
         err = c.rmReduceTmpFiles()
         if err != nil {
            log.Println("AssignTask.rmMapFinalFiles err = ", err)
         }
         return nil
      }
      log.Println("No reduce task available")
      return errors.New("No reduce task available")
   }

}

// Response to Map task done request from worker
func (c *Coordinator) MapTaskDone(args *MapTaskDoneArgs, reply *MapTaskDoneReply) error {

   c.Lock.Lock()
   if c.AllMapDone {
      c.Lock.Unlock()
      return errors.New("All map done")
   }

   for i := 0; i < c.MapperNum; i++ {
      task := <-c.MapChannel
      c.MapChannel <- task
      if task.TaskStatus == Running && time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second {
         task.TaskStatus = Ready
         log.Printf("Map task with MapWorkerId = %d expired", task.MapWorkerId)
         continue
      }

      if args.MapWorkerId == task.MapWorkerId && task.TaskStatus == Running && time.Now().Sub(task.BeginTime) <= TaskExpiredTime*time.Second {
         task.TaskStatus = Finished
         err := c.genMapFinalFiles(args.Files)
         if err != nil {
            task.TaskStatus = Ready
            reply.Err = err
            log.Println("MapTaskDone.genMapFinalFiles err = ", err)
            c.Lock.Unlock()
            return err
         }
         log.Printf("Map task with MapWorkerId = %d is finished in time", task.MapWorkerId)
         c.Lock.Unlock()
         return nil

      }
   }
   c.Lock.Unlock()

   reply.Err = errors.New(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
   log.Println(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
   return errors.New(fmt.Sprintf("Map task with MapWorkerId = %d cannot be done", args.MapWorkerId))
}

// Generate Map task final file
func (c *Coordinator) genMapFinalFiles(files []string) error {
   for _, file := range files {
      tmp_file, err := os.Open(file)
      if err != nil {
         log.Println("genMapFinalFiles err = ", err)
         return err
      }
      defer tmp_file.Close()
      tmp_file_name := filepath.Base(file)
      final_file_path := FinalMapFilePath + tmp_file_name
      final_file, err := os.Create(final_file_path)
      if err != nil {
         log.Println("genMapFinalFiles.os.Create err = ", err)
         return err
      }
      defer final_file.Close()
      _, err = io.Copy(final_file, tmp_file)
      if err != nil {
         log.Println("genMapFinalFiles.io.Copy err = ", err)
         return err
      }
   }
   return nil
}

// Response to reduce task done request from worker
func (c *Coordinator) ReduceTaskDone(args *ReduceTaskDoneArgs, reply *ReduceTaskDoneReply) error {

   c.Lock.Lock()
   if c.AllReduceDone {
      log.Println("All reduce task done")
      c.Lock.Unlock()
      return errors.New("All reduce task done")
   }

   for i := 0; i < c.ReducerNum; i++ {
      task := <-c.ReduceChannel
      c.ReduceChannel <- task
      if task.TaskStatus == Running && time.Now().Sub(task.BeginTime) > (TaskExpiredTime)*time.Second {
         task.TaskStatus = Ready
         log.Printf("Reduce task with ReduceWorkerId = %d expired", task.ReduceWorkerId)
         continue
      }
      if args.ReduceWorkerId == task.ReduceWorkerId && task.TaskStatus == Running && time.Now().Sub(task.BeginTime) <= (TaskExpiredTime)*time.Second {
         task.TaskStatus = Finished
         err := c.genReduceFinalFile(args.File)
         if err != nil {
            log.Println("ReduceTaskDone.genReduceFinalFile err = ", err)
            task.TaskStatus = Ready
            reply.Err = err
            c.Lock.Unlock()
            return err
         }
         log.Printf("Reduce task with ReduceWorkerId = %d is finished in time", task.ReduceWorkerId)
         c.Lock.Unlock()
         return nil

      }
   }
   c.Lock.Unlock()

   reply.Err = errors.New(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
   log.Println(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
   return errors.New(fmt.Sprintf("Reduce task with ReduceWorkerId = %d cannot be done", args.ReduceWorkerId))
}

// Generate Reduce task final file
func (c *Coordinator) genReduceFinalFile(file string) error {
   tmp_file, err := os.Open(file)
   if err != nil {
      log.Println("genReduceFinalFile.os.Open err = ", err)
      return err
   }
   defer tmp_file.Close()
   tmp_file_name := filepath.Base(file)
   final_file_path := FinalReduceFilePath + tmp_file_name
   final_file, err := os.Create(final_file_path)
   if err != nil {
      log.Println("genReduceFinalFile.os.Create err = ", err)
      return err
   }
   defer final_file.Close()
   _, err = io.Copy(final_file, tmp_file)
   if err != nil {
      log.Println("genReduceFinalFile.os.Copy err = ", err)
      return err
   }
   return nil
}

// Remove map task's temporary files
func (c *Coordinator) rmMapTmpFiles() error {
   d, err := os.Open(TmpMapFilePath)
   if err != nil {
      log.Println("rmMapTmpFiles os.Open err = ", err)
      return err
   }
   defer d.Close()
   names, err := d.Readdirnames(-1)
   if err != nil {
      log.Println("rmMapTmpFiles.d.Readdirnames err = ", err)
      return err
   }
   for _, name := range names {
      err = os.RemoveAll(TmpMapFilePath + name)
      if err != nil {
         log.Println("rmMapTmpFiles.os.RemoveAll err = ", err)
         return err
      }
   }
   return nil
}

// Remove map task's final files
func (c *Coordinator) rmMapFinalFiles() error {
   d, err := os.Open(FinalMapFilePath)
   if err != nil {
      log.Println("rmMapFinalFiles.os.Open err = ", err)
      return err
   }
   defer d.Close()
   names, err := d.Readdirnames(-1)
   if err != nil {
      log.Println("rmMapFinalFiles.d.Readdirnames err = ", err)
      return err
   }
   for _, name := range names {
      err = os.RemoveAll(FinalMapFilePath + name)
      if err != nil {
         log.Println("rmMapFinalFiles.os.RemoveAll err = ", err)
         return err
      }
   }
   return nil
}

// Remove reduce task's temporary files
func (c *Coordinator) rmReduceTmpFiles() error {
   d, err := os.Open(TmpReduceFilePath)
   if err != nil {
      log.Println("rmReduceTmpFiles.os.Open err = ", err)
      return err
   }
   defer d.Close()
   names, err := d.Readdirnames(-1)
   if err != nil {
      log.Println("rmReduceTmpFiles.d.Readdirnames err = ", err)
      return err
   }
   for _, name := range names {
      err = os.RemoveAll(TmpReduceFilePath + name)
      if err != nil {
         log.Println("rmReduceTmpFiles.os.RemoveAll err = ", err)
         return err
      }
   }
   return nil
}

// Visit every task in the map channel and find out if all map tasks are done
func (c *Coordinator) ifAllMapDone() bool {
   c.Lock.Lock()
   for i := 0; i < c.MapperNum; i++ {
      task := <-c.MapChannel
      if task.TaskStatus != Finished {
         c.MapChannel <- task
         c.Lock.Unlock()
         return false
      }
      c.MapChannel <- task
   }
   c.AllMapDone = true
   c.Lock.Unlock()
   return true
}

// Visit every task in the reduce channel and find out if all reduce tasks are done
func (c *Coordinator) ifAllReduceDone() bool {
   c.Lock.Lock()
   for i := 0; i < c.ReducerNum; i++ {
      task := <-c.ReduceChannel
      c.ReduceChannel <- task
      if task.TaskStatus != Finished {
         c.Lock.Unlock()
         return false
      }
   }
   c.AllReduceDone = true
   c.Lock.Unlock()
   return true
}

// Return whether all tasks are done
func (c *Coordinator) Done() bool {
   c.Lock.Lock()
   if c.AllMapDone && c.AllReduceDone {
      c.Lock.Unlock()
      log.Println("All map tasks and reduce task done!")
      time.Sleep(3 * time.Second) // Sleep 3s so every worker learns that all tasks are done and exits before the main routine does
      return true
   }
   c.Lock.Unlock()
   return false
}

func (c *Coordinator) server() error {
   rpc.Register(c)
   rpc.HandleHTTP()

   sockname := "/var/tmp/824-mr-"
   sockname += strconv.Itoa(os.Getuid())
   os.Remove(sockname)
   l, e := net.Listen("unix", sockname)
   if e != nil {
      log.Fatal("listen error:", e)
   }
   go http.Serve(l, nil)
   return nil
}

worker.go: Code and Comments

The roles of the main functions, explained in detail:

  1. Worker initializes the worker. While the worker holds no task, its WorkerId is -1.

  2. work keeps requesting and executing tasks as long as the all-tasks-done signal has not arrived. Notably, when requesting a task hits an error (for instance, no task is currently available), the worker sleeps for one second so it does not flood the master with requests and knock it over, since workers far outnumber the master.

  3. doMap executes a map task: read the input file and generate the intermediate (key is the word, val is 1), split the intermediate into different temporary files (one per reducer), then report task completion to the master.

  4. doReduce executes a reduce task: collect the intermediate generated by the various mappers, sort it, count the pairs sharing each key and use the count as the val, write the result to a temporary file, and finally report task completion to the master.

  5. call issues RPC requests to the master. Note that the sockname here must not be changed; it is dictated by the test script test-mr.sh, which relies on it.

package mr

import (
   "encoding/json"
   "fmt"
   "hash/fnv"
   "io"
   "log"
   "net/rpc"
   "os"
   "path/filepath"
   "sort"
   "strconv"
   "time"
)

type MRWorker struct {
   WorkerId   int                             // WorkerId, initialized by task.MapWorkerId or task.ReduceWorkerId
   MapFunc    func(string, string) []KeyValue // Map function, initialized by main function
   ReduceFunc func(string, []string) string   // Reduce function, initialized by main function
   Task       *Task                           // Task, given by coordinator.
   NReduce    int                             // the number of reduce workers, given by coordinator
   IsDone     bool                            // the variable indicates if all tasks are done
}

// Initialize the worker
func Worker(map_func func(string, string) []KeyValue, reduce_func func(string, []string) string) {
   worker := MRWorker{
      WorkerId:   -1, // initialized to -1 when the worker isn't handling a task
      MapFunc:    map_func,
      ReduceFunc: reduce_func,
      IsDone:     false,
   }
   log.Println("Initial worker done")
   worker.work()
   log.Println("Worker with WorkerId = ", worker.WorkerId, " finished all works")
}

// Keep working in a loop until all tasks are finished
func (worker *MRWorker) work() {
   for !worker.IsDone {
      task, err := worker.reqTask()
      if err != nil {
         log.Println("work.reqTask err = ", err)
         time.Sleep(time.Second) // back off so workers don't flood the master when no task is available
         continue
      }

      if task == nil {
         log.Printf("Worker with WorkerId = %d received all tasks done", worker.WorkerId)
         worker.IsDone = true
         break
      }

      log.Printf("Worker with WorkerId = %d received task = %v", worker.WorkerId, task)
      worker.Task = task

      if task.TaskType == Map {
         worker.WorkerId = task.MapWorkerId
         err := worker.doMap()
         if err != nil {
            log.Println("worker.doMap err = ", err)
            time.Sleep(time.Second)
            continue
         }
         log.Println("Map task done, task = ", task)
      } else {
         worker.WorkerId = task.ReduceWorkerId
         err := worker.doReduce()
         if err != nil {
            log.Println("worker.doReduce err = ", err)
            continue
         }
         log.Println("Reduce task done, task = ", task)
      }
   }
}

// Request a task from the coordinator
func (worker *MRWorker) reqTask() (*Task, error) {
   args := TaskArgs{}
   reply := TaskReply{}
   err := call("Coordinator.AssignTask", &args, &reply)
   if err != nil {
      log.Println("reqTask.call err = ", err)
      return nil, err
   }
   worker.Task = reply.Task
   if reply.AllDone {
      worker.IsDone = true
      return nil, nil
   }
   worker.NReduce = reply.ReducerNum
   return reply.Task, nil
}

// Execute map task details
func (worker *MRWorker) doMap() error {
   task := worker.Task

   intermediate, err := worker.genIntermediate(task.InputFile)
   if err != nil {
      log.Println("doMap.genIntermediate err = ", err)
      return err
   }

   tmp_files, err := worker.writeIntermediateToTmpFiles(intermediate)
   if err != nil {
      log.Println("doMap.writeIntermediateToTmpFiles :", err)
      return err
   }

   err = worker.mapTaskDone(tmp_files)
   if err != nil {
      log.Println("doMap.mapTaskDone err = ", err)
      for _, file := range tmp_files {
         err := os.Remove(file)
         if err != nil {
            log.Println("worker.mapTaskDone.os.Remove err = ", err)
         }
      }
      return err
   }
   return nil
}

// Generate intermediate key-val list
func (worker *MRWorker) genIntermediate(filename string) ([]KeyValue, error) {
   intermediate := make([]KeyValue, 0)
   file, err := os.Open(filename)
   if err != nil {
      log.Println("genIntermediate.os.Open", err)
      return nil, err
   }
   content, err := io.ReadAll(file)
   if err != nil {
      log.Println("genIntermediate.io.ReadAll", err)
      return nil, err
   }
   defer file.Close()

   kva := worker.MapFunc(filename, string(content))
   intermediate = append(intermediate, kva...)
   return intermediate, nil
}

// Write intermediate to map task's temporary files
func (worker *MRWorker) writeIntermediateToTmpFiles(intermediate []KeyValue) ([]string, error) {

   tmp_files := []string{}
   hashed_intermediate := make([][]KeyValue, worker.NReduce)

   for _, kv := range intermediate {
      hash_val := ihash(kv.Key) % worker.NReduce
      hashed_intermediate[hash_val] = append(hashed_intermediate[hash_val], kv)
   }

   for i := 0; i < worker.NReduce; i++ {
      tmp_file, err := os.CreateTemp(TmpMapFilePath, "mr-*.txt")
      if err != nil {
         log.Println("writeIntermediateToTmpFiles.os.CreateTemp err = ", err)
         return nil, err
      }
      defer os.Remove(tmp_file.Name())
      defer tmp_file.Close()

      enc := json.NewEncoder(tmp_file)
      for _, kv := range hashed_intermediate[i] {
         err := enc.Encode(&kv)
         if err != nil {
            log.Println("writeIntermediateToTmpFiles.enc.Encode", err)
            return nil, err
         }
      }

      file_path := fmt.Sprintf("mr-%v-%v", worker.WorkerId, i)
      err = os.Rename(tmp_file.Name(), TmpMapFilePath+file_path)
      if err != nil {
         log.Println("writeIntermediateToTmpFiles os.Rename: ", err)
         return nil, err
      }
      tmp_files = append(tmp_files, TmpMapFilePath+file_path)
   }

   return tmp_files, nil
}

// Report map task done to coordinator
func (worker *MRWorker) mapTaskDone(files []string) error {
   args := MapTaskDoneArgs{
      MapWorkerId: worker.WorkerId,
      Files:       files,
   }
   reply := MapTaskDoneReply{}
   err := call("Coordinator.MapTaskDone", &args, &reply)
   if err != nil {
      log.Println("mapTaskDone.call err = ", err)
      return err
   }
   if reply.Err != nil {
      log.Println("mapTaskDone.reply.Err = ", reply.Err)
      return reply.Err
   }
   return nil
}

// Execute reduce task details
func (worker *MRWorker) doReduce() error {

   intermediate, err := worker.collectIntermediate(worker.Task.InputFile)
   if err != nil {
      log.Println("doReduce.collectIntermediate err = ", err)
      return err
   }

   sort.Sort(ByKey(intermediate))

   res := worker.genReduceRes(intermediate)

   tmp_file, err := worker.writeReduceResToTmpFile(res)
   if err != nil {
      log.Println("doReduce.writeReduceResToTmpFile err = ", err)
      return err
   }

   err = worker.reduceTaskDone(tmp_file)
   if err != nil {
      log.Println("doReduce.reduceTaskDone err = ", err)
      err := os.Remove(tmp_file)
      if err != nil {
         log.Println("doReduce.os.Remove err = ", err)
      }
      return err
   }

   return nil
}

// Collect intermediate from different map workers' result files
func (worker *MRWorker) collectIntermediate(file_pattern string) ([]KeyValue, error) {
   intermediate := make([]KeyValue, 0)
   files, err := filepath.Glob(file_pattern)
   if err != nil {
      log.Println("collectIntermediate.filepath.Glob err = ", err)
      return nil, err
   }

   for _, file_path := range files {
      file, err := os.Open(file_path)
      if err != nil {
         log.Println("collectIntermediateos.Open err = ", err)
         return nil, err
      }
      dec := json.NewDecoder(file)
      for {
         var kv KeyValue
         if err := dec.Decode(&kv); err != nil {
            break
         }
         intermediate = append(intermediate, kv)
      }
      file.Close() // close explicitly here; a defer inside the loop would pile up until the function returns
   }

   return intermediate, nil
}

// Gen reduce result
func (worker *MRWorker) genReduceRes(intermediate []KeyValue) []KeyValue {
   res := make([]KeyValue, 0)
   i := 0
   for i < len(intermediate) {
      // the keys in intermediate[i...j-1] are all the same since intermediate is already sorted
      j := i + 1
      for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
         j++
      }

      // collect the values of intermediate[i...j-1]
      values := []string{}
      for k := i; k < j; k++ {
         values = append(values, intermediate[k].Value)
      }
      output := worker.ReduceFunc(intermediate[i].Key, values)

      kv := KeyValue{Key: intermediate[i].Key, Value: output}
      res = append(res, kv)

      i = j
   }
   return res
}

// Write reduce task's result to a temporary file
func (worker *MRWorker) writeReduceResToTmpFile(res []KeyValue) (string, error) {
   tempFile, err := os.CreateTemp(TmpReduceFilePath, "mr-") // e.g. /tmp/tmp_reduce_file/mr-xxxxx (random suffix)
   if err != nil {
      log.Println("writeReduceResToTmpFile.os.CreateTemp err = ", err)
      return "", err
   }

   // write key-val pair into tmp file
   for _, kv := range res {
      fmt.Fprintf(tempFile, "%s %s\n", kv.Key, kv.Value)
   }

   temp_name := TmpReduceFilePath + "mr-out-" + strconv.Itoa(worker.WorkerId) + ".txt"
   err = os.Rename(tempFile.Name(), temp_name)
   if err != nil {
      log.Println("writeReduceResToTmpFile.os.Rename err = ", err)
      return "", err
   }

   return temp_name, nil
}

// Report reduce task done to coordinator
func (worker *MRWorker) reduceTaskDone(file string) error {
   args := ReduceTaskDoneArgs{
      ReduceWorkerId: worker.WorkerId,
      File:           file,
   }
   reply := ReduceTaskDoneReply{}
   err := call("Coordinator.ReduceTaskDone", &args, &reply)
   if err != nil {
      log.Println("reduceTaskDone.call ", err)
      return err
   }
   if reply.Err != nil {
      log.Println("reduceTaskDone.reply.Err = ", reply.Err)
      return reply.Err
   }
   return nil
}

func ihash(key string) int {
   h := fnv.New32a()
   h.Write([]byte(key))
   return int(h.Sum32() & 0x7fffffff)
}

// rpc call function that send request to coordinator
func call(fun_name string, args interface{}, reply interface{}) error {

   sockname := "/var/tmp/824-mr-"
   sockname += strconv.Itoa(os.Getuid())
   c, err := rpc.DialHTTP("unix", sockname)
   if err != nil {
      log.Fatal("dialing:", err)
      return err
   }
   defer c.Close()

   err = c.Call(fun_name, args, reply)
   if err != nil {
      log.Println("call.call", err)
      return err
   }

   return nil
}

Additional Notes

This is just my way of implementing it; other implementations are broadly similar. A few key points are common to all of them:

  1. Hold the lock for as few steps as possible: the performance bottleneck is the master, and the master's bottleneck is the lock, so release it as soon as you are done with the shared state. Make sure every function releases the lock before it returns, and release it before calling other functions that acquire it, so they are never left waiting for a lock they cannot get.

  2. Every time a task is handed out, mark that assignment somehow; I mark it with an increasing workerId. This avoids the situation where a task handed out multiple times can no longer be told apart.

  3. The test-mr.sh script has a small problem: on some systems certain commands cannot run, so the test cases fail. For example, on CentOS 7, in the early exit test of test-mr.sh, you need to change

        wait -n 
    

    to

        wait  
    

References