//on the driver
def main(args: Array[String]): Unit = {
  val sc = new SparkContext()
  val rdd = sc.makeRDD()
  val finalRDD = rdd.transformation()
  val result = finalRDD.action()
}
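//A concrete minimal driver sketch of the pattern above (an assumed example, not part of
//the original trace): transformations only build the RDD lineage, and the action is what
//triggers the job-submission chain traced below. "input.txt" is a placeholder path.
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]"))
    val counts = sc.textFile("input.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)            // introduces a shuffle dependency
    val result = counts.collect()    // the action: this is where sc.runJob() is entered
    result.foreach(println)
    sc.stop()
  }
}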
//In the final action (finalRDD.action)
//Generate job stages and tasks
sc.runJob()
dagScheduler.runJob()
dagScheduler.submitJob()
dagSchedulerEventProcessActor ! JobSubmitted
dagScheduler.handleJobSubmitted()
finalStage = newStage()
mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
dagScheduler.submitStage()
missingStages = dagScheduler.getMissingParentStages()
dagScheduler.submitMissingTasks(readyStage)
//add tasks to the taskScheduler
taskScheduler.submitTasks(new TaskSet(tasks))
fifoSchedulableBuilder.addTaskSetManager(taskSet)
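//Illustrative sketch (hypothetical names, not Spark's real classes) of the
//submitStage()/getMissingParentStages() recursion above: a stage is submitted only
//once every parent shuffle stage has produced its map output.
case class Stage(id: Int, parents: Seq[Stage], var outputReady: Boolean = false)

def submitStage(stage: Stage, submitMissingTasks: Stage => Unit): Unit = {
  val missing = stage.parents.filterNot(_.outputReady)          // getMissingParentStages()
  if (missing.isEmpty) submitMissingTasks(stage)                // all parents done: run this stage's tasks
  else missing.foreach(p => submitStage(p, submitMissingTasks)) // compute parents first; this stage waits
}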
//send tasks
sparkDeploySchedulerBackend.reviveOffers()
driverActor ! ReviveOffers
sparkDeploySchedulerBackend.makeOffers()
sparkDeploySchedulerBackend.launchTasks()
foreach task:
CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
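//Illustrative sketch (hypothetical classes) of the idea behind makeOffers()/launchTasks():
//pending tasks from the task set are matched against free executor cores, and each match
//would then be serialized and shipped via LaunchTask(serializedTask).
case class WorkerOffer(executorId: String, freeCores: Int)
case class PendingTask(taskId: Long, coresNeeded: Int = 1)

def makeOffers(offers: Seq[WorkerOffer],
               pending: scala.collection.mutable.Queue[PendingTask]): Seq[(String, PendingTask)] = {
  val launched = scala.collection.mutable.ArrayBuffer[(String, PendingTask)]()
  for (offer <- offers) {
    var free = offer.freeCores
    while (free > 0 && pending.nonEmpty) {
      val task = pending.dequeue()
      launched += ((offer.executorId, task))   // this pair would become a LaunchTask message
      free -= task.coresNeeded
    }
  }
  launched.toSeq
}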
//On the worker node:
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
In TaskRunner.run()
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if (directResult.size() > akkaFrameSize)
     indirectResult = blockManager.putBytes(taskId, directResult, MEMORY_AND_DISK_SER)
   else
     return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
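//Illustrative sketch (hypothetical classes) of the executor-side pattern above: a thread
//pool runs each deserialized task and reports either the raw bytes (small result) or a
//block id pointing into the block manager (result larger than the frame-size limit).
import java.util.concurrent.Executors

trait SimpleTask extends Serializable { def run(): Array[Byte] }

class SimpleExecutorBackend(maxDirectResultBytes: Int,
                            report: (Long, Either[String, Array[Byte]]) => Unit) {
  private val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())

  def launchTask(taskId: Long, task: SimpleTask): Unit = pool.execute(new Runnable {
    override def run(): Unit = {
      val value = task.run()
      if (value.length > maxDirectResultBytes)
        report(taskId, Left(s"taskresult_$taskId"))   // stored in the block manager; only the id travels
      else
        report(taskId, Right(value))                  // small enough to send directly to the driver
    }
  })
}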
//map & reduce tasks on the worker nodes
In task.run(taskId)
//if the task is a ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])
//if the task is a ResultTask
=> return func(context, rdd.iterator(split, context))
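//Illustrative sketch (hypothetical helper) of what a ShuffleMapTask conceptually does:
//hash-partition every record of its split into one bucket per reducer and report the
//per-bucket sizes, which is what MapStatus carries back to the driver.
def writeShuffleOutput[K, V](records: Iterator[(K, V)], numReducers: Int): Array[Long] = {
  val buckets = Array.fill(numReducers)(scala.collection.mutable.ArrayBuffer.empty[(K, V)])
  for ((k, v) <- records) {
    val bucketId = ((k.hashCode % numReducers) + numReducers) % numReducers  // non-negative hash partitioning
    buckets(bucketId) += ((k, v))
  }
  // real Spark appends each bucket to a FileSegment on disk; here we only report the sizes
  buckets.map(_.size.toLong)
}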
//After the driver receives StatusUpdate(result)
driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is an IndirectTaskResult
     serializedTaskResult = blockManager.getRemoteBytes(indirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)
//If the finished task is a ResultTask
=> if (job.numFinished == job.numPartitions)
     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=> jobWaiter.taskSucceeded(index, result)
     resultHandler(index, result)
//if the finished task is a ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
     mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
     mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
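//Illustrative sketch (hypothetical class) of the bookkeeping behind addOutputLoc()/
//registerMapOutputs(): once every partition of a shuffle map stage has reported an
//output location, the stage is complete and waiting child stages can be submitted.
class StageOutputTracker(numPartitions: Int) {
  private val outputLocs = Array.fill[Option[String]](numPartitions)(None)  // blockManagerId per partition

  def addOutputLoc(partitionId: Int, blockManagerId: String): Unit =
    outputLocs(partitionId) = Some(blockManagerId)

  def isComplete: Boolean = outputLocs.forall(_.isDefined)
}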
//shuffle read
rdd.iterator()
=> rdd(e.g. ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = mapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress: Seq[(BlockManagerId,Seq[(BlockId,Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress,serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
In basicBlockFetcherIterator:
//generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)
//fetch remote block
=> sendRequest(fetchRequests.dequeue()) while bytesInFlight + fetchRequests.front.size <= maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)
//fetch local block
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
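//Illustrative sketch (hypothetical types) of the splitLocalRemoteBlocks() idea: blocks that
//live on this executor are read directly, remote ones are grouped per host into fetch
//requests and randomized so that all reducers do not hit the same mapper first.
import scala.util.Random

case class BlockRef(blockId: String, size: Long, host: String)

def splitLocalRemoteBlocks(blocks: Seq[BlockRef], localHost: String): (Seq[BlockRef], Seq[Seq[BlockRef]]) = {
  val (local, remote) = blocks.partition(_.host == localHost)
  val remoteRequests = remote.groupBy(_.host).values.toSeq   // one fetch request per remote host
  (local, Random.shuffle(remoteRequests))
}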
// on the mapper side of shuffle read
After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)
// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if (fileSegment.length < minMemoryMapBytes)
     buffer = ByteBuffer.allocate(fileSegment.length)
   else
     buffer = channel.map(MapMode.READ_ONLY, fileSegment.offset, fileSegment.length)
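//Illustrative sketch of the getBytes() decision above, using plain NIO: small segments are
//copied into a heap buffer, large ones are memory-mapped; minMemoryMapBytes is the threshold.
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

def readSegment(file: File, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)       // small segment: read into a heap buffer
      channel.position(offset)
      while (buf.hasRemaining && channel.read(buf) != -1) {}
      buf.flip()
      buf
    } else {
      channel.map(MapMode.READ_ONLY, offset, length)    // large segment: memory-map it
    }
  } finally {
    channel.close()                                     // a completed mapping survives channel close
  }
}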
// on the reducer side of shuffle read
BasicBlockFetcherIterator.next()
=> result = results.take()
=> while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
=> result.deserialize()
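//Illustrative sketch (hypothetical class) of the maxBytesInFlight throttling shown in next():
//queued fetch requests are issued only while the bytes currently in flight stay under the cap,
//and consuming a completed result frees budget for the next request.
case class FetchReq(blockIds: Seq[String], size: Long)

class ThrottledFetcher(maxBytesInFlight: Long, send: FetchReq => Unit) {
  private val fetchRequests = scala.collection.mutable.Queue.empty[FetchReq]
  private var bytesInFlight = 0L

  def enqueue(requests: Seq[FetchReq]): Unit = { fetchRequests ++= requests; sendMore() }

  // called once a previously issued request has fully arrived (e.g. from next())
  def onRequestComplete(req: FetchReq): Unit = { bytesInFlight -= req.size; sendMore() }

  private def sendMore(): Unit =
    while (fetchRequests.nonEmpty &&
           (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
      val req = fetchRequests.dequeue()
      bytesInFlight += req.size
      send(req)
    }
}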