MIT 6.824 Distributed Systems 分布式系统课程系列1 Lab1-MapReduce

  • 该课程包括分布式系统相关的各种知识。包括mapreduce,raft算法,Zookeeper,比特币等等。

  • 配备4个lab,需要动手写代码,实现mapreduce,raft算法,在raft基础上建立分布式kv服务。

  • 同时可学一下go语言。go语言争议很多,之前粗略看过两次都丧失兴趣。这次正好强制再学一次并实际应用。

  • 课程主页 https://pdos.csail.mit.edu/6.824/ https://pdos.csail.mit.edu/6.824/schedule.html 还有配套的视频。可去各大视频网站查找。


  • go 1.15安装 https://github.com/golang/go/wiki/Ubuntu sudo add-apt-repository ppa:longsleep/golang-backports sudo apt update sudo apt install golang-go export PATH=$PATH:/usr/lib/go-1.15/bin


实验主页


https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

  • 做一个mapreduce程序。处理多个文本,统计里面每个单词的数量。

  • 基本原理就是分而治之,分割任务,分配给多个机器去处理,得到一些可独立处理的中间结果,再多个机器做汇总的任务,得到最终结果。

  • 通过lab1可熟悉最基本的go代码:基本语法、文件操作、编解码、错误处理、rpc等等。


课程示例代码:

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

cd 6.824
cd src/main
go build -buildmode=plugin ../mrapps/wc.go      //把wc变成一个类似插件
go run mrsequential.go wc.so pg*.txt            //跑mrsequential.go,用wc.so插件里的函数处理文件pg*.txt。

最终测试是运行 /src/main/test-mr.sh 需要看一下这个测试脚本来搞清流程。

看示例代码:

  • mrsequential.go是个顺序执行的流程,实现了最简单的mapreduce。

  • map函数把文字分割成单词,返回包含所有{单词,1}键值对的数组。框架把相同键的数据给reduce函数,reduce函数统计相同键数据的数量。

  • 最后把每个单词的数量输出到mr-out-0文件。

  • import “plugin” 可把自己的代码编成plugin。选择plugin里的函数。

  • intermediate = append(intermediate, kva…) 把kva里的每项添加到intermediate。这写法有点。。。 tgpl 139


实验任务:

  • 完成mr/master.go, mr/worker.go, and mr/rpc.go 做两个程序,一个master一个worker。实现一个分布式的mapreduce。

  • 一个master进程,多个worker进程。worker通过rpc向master请求任务并执行。


流程解析

  • 得看mapreduce论文或网友解读,不然只看实验要求的话很难猜出它的逻辑。

  • 论文里的那张流程图非常清晰。 程序分为两个过程,先多个worker做map,生成中间文件,再多个worker做reduce,生成数个最终文件。

  1. a个原始文本分为a个map任务,编号X。

  2. master分配map任务给worker,处理分到的文件,得到一串键值对,value都是1,如{‘about’, 1}。

  3. 存入中间文件。怎么存,要用ihash当前单词,再%nReduce得到Y。按XY存到mr-X-Y文件。

  4. master分配reduce任务给worker。此时所有单词的数据是按单词的hash再%nReduce分割在不同Y编号中的。那么对于每个Y,可以把所有X文件里的键值对数量累加一下,存到mr-out-Y。此时每个mr-out-Y文件就存了以hash分割的所有原始数据的单词数据。

  5. test-mr.sh这个测试程序就是把所有mr-out-Y文件里的数据整合起来排序。得到的结果如果与mrsequential.go得到的结果一样就通过测试。


go相关:

  • 怎么好像每运行一次后都要再编译下plugin?

  • rpc: gob error encoding body: gob: type xxxx has no exported fields 定义结构,成员名字首字母要大写,不然rpc这边会报错。

  • 文件Create后马上Open。再写会失败。

  • go的rpc还挺方便。但如果要用别的语言交互,还是得整协议。 文档: https://golang.google.cn/pkg/net/rpc/#Register https://drstearns.github.io/tutorials/gojson/


代码:

