Reading
Paper: MapReduce
map and reduce are just what you would think they are, as in JavaScript 😊. You write functional code and the run-time system takes care of the details of:
- partitioning the input data
- scheduling the program’s execution across a set of machines
- handling machine failures
- achieving availability and reliability of storage on machines’ local disks via replication
- achieving fault tolerance via re-execution, which takes advantage of the functional model of the map and reduce operations
- managing the required inter-machine communication
How distribution is achieved
- The Map invocations are distributed by partitioning the input data into a set of M splits.
- The Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
Straggler issue
As a MapReduce execution nears its end, the last few tasks might take very long for reasons such as other tasks being scheduled on the same machine. This issue is alleviated by a back-up mechanism: the master schedules back-up executions for the last few in-progress tasks, and a task is considered complete when either the primary or the back-up execution finishes.
Use case: large-scale production indexing
Google migrated the document indexing workload to MapReduce. Benefits of the rewrite:
- Alterability, thanks to the separation of concerns
- Acceptable performance and therefore no need to mix conceptually unrelated computations together to avoid extra passes over the data.
- Smaller code base: from roughly 3800 lines of C++ down to about 700
Related work
Restricted programming models
e.g., an associative function can be computed over all prefixes of an N-element array in O(log N) time on N processors using parallel prefix computations.
Advantage of MapReduce over these restricted models: fault tolerance that scales to thousands of processors, versus small-scale models where programmers have to handle machine failures themselves.
Lecture
Why distributed systems?
- Parallelism
- Fault Tolerance
- Physical: naturally distributed
- Security/ Isolation
Challenges
- Concurrency
- Partial failure
- Performance (achieving the performance you think you deserve): the original motivation is higher performance via parallelism, but actually getting 100× the speed out of 100× the machines is hard.
Assignment
Task: Implement the “master”/“coordinator” and “worker” processes for MapReduce.
Interesting observations from the starter code
Build the apps in plugin mode (go build -buildmode=plugin) and find the Map and Reduce functions via loadPlugin. Nice to know that there is a plugin Go package taking care of the loading and symbol resolution of Go plugins.
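For reference, a minimal sketch of what that loading can look like with the standard plugin package. The KeyValue type below stands in for the lab's mr.KeyValue (the real symbols are asserted against that type), and the function signatures follow what the mrapps appear to expect:

```go
package main

import (
	"log"
	"plugin"
)

// KeyValue stands in for the lab's mr.KeyValue intermediate pair type.
type KeyValue struct {
	Key   string
	Value string
}

// loadPlugin opens a compiled app (e.g. wc.so, built with
// go build -buildmode=plugin) and resolves its Map and Reduce symbols.
func loadPlugin(filename string) (func(string, string) []KeyValue,
	func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v: %v", filename, err)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)
	return mapf, reducef
}
```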
The timeout Linux command
timeout is a command-line utility that runs a specified command and terminates it if it is still running after a given period of time. Get the Mac alternative:

```sh
brew install coreutils
```
Implementation
Worker’s steps
Step 1: Request a job from the coordinator
Step 2: Open the file and read in the content
Step 3: Do the Map or the Reduce job
Step 4: Write the results into files
Step 5: Send output file name back to coordinator
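A rough sketch of that loop, assuming hypothetical RPC types (JobRequest, JobReply, FinishReport) and task helpers (doMap, doReduce) that are my own names rather than the starter code's; call is the starter code's RPC helper:

```go
// Hypothetical RPC types for the sketch; the real ones live in rpc.go.
type JobKind int

const (
	MapJob JobKind = iota
	ReduceJob
	NoJob
)

type JobRequest struct{}

type JobReply struct {
	Kind      JobKind
	Filename  string // input split for a map job
	JobNumber int
	NReduce   int
}

type FinishReport struct {
	Kind      JobKind
	JobNumber int
	Outputs   []string // files this task produced
}

type FinishAck struct{}

// Worker loops over steps 1-5: request a job, do it, report back.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		var reply JobReply
		if !call("Coordinator.HandleJobRequest", &JobRequest{}, &reply) {
			return // coordinator unreachable: assume the whole run is over
		}
		var outputs []string
		switch reply.Kind {
		case MapJob:
			outputs = doMap(mapf, reply) // steps 2-4, sketched further below
		case ReduceJob:
			outputs = doReduce(reducef, reply)
		default:
			time.Sleep(time.Second) // nothing idle right now; ask again shortly
			continue
		}
		call("Coordinator.HandleJobFinishReport",
			&FinishReport{Kind: reply.Kind, JobNumber: reply.JobNumber, Outputs: outputs},
			&FinishAck{})
	}
}
```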
Coordinator’s RPC handlers
Handler 1: HandleJobRequest: Find an idle task and put its information (job type, filename) into the response.
Handler 2: HandleJobFinishReport: Update the task state; decide whether all Map tasks have finished; decide whether all Reduce tasks have finished and update the Done return value.
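A sketch of the two handlers in my first iteration, continuing the hypothetical types above; the Job struct, its states, and the mutex-guarded maps are assumptions about my own bookkeeping, not the starter code:

```go
type JobState int

const (
	Idle JobState = iota
	InProgress
	Completed
)

type Job struct {
	Number    int
	Filename  string
	State     JobState
	StartedAt time.Time
}

type Coordinator struct {
	mu         sync.Mutex
	nReduce    int
	mapJobs    map[int]Job // first-iteration bookkeeping, keyed by job number
	reduceJobs map[int]Job
}

// Handler 1: find an idle task and describe it in the reply.
func (c *Coordinator) HandleJobRequest(args *JobRequest, reply *JobReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for n, job := range c.mapJobs {
		if job.State == Idle {
			job.State, job.StartedAt = InProgress, time.Now()
			c.mapJobs[n] = job
			*reply = JobReply{Kind: MapJob, Filename: job.Filename, JobNumber: n, NReduce: c.nReduce}
			return nil
		}
	}
	// ...hand out reduce jobs the same way once every map job is Completed;
	// otherwise reply with NoJob so the worker retries later...
	reply.Kind = NoJob
	return nil
}

// Handler 2: record the completion so Done() can observe overall progress.
func (c *Coordinator) HandleJobFinishReport(args *FinishReport, reply *FinishAck) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	jobs := c.mapJobs
	if args.Kind == ReduceJob {
		jobs = c.reduceJobs
	}
	job := jobs[args.JobNumber]
	job.State = Completed
	jobs[args.JobNumber] = job
	return nil
}
```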
Questions asked
Question 1: Can a worker do either map or reduce?
Yes. It is determined by the reply to the job request.
Question 2: How are nReduce and ihash used by the Worker?
According to the MapReduce paper:
Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
- R: nReduce
- Intermediate key space: the collection of all keys produced by map
- Hash function: ihash
- Who is responsible for the distribution: the map side, when the map functions write their results
Question 3: Who is responsible for the distribution of the reduce invocations?
The worker, when it has invoked the mapf function and writes the outcomes to the intermediate files. It should decide which file each intermediate key-value pair is written to according to the result of ihash.
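The starter code ships an ihash helper roughly like the one below (FNV-1a); the partition wrapper is my own naming for the hash(key) mod R step:

```go
import "hash/fnv"

// ihash hashes a key the way the starter code does (FNV-1a, sign bit cleared).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition picks the reduce task Y, and therefore the intermediate file
// mr-int-X-Y, that a given key belongs to.
func partition(key string, nReduce int) int {
	return ihash(key) % nReduce
}
```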
Question 4: How does the Coordinator decide which map job to assign to a worker?
According to the MapReduce paper:
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits.
According to the implementations of Map in the mrapps, each is passed two arguments, filename string and content string. It might not always use both arguments - for example, the word count app only uses content. Therefore, I think I can assume that each map invocation will only involve one file, and the Worker will be responsible for opening the file and reading the content before invoking the Map function.
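So one map task in the worker might look roughly like this; doMap and writeIntermediate are hypothetical helpers continuing the earlier sketch (writeIntermediate is shown later):

```go
import (
	"log"
	"os"
)

// doMap handles one input split: open the file, read the whole content,
// hand both to the app's Map function, then partition and persist the output.
func doMap(mapf func(string, string) []KeyValue, reply JobReply) []string {
	content, err := os.ReadFile(reply.Filename)
	if err != nil {
		log.Fatalf("cannot read %v: %v", reply.Filename, err)
	}
	kvs := mapf(reply.Filename, string(content)) // e.g. wc only looks at the content
	names, err := writeIntermediate(kvs, reply.JobNumber, reply.NReduce)
	if err != nil {
		log.Fatalf("cannot write intermediate files: %v", err)
	}
	return names
}
```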
Question 5: How do I manage the jobs in the Coordinator?
- []Job: hard to query by file name.
- map[string]Job with the filename as the key: hard to find incomplete jobs, and not possible for reduce jobs, as there are up to mMap intermediate files to be processed by each task.
- map[int]Job with the job number as the key: this is what I did for the first iteration. The Coordinator maintains two such maps, reduceJobs and mapJobs. There turned out to be a better solution; more on this later.
Performance issue
Now that I have the first iteration running (Yes! 🎉), the performance issue becomes noticeable. Yihao got his version to finish one invocation of test-mr.sh within a minute, while I could finish a song waiting for mine. Here are some design decisions that I noted down after some discussion with him:
Rethinking Question 5 - Coordinator’s management of the jobs
Currently, my Coordinator needs to loop through the whole map of jobs to find an available job or to conclude that there is no suitable job to assign. I originally designed it this way because I was thinking of the “resetting” of jobs back to the idle state and thought it would be hard to keep a collection of idle jobs. However, it was actually not that bad. All I had to do was maintain a slice/queue of jobs to be assigned, and if a job did not get finished in time, I could still append it back. On the other end, the already scheduled jobs can be maintained in a map keyed by job number. This makes querying a scheduled job O(1) when the job either finishes or runs out of time (see the sketch below).
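A sketch of that revised bookkeeping; the field and method names are mine, and the Job struct carries over from the earlier sketch:

```go
type Coordinator struct {
	mu        sync.Mutex
	idle      []Job       // jobs waiting to be handed out, used as a FIFO queue
	scheduled map[int]Job // in-flight jobs, keyed by job number for O(1) lookup
}

// nextJob pops an idle job and records it as scheduled.
func (c *Coordinator) nextJob() (Job, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.idle) == 0 {
		return Job{}, false
	}
	job := c.idle[0]
	c.idle = c.idle[1:]
	job.StartedAt = time.Now()
	c.scheduled[job.Number] = job
	return job, true
}

// finish removes a completed job in O(1); false means it had already
// timed out and been requeued for another worker.
func (c *Coordinator) finish(number int) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.scheduled[number]
	delete(c.scheduled, number)
	return ok
}

// requeueStale appends any job that has been in flight too long back
// onto the idle queue so it can be reassigned.
func (c *Coordinator) requeueStale(timeout time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for n, job := range c.scheduled {
		if time.Since(job.StartedAt) > timeout {
			delete(c.scheduled, n)
			c.idle = append(c.idle, job)
		}
	}
}
```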
Rethinking writing to files: parallel writing
The map tasks need to write into intermediate files, and the reduce tasks need to write to output files.
Currently I maintain a map of JSON encoders, loop through each key-value pair, and write to the right encoder. This could be largely improved by dividing the content in memory first and then writing each file in parallel, as sketched below.
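A sketch of that idea, bucketing in memory and then fanning out one goroutine per partition; KeyValue and ihash come from the earlier sketches, and the mr-int-X-Y naming follows the convention described in the next section:

```go
import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"sync"
)

// writeIntermediate buckets the map output in memory by reduce partition,
// then writes each bucket to its own mr-int-X-Y file concurrently.
func writeIntermediate(kvs []KeyValue, mapNumber, nReduce int) ([]string, error) {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	names := make([]string, nReduce)
	errs := make([]error, nReduce)
	var wg sync.WaitGroup
	for r := 0; r < nReduce; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			names[r] = fmt.Sprintf("mr-int-%d-%d", mapNumber, r)
			f, err := os.Create(names[r])
			if err != nil {
				errs[r] = err
				return
			}
			defer f.Close()
			enc := json.NewEncoder(f)
			for _, kv := range buckets[r] {
				if err := enc.Encode(&kv); err != nil {
					errs[r] = err
					return
				}
			}
		}(r)
	}
	wg.Wait()
	return names, errors.Join(errs...)
}
```

The lab hints also suggest writing to a temporary file and atomically renaming it once complete, so a crashed map task never leaves a partially written file behind.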
Rethinking reading from files: use glob
The reduce tasks need to read from intermediate files named mr-int-X-Y, where Y is the current reduce task number and X is the number of whichever map task wrote into it. Before, I looped through all map task numbers and read their content. Two problems: performance, and a map job might not have produced any key-value pairs for this specific reduce task, which forces unnecessary error handling.
Using glob and reading in all files whose names match the glob pattern would be more robust - as long as there is no side effect of mapf and reducef producing some other files that follow the same format :)
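A sketch of the glob-based reading, assuming the same mr-int-X-Y naming and the KeyValue type from earlier:

```go
import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// readIntermediate gathers every key-value pair destined for reduce task Y,
// from whichever map tasks actually produced a file for it.
func readIntermediate(reduceNumber int) ([]KeyValue, error) {
	files, err := filepath.Glob(fmt.Sprintf("mr-int-*-%d", reduceNumber))
	if err != nil {
		return nil, err
	}
	var kvs []KeyValue
	for _, name := range files {
		f, err := os.Open(name)
		if err != nil {
			return nil, err
		}
		dec := json.NewDecoder(f)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF ends this file
			}
			kvs = append(kvs, kv)
		}
		f.Close()
	}
	return kvs, nil
}
```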
Don’t communicate by sharing memory; share memory by communicating
Use a channel to manage unscheduled jobs. It has these advantages:
- It is blocking. The RPC method caller will just be blocked instead of receiving an idle response.
- It is thread-safe. There is no need to apply a lock on the slice of jobs, or on each single job.
Use a channel to manage completed jobs
I really like this pattern: a single long-running goroutine, started with go func() { ... }, receives completed jobs from a channel and owns all of the completion bookkeeping.
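My reconstruction of the idea (the channel, counters, and placement inside MakeCoordinator are my own guesses, not the original snippet): the RPC handler only sends on the channel, and one goroutine owns the progress counters.

```go
// e.g. inside MakeCoordinator:
completed := make(chan FinishReport)

go func() {
	finishedMaps, finishedReduces := 0, 0
	for report := range completed {
		switch report.Kind {
		case MapJob:
			finishedMaps++
		case ReduceJob:
			finishedReduces++
		}
		// ...switch from the map phase to the reduce phase once finishedMaps
		// reaches nMap, and let Done() return true once finishedReduces
		// reaches nReduce...
	}
}()
```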
Now the last ambitious goal: remove all locks
Same philosophy: I am using locks to control access to a piece of memory shared by multiple goroutines - i.e., communicating by sharing memory.
If I could instead let the goroutines talk to a dedicated data-access goroutine via a channel, the data is then natively thread-safe.
What I needed was a long-running goroutine that monitors channels and handles requests from those channels to manipulate the map data, for example manageScheduledJobsMap sketched below.
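A sketch of what I mean; the scheduleReq type and the c.scheduleReqs channel are hypothetical names, and the point is that manageScheduledJobsMap is the only goroutine that ever touches the map:

```go
// scheduleReq asks the owning goroutine to mutate the scheduled-jobs map.
type scheduleReq struct {
	job    Job
	finish bool      // false: record the job as scheduled; true: mark it finished
	reply  chan bool // for finish requests: was the job still scheduled?
}

// manageScheduledJobsMap owns the map outright and serializes all access
// to it by draining the request channel, so no mutex is needed.
func (c *Coordinator) manageScheduledJobsMap() {
	scheduled := map[int]Job{} // never touched by any other goroutine
	for req := range c.scheduleReqs {
		if req.finish {
			_, ok := scheduled[req.job.Number]
			delete(scheduled, req.job.Number)
			req.reply <- ok // false: the job had already timed out and been reassigned
		} else {
			scheduled[req.job.Number] = req.job
			req.reply <- true
		}
	}
}
```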