Reading

Paper: MapReduce

map and reduce are just what you would think they are, as in JavaScript 😊 (see the word-count sketch after this list). You write functional code and the run-time system takes care of the details of:

  • partitioning the input data
  • scheduling the program’s execution across a set of machines
  • handling machine failures
    • achieve availability and reliability of storage on machines’ local disks via replication
    • achieve fault tolerance via re-execution: take advantage of the functional model of map and reduce operations
  • managing the required inter-machine communication
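For a sense of how small the user-written part is, here is a minimal word-count Map/Reduce pair in Go in the shape the lab's mrapps use. This is a sketch: the KeyValue struct is defined locally to keep it self-contained (in the lab it is mr.KeyValue).

```go
package main // mrapps are built as Go plugins, so no main function is needed

import (
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the lab's mr.KeyValue type.
type KeyValue struct {
	Key   string
	Value string
}

// Map is called once per input split and emits ("word", "1") for every word.
func Map(filename string, contents string) []KeyValue {
	isNotLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	kva := []KeyValue{}
	for _, w := range strings.FieldsFunc(contents, isNotLetter) {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce is called once per key with all values emitted for that key;
// for word count, the answer is simply how many values there are.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}
```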

How distribution is achieved

  • The Map invocations are distributed by partitioning input data into a set of M splits
  • The Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.

Straggler issue

As a MapReduce execution nears its end, the last few tasks can take a very long time for reasons such as other work being scheduled on the same machines. This issue is alleviated by a backup-task mechanism: the master schedules backup executions for the last few in-progress tasks, and a task is considered complete when either the primary or the backup execution finishes.

Use case: large-scale production indexing

Google migrated the document indexing workload to MapReduce. Benefits of the migration:

  • Alterability - separation of concerns
  • Acceptable performance and therefore no need to mix conceptually unrelated computations together to avoid extra passes over the data.
  • Smaller code base - 3800 lines of C++ to 700

Restricted programming models

e.g. An associative function can be computed over all prefixes of an N element array in log N time on N processors using parallel prefix computations.

Advantage of MapReduce over these restricted models: fault tolerance that scales to thousands of processors, versus small-scale systems where programmers have to handle machine failures themselves.

Lecture

Why distributed systems?

  • Parallelism
  • Fault Tolerance
  • Physical: naturally distributed
  • Security / Isolation

Challenges

  • Concurrency
  • Partial failure
  • Performance (achieving the performance you think you deserve): the original reason for distribution is higher performance via parallelism, but obtaining a 100× speedup with 100× the machines is hard.

Assignment

Task: Implement the “master”/“coordinator” and “worker” processes for MapReduce.

Interesting observations from the starter code

Apps are built with -buildmode=plugin, and the worker finds the Map and Reduce functions via loadPlugin.

Nice to know that there is a plugin package in the Go standard library taking care of the loading and symbol resolution of Go plugins.

The timeout Linux command

timeout is a command-line utility that runs a specified command and terminates it if it is still running after a given period of time.

Get the Mac alternative:

```bash
brew install coreutils
```

Implementation

Worker’s steps

Step 1: Request a job from the coordinator
Step 2: Open the file and read in the content
Step 3: Do the Map or the Reduce job
Step 4: Write result into file
Step 5: Send output file name back to coordinator
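A sketch of that loop for the map path, reusing the KeyValue type from the word-count sketch above. The RPC plumbing and the file-writing helper are passed in as functions because their concrete shapes are up to the implementer; JobReply and every other name here is illustrative, not the lab's.

```go
import "os"

// JobReply is an illustrative reply shape for the job-request RPC.
type JobReply struct {
	JobType  string // "map" or "reduce"
	JobNum   int
	Filename string
	NReduce  int
}

func runWorker(
	requestJob func() JobReply, // Step 1: RPC to the coordinator
	mapf func(filename, contents string) []KeyValue,
	writeIntermediate func(jobNum int, kva []KeyValue, nReduce int) []string,
	reportDone func(jobNum int, outFiles []string), // Step 5: RPC back
) error {
	for {
		reply := requestJob() // Step 1
		if reply.JobType == "map" {
			content, err := os.ReadFile(reply.Filename) // Step 2
			if err != nil {
				return err
			}
			kva := mapf(reply.Filename, string(content))                    // Step 3
			outFiles := writeIntermediate(reply.JobNum, kva, reply.NReduce) // Step 4
			reportDone(reply.JobNum, outFiles)                              // Step 5
		}
		// The reduce branch is symmetric: read the intermediate files,
		// call reducef, write mr-out-Y, and report back.
	}
}
```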

Coordinator’s RPC handlers

Handler 1: HandleJobRequest: Find an idle task and put its information (job type, filename) into the response.
Handler 2: HandleJobFinishReport: Update the task state. Decide if all Map tasks have finished; decide if all Reduce tasks have finished and update the Done return value.
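For reference, net/rpc only fixes the handler shape: an exported method on the Coordinator (the struct comes from the starter code) that takes pointer args and reply and returns an error. The argument/reply structs below are illustrative; JobReply is the same hypothetical struct as in the worker sketch above.

```go
type RequestJobArgs struct{} // empty: the worker simply asks for work

type ReportFinishArgs struct {
	JobType  string
	JobNum   int
	OutFiles []string
}

type ReportFinishReply struct{}

// HandleJobRequest finds an idle task and fills its information into the reply.
func (c *Coordinator) HandleJobRequest(args *RequestJobArgs, reply *JobReply) error {
	// ... pick an idle job, mark it as in progress, populate reply
	return nil
}

// HandleJobFinishReport updates the task's state and checks whether all map
// (and later all reduce) tasks have finished, so that Done() can return true.
func (c *Coordinator) HandleJobFinishReport(args *ReportFinishArgs, reply *ReportFinishReply) error {
	// ... record completion, possibly advance the coordinator's phase
	return nil
}
```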

Questions asked

Question 1: Can a worker do either map or reduce?

Yes. It is determined by the reply from the job request.

Question 2: How are nReduce and ihash used by the Worker?

According to the MapReduce paper:

Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.

  • R: nReduce
  • Intermediate key space: the collection of all keys produced by map
  • hash function: ihash
  • Who is responsible for the distribution: the worker, when it writes out the map results (see Question 3 below)

Question 3: Who is responsible for the distribution of the reduce invocations?

The worker, once it has invoked the mapf function and is writing the outcomes to the intermediate files.
It decides which file each intermediate key-value pair goes into based on the result of ihash.
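Putting Questions 2 and 3 together in code: ihash below is the hash function shipped in the lab's worker starter code; the mr-int-X-Y file naming matches the convention used later in these notes, and bucketFor is just my name for the hash(key) mod R step.

```go
import (
	"fmt"
	"hash/fnv"
)

// ihash is the hash function from the lab's worker starter code.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// bucketFor picks which of the R (= nReduce) partitions a key belongs to.
func bucketFor(key string, nReduce int) int {
	return ihash(key) % nReduce
}

// intermediateName follows the mr-int-X-Y convention:
// X is the map task number, Y is the reduce task number.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("mr-int-%d-%d", mapTask, reduceTask)
}
```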

Question 4: How does the Coordinator decide which map job to assign to a worker?

According to the MapReduce paper:

The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits.

According to the implementations of Map in the mrapps, each is passed two arguments, filename string and content string. Not every app uses both - for example, the word-count app only uses the content. Therefore, I think I can assume that each map invocation involves exactly one file, and that the Worker is responsible for opening the file and reading its content before invoking the Map function.

Question 5: How do I manage the jobs in the Coordinator?

  • []Job: hard to query by file name
  • map[string]Job with filename as the key: hard to find incomplete jobs, and not possible for reduce jobs since each reduce task processes up to nMap intermediate files rather than a single one.
  • map[int]Job with the job number as the key - this is what I did for the first iteration; the Coordinator maintains two such maps, mapJobs and reduceJobs. (There turned out to be a better solution - more on this later.)

Performance issue

Now that I have the first iteration running (Yes! 🎉), the performance issue becomes noticeable. Yihao got his version to finish one invocation of test-mr.sh within 1 minute, while I could finish a song waiting for mine. Here are some design decisions that I noted down after some discussion with him:

Rethinking Question 5 - Coordinator’s management of the jobs

Currently, my Coordinator needs to loop through the whole map of jobs to find an available job or to conclude that there is no suitable job to assign. I originally designed it this way because I was thinking of “resetting” jobs back to the idle state and thought it would be hard to keep a collection of idle jobs. It actually was not that bad: all I had to do was maintain a slice/queue of jobs to be assigned, and if a job did not get finished in time, I could still append it back. On the other end, the already-scheduled jobs can be kept in a map keyed by job number, which makes querying a scheduled job O(1) when it finishes or times out (see the sketch below).
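A sketch of that rethought bookkeeping with hypothetical names of my own: a FIFO slice of jobs waiting to be assigned, plus a map of in-progress jobs keyed by job number for O(1) lookups.

```go
type Job struct {
	Num      int
	Filename string
}

type bookkeeping struct {
	toSchedule []Job       // idle jobs; a timed-out job is simply appended back
	inProgress map[int]Job // scheduled jobs, keyed by job number
}

// next hands out the next idle job, if any, and records it as in progress.
func (b *bookkeeping) next() (Job, bool) {
	if len(b.toSchedule) == 0 {
		return Job{}, false
	}
	j := b.toSchedule[0]
	b.toSchedule = b.toSchedule[1:]
	b.inProgress[j.Num] = j
	return j, true
}

// finish removes a completed job; O(1) thanks to the map.
func (b *bookkeeping) finish(num int) {
	delete(b.inProgress, num)
}

// timeout moves an unfinished job back to the idle queue so it can be reassigned.
func (b *bookkeeping) timeout(num int) {
	if j, ok := b.inProgress[num]; ok {
		delete(b.inProgress, num)
		b.toSchedule = append(b.toSchedule, j)
	}
}
```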

Rethinking writing to files: parallel writing

The map tasks need to write into intermediate files, and the reduce tasks need to write to output files.
Currently I maintain a map of JSON encoders, loop through each key-value pair, and write it into the right encoder. This can be sped up considerably by dividing the content in memory first and then writing each file in parallel.
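A sketch of that idea, reusing KeyValue, bucketFor, and intermediateName from the earlier sketches; the names are mine, the concurrency shape (split in memory, then one goroutine per output file) is the point.

```go
import (
	"encoding/json"
	"os"
	"sync"
)

func writeIntermediateParallel(mapTask int, kva []KeyValue, nReduce int) error {
	// First divide the pairs in memory, one slice per reduce bucket.
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		b := bucketFor(kv.Key, nReduce)
		buckets[b] = append(buckets[b], kv)
	}

	// Then write each bucket to its own file in parallel.
	var wg sync.WaitGroup
	errs := make([]error, nReduce)
	for r := 0; r < nReduce; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			f, err := os.Create(intermediateName(mapTask, r))
			if err != nil {
				errs[r] = err
				return
			}
			defer f.Close()
			enc := json.NewEncoder(f)
			for _, kv := range buckets[r] {
				if err := enc.Encode(&kv); err != nil {
					errs[r] = err
					return
				}
			}
		}(r)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}
```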

Rethinking reading from files: use glob

The reduce tasks need to read from intermediate files named mr-int-X-Y, where Y is the current reduce task number and X is the number of whichever map task wrote into it. Previously I looped through all map task numbers and read their content. Two problems: performance, and a map job might not have produced any key-value pairs for this specific reduce task, which forces unnecessary error handling.

Using glob and reading in all files whose names match the pattern is more robust - as long as there's no side effect of mapf and reducef producing some other files that follow the same format :)
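A sketch of the glob-based read for one reduce task, again with the mr-int-X-Y naming and the KeyValue type from the earlier sketches; filepath.Glob does the file discovery.

```go
import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// readIntermediate collects every key-value pair destined for reduce task Y,
// whichever map tasks happened to produce them.
func readIntermediate(reduceTask int) ([]KeyValue, error) {
	pattern := fmt.Sprintf("mr-int-*-%d", reduceTask)
	names, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}
	kva := []KeyValue{}
	for _, name := range names {
		f, err := os.Open(name)
		if err != nil {
			return nil, err
		}
		dec := json.NewDecoder(f)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF: done with this file
			}
			kva = append(kva, kv)
		}
		f.Close()
	}
	return kva, nil
}
```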

Don’t communicate by sharing memory; share memory by communicating

Use a channel to manage unscheduled jobs. It has these advantages:

  • It is blocking. The RPC caller simply blocks instead of receiving an “idle” response.
  • It is thread safe. There is no need to take a lock on the slice of jobs, or on each individual job.
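With unscheduled jobs in a channel, the request handler shrinks to roughly this fragment, reusing the illustrative RPC structs from above together with the coordinator channel fields that appear in the bookkeeping goroutine later in these notes:

```go
// HandleJobRequest pulls the next idle job straight off the channel: the
// receive blocks until work exists, and the channel provides the
// synchronization, so no mutex is needed here.
func (c *Coordinator) HandleJobRequest(args *RequestJobArgs, reply *JobReply) error {
	j := <-c.jobsToSchedule // blocks until an unscheduled job is available
	// ... copy the job's type, number, and filename into the reply
	c.jobsToRecordScheduled <- j // let the bookkeeping goroutine mark it as in progress
	return nil
}
```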

Use a channel to manage completed jobs

I really like this

```go
go func() {
	for i := 0; i < nMap; i++ {
		<-c.jobsCompleted
	}
	c.mu.Lock()
	c.state = doingReduce
	c.mu.Unlock()
	go c.startReduceJobScheduling()
	for i := 0; i < nReduce; i++ {
		<-c.jobsCompleted
	}
	c.mu.Lock()
	c.state = done
	c.mu.Unlock()
}()
```

Now the last ambitious goal: remove all locks

Same philosophy: I am using locks to control access to a piece of memory shared by multiple goroutines - i.e., communicating by sharing memory.
If I instead let those goroutines talk to a single data-access goroutine via channels, the design becomes natively thread safe.

What I needed was a long-running goroutine that monitors the channels and handles the requests arriving on them to manipulate the map data:

```go
func (c *Coordinator) manageScheduledJobsMap() {
	for {
		select {
		case j := <-c.jobsToComplete:
			_, going := c.jobsInProgress[*j]
			if going {
				delete(c.jobsInProgress, *j)
				c.jobsCompleted <- j
			}
		case j := <-c.jobsToReschedule:
			_, going := c.jobsInProgress[*j]
			if going {
				delete(c.jobsInProgress, *j)
				c.jobsToSchedule <- j
			}
		case j := <-c.jobsToRecordScheduled:
			c.jobsInProgress[*j] = true
		default:
			continue
		}
	}
}
```