没有做的很完善精细,只是跑通测试。仅供参考。

rpc.go

   package mr

   //
   // RPC definitions.
   //
   // remember to capitalize all names.
   //

   import "os"
   import "strconv"

   //
   // example to show how to declare the arguments
   // and reply for an RPC.
   //

   type ExampleArgs struct {
       X int
   }

   type ExampleReply struct {
       ServerStatus int
   }

   // Add your RPC definitions here.

   //type Task struct {
   //  Text string
   //}

   type Task struct {
       TaskType int
       TaskID int
       FileName string
       NReduce int
       NMap int
   }

   type TaskResult struct {
       TaskType int
       Result bool
       TaskID int
       //Data []KeyValue

   }

   // Cook up a unique-ish UNIX-domain socket name
   // in /var/tmp, for the master.
   // Can't use the current directory since
   // Athena AFS doesn't support UNIX-domain sockets.
   func masterSock() string {
       s := "/var/tmp/824-mr-"
       s += strconv.Itoa(os.Getuid())
       return s
   }

master.go

   package mr

   import "log"
   import "net"
   import "os"
   import "net/rpc"
   import "net/http"

   import "fmt"
   //import "io/ioutil"

   type Master struct {
       // Your definitions here.
       map_task_status map[int]int
       map_task_content map[int]string
       map_task_count int
       map_current_task int
       map_done_task_count int

       nReduce int

       reduce_task_status map[int]int
       reduce_task_count int
       reduce_current_task int
       reduce_done_task_count int

       server_status int
   }

   // Your code here -- RPC handlers for the worker to call.

   //
   // an example RPC handler.
   //
   // the RPC argument and reply types are defined in rpc.go.
   //
   func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {

       return nil
   }

   func (m *Master) FetchTask(args *ExampleArgs, task *Task) error {
       fmt.Println("FetchTask")
       //fmt.Println(m.map_current_task)

       //task.Text = m.task_content[m.map_current_task]

       //m.map_current_task += 1

       if m.map_current_task < m.map_task_count { // map 未结束
           task.TaskType = 0
           task.TaskID = m.map_current_task
           task.FileName = m.map_task_content[m.map_current_task]
           task.NReduce = m.nReduce
           task.NMap = m.map_task_count

           m.map_current_task += 1
       } else {
           task.TaskType = 1
           task.TaskID = m.reduce_current_task
           task.NReduce = m.nReduce
           task.NMap = m.map_task_count

           m.reduce_current_task += 1
       }

       return nil
   }

   func (m *Master) ReportTask(result *TaskResult, reply *ExampleReply) error {
       fmt.Println("ReportTask")

       if result.TaskType == 0 { // 简化
           m.map_done_task_count ++
       } else {
           m.reduce_done_task_count ++
       }

       if m.map_done_task_count == m.map_task_count && m.reduce_done_task_count == m.reduce_task_count {
           reply.ServerStatus = 1
           m.server_status = 1
       } else {
           reply.ServerStatus = 0
       }
       
       return nil
   }

   //
   // start a thread that listens for RPCs from worker.go
   //
   func (m *Master) server() {
       rpc.Register(m)
       rpc.HandleHTTP()
       //l, e := net.Listen("tcp", ":1234")
       sockname := masterSock()
       os.Remove(sockname)
       l, e := net.Listen("unix", sockname)
       if e != nil {
           log.Fatal("listen error:", e)
       }
       go http.Serve(l, nil)
   }

   //
   // main/mrmaster.go calls Done() periodically to find out
   // if the entire job has finished.
   //
   func (m *Master) Done() bool {
       ret := false

       // Your code here.
       //if m.map_done_task_count == m.map_task_count {
       //  ret = true
       //}

       if m.server_status == 1{
           ret = true
       } 

       return ret
   }

   //
   // create a Master.
   // main/mrmaster.go calls this function.
   // nReduce is the number of reduce tasks to use.
   //
   func MakeMaster(files []string, nReduce int) *Master {
       m := Master{}

       // Your code here.

       m.server_status = 0

       // 读所有输入文件。每个文件的内容存作为一个task,存到一个
       m.map_task_status = make(map[int]int)
       m.map_task_content = make(map[int]string)
       m.map_task_count = 0
       m.map_current_task = 0
       m.map_done_task_count = 0

       m.nReduce = nReduce

       for _, filename := range files {
           /*
           file, err := os.Open(filename)
           if err != nil {
               log.Fatalf("cannot open %v", filename)
           }
           content, err := ioutil.ReadAll(file)
           if err != nil {
               log.Fatalf("cannot read %v", filename)
           }
           file.Close()*/

           m.map_task_status[m.map_task_count] = 0
           m.map_task_content[m.map_task_count] = string(filename)

           fmt.Println(m.map_task_count)
           fmt.Println(len(string(filename)))
           m.map_task_count += 1
       }

       fmt.Println(m.map_task_content)

       // reduce tasks
       m.reduce_task_status = make(map[int]int)
       m.reduce_task_count = nReduce
       m.reduce_current_task = 0
       m.reduce_done_task_count = 0

       for i := 0; i < nReduce; i++ {
           m.reduce_task_status[i] = 0
       }

       m.server()
       return &m
   }

worker.go

   package mr

   import "fmt"
   import "log"
   import "net/rpc"
   import "hash/fnv"
   import "io/ioutil"
   import "os"
   import "encoding/json"
   import "sort"

   //
   // Map functions return a slice of KeyValue.
   //
   type KeyValue struct {
       Key   string
       Value string
   }

   // for sorting by key.
   type ByKey []KeyValue

   // for sorting by key.
   func (a ByKey) Len() int           { return len(a) }
   func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
   func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

   //
   // use ihash(key) % NReduce to choose the reduce
   // task number for each KeyValue emitted by Map.
   //
   func ihash(key string) int {
       h := fnv.New32a()
       h.Write([]byte(key))
       return int(h.Sum32() & 0x7fffffff)
   }

   //
   // main/mrworker.go calls this function.
   //
   func Worker(mapf func(string, string) []KeyValue,
       reducef func(string, []string) string) {

       // Your worker implementation here.

       // uncomment to send the Example RPC to the master.
       // CallExample()
       //GetTask()

       for true {
           // declare an argument structure.
           args := ExampleArgs{}

           // fill in the argument(s).
           args.X = 99

           // declare a reply structure.
           //reply := ExampleReply{}
           task := Task{}

           // send the RPC request, wait for the reply.
           err := call("Master.FetchTask", &args, &task)

           if !err {
               log.Fatal("rpc error:", err)
               break
           }

           fmt.Printf("task.TaskType %d\n", task.TaskType)

           if task.TaskType == 0 {
               file_name := task.FileName

               file, err := os.Open(file_name)
               if err != nil {
                   log.Fatalf("cannot open %v", file_name)
               }
               content, err := ioutil.ReadAll(file)
               if err != nil {
                   log.Fatalf("cannot read %v", file_name)
               }
               file.Close()


               map_data := make(map[int][]KeyValue)

               kva := mapf("gg", string(content))

               for _, kv := range kva {
                   //fmt.Printf("%s %s\n", kv.Key, kv.Value)
                   hash_id := ihash(kv.Key) % task.NReduce

                   map_data[hash_id] = append(map_data[hash_id], kv)
               }

               // 输出到临时文件 mr-task.TaskID-hash_id
               for hash_id, data  := range map_data {
                   file_name := fmt.Sprintf("mr-%d-%d", task.TaskID, hash_id)

                   file, err := os.Create(file_name)

                   //file, err := os.Open(file_name)
                   if err != nil {
                       log.Fatalf("cannot create %v", file_name)
                   }

                   enc := json.NewEncoder(file)
                   for _, kv := range data {
                       //fmt.Printf("%s %s\n", kv.Key, kv.Value)
                       err := enc.Encode(&kv)
                       if err != nil {
                           fmt.Println(err)
                           log.Fatalf("Encode error")
                       }
                   }

                   file.Close()
               }

               // report
               // declare an argument structure.
               reply := ExampleReply{}

               // declare a reply structure.
               //reply := ExampleReply{}
               task_result := TaskResult{}
               task_result.Result = true
               task_result.TaskType = task.TaskType
               task_result.TaskID = task.TaskID
               //task_result.Data = 

               // send the RPC request, wait for the reply.
               call("Master.ReportTask", &task_result, &reply)

               if reply.ServerStatus == 1 {
                   fmt.Println("server tasks all done")
                   break
               }
           } else {
               // 把所有mr-X-Y整合为mr-out-Y

               //file_names := fmt.Sprintf("mr-*-%d", task.TaskID)

               kva := []KeyValue{}

               //for _, filename := range file_names {
               for t := 0; t < task.NMap; t ++ {
                   filename := fmt.Sprintf("mr-%d-%d", t, task.TaskID)

                   file, err := os.Open(filename)
                   if err != nil {
                       log.Fatalf("cannot open %v", filename)
                   }

                   /*
                   content, err := ioutil.ReadAll(file)
                   if err != nil {
                       log.Fatalf("cannot read %v", filename)
                   }*/

                   dec := json.NewDecoder(file)

                   for {
                       var kv KeyValue
                       if err := dec.Decode(&kv); err != nil {
                         break
                       }
                       kva = append(kva, kv)
                   }

                   file.Close()

                   //kva := mapf(filename, string(content))
                   //intermediate = append(intermediate, kva...)
               }

               sort.Sort(ByKey(kva))

               file_name := fmt.Sprintf("mr-out-%d", task.TaskID)

               file, err := os.Create(file_name)
               if err != nil {
                   log.Fatalf("cannot create %v", file_name)
               }

               i := 0
               for i < len(kva) {
                   j := i + 1
                   for j < len(kva) && kva[j].Key == kva[i].Key {
                       j++
                   }
                   values := []string{}
                   for k := i; k < j; k++ {
                       values = append(values, kva[k].Value)
                   }
                   output := reducef(kva[i].Key, values)

                   // this is the correct format for each line of Reduce output.
                   fmt.Fprintf(file, "%v %v\n", kva[i].Key, output)

                   i = j
               }

               file.Close()

               // report
               // declare an argument structure.
               reply := ExampleReply{}

               // declare a reply structure.
               //reply := ExampleReply{}
               task_result := TaskResult{}
               task_result.Result = true
               task_result.TaskType = task.TaskType
               task_result.TaskID = task.TaskID
               //task_result.Data = 

               // send the RPC request, wait for the reply.
               call("Master.ReportTask", &task_result, &reply)

               if reply.ServerStatus == 1 {
                   fmt.Println("server tasks all done")
                   break
               }
           }
       }
   }

   func GetTask() {

       // declare an argument structure.
       args := ExampleArgs{}

       // fill in the argument(s).
       args.X = 99

       // declare a reply structure.
       //reply := ExampleReply{}
       task := Task{}

       // send the RPC request, wait for the reply.
       call("Master.FetchTask", &args, &task)

       // reply.Y should be 100.
       fmt.Printf("task.TaskType %s\n", task.TaskType)
   }

   func ReportTask() {
   }

   //
   // example function to show how to make an RPC call to the master.
   //
   // the RPC argument and reply types are defined in rpc.go.
   //
   func CallExample() {

       // declare an argument structure.
       args := ExampleArgs{}

       // fill in the argument(s).
       args.X = 99

       // declare a reply structure.
       reply := ExampleReply{}

       // send the RPC request, wait for the reply.
       call("Master.Example", &args, &reply)
   }

   //
   // send an RPC request to the master, wait for the response.
   // usually returns true.
   // returns false if something goes wrong.
   //
   func call(rpcname string, args interface{}, reply interface{}) bool {
       // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
       sockname := masterSock()
       c, err := rpc.DialHTTP("unix", sockname)
       if err != nil {
           log.Fatal("dialing:", err)
       }
       defer c.Close()

       err = c.Call(rpcname, args, reply)
       if err == nil {
           return true
       }

       fmt.Println(err)
       return false
   }