Global Scheduler Design and Implementation

The Existing Scheduling Algorithm

The existing scheduling algorithm is driven by node heartbeats. That is, each scheduling decision considers only the node that is currently heartbeating: the scheduler picks the most suitable application, and that application's most suitable resource request, for this one node, without accounting for the application's preferences across the cluster's nodes.

The scheduling process looks like the following pseudocode:

for node in nodeHeartbeats:
    go to parentQueue
    go to leafQueue
    for application in leafQueue.applications:
        for resourceRequest in application.resourceRequests:
            check whether node can satisfy resourceRequest

The pseudocode makes it plain that the node's concrete information plays no role while walking from application selection down to resource-request selection. In the source code, what gets passed down during application selection is clusterResource; only at the very end is the node checked for whether it can actually support the allocation, so much of the work done along the way can turn out to be wasted.

Moreover, allocating once per heartbeat yields a suboptimal rather than an optimal placement. For a given application, heartbeat-driven allocation is effectively a sequential walk over all nodes: as soon as some node can run the request, the container is placed there. For the application, that node may not be the best choice, constraints such as locality may be poorly satisfied, and the process is inefficient.

Global Scheduler

Approach

Instead of allocating against a single node, allocate against a set of nodes, asynchronously from node heartbeats. That is: following the scheduling policy and using global information, pick an application and a resource request; then, based on that resource request, sort the candidate node set; walk the nodes in sorted order, find the first node that can host the allocation, and allocate there.
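
The cycle can be sketched in the same pseudocode style as above (a restatement of the previous paragraph, not actual YARN code):

loop, asynchronously with node heartbeats:
    select application and resourceRequest by policy, using global information
    sortedNodes = sort(placementSet, resourceRequest)
    for node in sortedNodes:
        if node can satisfy resourceRequest:
            allocate on node and stop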

Details

PlacementSet

PlacementSet {
    Map<NodeId, SchedulerNode> nodeSet;
}
  • nodeSet is a set of nodes pre-selected by the scheduler for placing an application's requested resources. By default it contains all nodes in the cluster.

NodeScorer

NodeScorer {
    Iterator<SchedulerNode> scorePlacementSet(PlacementSet candidates);
}

A function that scores and sorts the candidate nodes according to an application's resource request.
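
As one possible implementation, a scorer might simply prefer nodes with the most free memory. The sketch below assumes NodeScorer is a Java interface and that SchedulerNode exposes a getUnallocatedMemory() accessor; both are illustrative assumptions, not the actual API:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical scorer: order candidate nodes by free memory, descending, so
// the first feasible node in the iteration is also the best by this metric.
class AvailableMemoryScorer implements NodeScorer {
    @Override
    public Iterator<SchedulerNode> scorePlacementSet(PlacementSet candidates) {
        List<SchedulerNode> nodes = new ArrayList<>(candidates.nodeSet.values());
        nodes.sort(Comparator.comparingLong(SchedulerNode::getUnallocatedMemory).reversed());
        return nodes.iterator();
    }
}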

ResourceAllocationCommitter

public interface ResourceAllocationCommitter {
    void handle(ResourceAllocationCommitRequest request);
}

public class ResourceAllocationCommitRequest {
    List<AllocationProposal> containersToAllocate;
    List<ReservationProposal> containersToReserve;
    List<ContainerId> containersToRelease;
}

ResourceAllocationCommitter is the interface through which proposals are vetted; it is implemented by the Scheduler. ResourceAllocationCommitRequest is the final proposal each scheduling thread produces.
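
AllocationProposal and ReservationProposal are not spelled out in this article. Presumably a proposal carries at least the chosen node, the owning application attempt, and the resource to allocate, along the lines of this guess (all field names here are hypothetical; the real YARN-5139 classes may differ):

// Hypothetical shape of a single allocation proposal.
class AllocationProposal {
    NodeId nodeId;                   // node chosen for the container
    ApplicationAttemptId attemptId;  // application attempt the container belongs to
    Resource allocatedResource;      // amount of resource to allocate
}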

Multithreading

Use multiple threads to schedule concurrently: all of them read the shared queue structure and node-set information, and each works out a result (which node, which resources, and so on). Each thread then reports its result to the Committer (implemented by the Scheduler), which vets it; if the proposal passes, the allocation is carried out, otherwise the proposal is discarded.

Multiple threads compute proposals concurrently.

Question 1: all threads see the same queue structure and node-set information, so won't they produce many duplicated results? See the Implementation section.

Read Lock / Write Lock

The existing scheduling algorithm is purely serial: scheduling proceeds one step at a time. Yet much of the scheduling process is read-only, e.g. checking queue/user/application limits and selecting a queue/application/resource request, and could run in parallel, letting multiple scheduling passes proceed at once. Only after a proposal has been decided does a write operation execute it.
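
In Java this maps naturally onto a ReentrantReadWriteLock. The sketch below only illustrates the locking pattern behind the readLock { ... } / writeLock { ... } blocks in the pseudocode later on; it is not the actual YARN code:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class SchedulerState {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Many scheduling threads may compute proposals under the read lock at once.
    <T> T computeProposal(Supplier<T> readOnlyWork) {
        lock.readLock().lock();
        try {
            return readOnlyWork.get();  // check limits, sort queues/apps/nodes, ...
        } finally {
            lock.readLock().unlock();
        }
    }

    // Applying a committed proposal takes the exclusive write lock.
    void applyProposal(Runnable stateUpdate) {
        lock.writeLock().lock();
        try {
            stateUpdate.run();          // update usage, live containers, metrics, ...
        } finally {
            lock.writeLock().unlock();
        }
    }
}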

Question 2: every thread performs both reads and writes, so won't write operations end up waiting forever for readers to release the lock? See the Implementation section.

Optimization

Sorting Efficiency

The following approaches can help:

  1. Apply the parallelization described above.
  2. If a resource request can run on any node, pick a node at random and skip sorting altogether.
  3. Cache the sorted node order for some resource requests (see the sketch after this list). Node orderings may rarely change, or change only incrementally; for a request with hard node locality, for example, the order never changes at all.
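
A minimal sketch of item 3, assuming a cache keyed by a string that describes the request's placement constraint; the key scheme and the invalidation policy are assumptions, not part of YARN-5139:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class SortedNodesCache {
    // e.g. key "hard-locality:host1" -> the (stable) ordered candidate list
    private final Map<String, List<SchedulerNode>> cache = new ConcurrentHashMap<>();

    List<SchedulerNode> getOrSort(String constraintKey,
                                  Supplier<List<SchedulerNode>> sorter) {
        // Reuse a previously computed order; sort only on a cache miss.
        return cache.computeIfAbsent(constraintKey, k -> sorter.get());
    }

    // Invalidate when the node set itself changes (node added/removed).
    void invalidateAll() {
        cache.clear();
    }
}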

Implementation

Top-Level Scheduler Changes

/*
 * Handle a ResourceAllocationCommitRequest: either accept it or reject it
 */
interface ResourceAllocationCommitter {
    void handle(ResourceAllocationCommitRequest request);
}

/*
 * The Scheduler implements the interface
 */
class Scheduler implements ResourceAllocationCommitter {

    // Build the proposal top-down (root queue -> leaf queue -> application)
    ResourceAllocationCommitRequest getAllocationProposal(PlacementSet clusterPlacementSet) {
        readLock {
            // Following operations are in read-lock
            return rootQueue.getAllocationProposal(clusterPlacementSet);
        }
    }

    // First check bottom-up whether the proposal can be executed,
    // then apply it bottom-up
    void tryCommit(ResourceAllocationCommitRequest proposal) {
        writeLock {
            // Following operations are in write-lock
            // Get the application for the given proposal
            SchedulerApplicationAttempt app = get_application(proposal);
            // Check whether the proposal will be accepted or not:
            boolean accepted = app.accept(proposal);
            if (accepted) {
                // If the proposal is accepted, apply it (update states).
                // The reason we need two separate accept / apply methods is that
                // we must first check whether the proposal can be accepted before
                // updating internal data; otherwise we would have to revert the
                // changes if the proposal were rejected by an upper level.
                app.apply(proposal);
            } else {
                // Otherwise, discard the proposal
            }
        }
    }

    // We can have multiple such threads running at the same time
    Thread allocationThread = new Thread() {
        void run() {
            while (true) {
                // Pass down the cluster placement set, which is essentially the
                // set of all available nodes in the cluster
                ResourceAllocationCommitRequest proposal =
                    scheduler.getAllocationProposal(get_available_placement_set());
                scheduler.tryCommit(proposal);
            }
        }
    };
}

In the code above, all of getAllocationProposal runs under the read lock and all of tryCommit under the write lock; doesn't that invite Question 2? And since every thread works from the same resource state, doesn't Question 1 arise? A plausible reading, in the spirit of this optimistic design: the write-lock section is short because all heavy computation (limit checks, sorting) happens under the read lock, so writers are not starved; and duplicated or conflicting proposals are simply rejected at commit time, so duplication wastes some work but never corrupts state.

Application & Queue Level Changes For GetAllocationProposal

Roughly the same as the existing code, except that it now runs under the read-lock mechanism.

ParentQueue

class ParentQueue {
    ResourceAllocationCommitRequest getAllocationProposal(PlacementSet clusterPlacementSet) {
        readLock {
            // All allocations are under read-lock
            if (exceed_queue_max_limit()) {
                return NOTHING_ALLOCATED;
            }
            for (queue in sort(childQueues)) {
                ResourceAllocationCommitRequest proposal =
                    queue.getAllocationProposal(clusterPlacementSet);
                if (proposal != NOTHING_ALLOCATED) {
                    return proposal;
                }
            }
            return NOTHING_ALLOCATED;
        }
    }
}

LeafQueue

class LeafQueue {
    ResourceAllocationCommitRequest getAllocationProposal(PlacementSet clusterPlacementSet) {
        readLock {
            // All allocations are under read-lock
            if (exceed_queue_max_limit()) {
                return NOTHING_ALLOCATED;
            }
            for (application in sort(applications)) {
                if (exceed_user_limit(application.get_user())) {
                    continue;
                }
                ResourceAllocationCommitRequest proposal =
                    application.getAllocationProposal(clusterPlacementSet);
                if (proposal != NOTHING_ALLOCATED) {
                    return proposal;
                }
            }
            return NOTHING_ALLOCATED;
        }
    }
}

Application

class SchedulerApplicationAttempt {
    ResourceAllocationCommitRequest getAllocationProposal(PlacementSet clusterPlacementSet) {
        readLock {
            // All allocations are under read-lock
            for (request in sort(resource_requests)) {
                // Given a request and a placement set, find the best node to allocate on.
                // Filter clusterPlacementSet by the given resource request, for example:
                // - Hard locality
                // - Anti-affinity / Affinity
                PlacementSet filteredPlacementSet = filter(clusterPlacementSet, request);
                // Sort filteredPlacementSet according to the resource request
                for (node in sort(filteredPlacementSet, request)) {
                    if (node.has_enough_available_resource()) {
                        // If the node has enough available resource for this request,
                        // return a proposal to allocate the container
                    } else {
                        // If the node doesn't have enough available resource,
                        // return a proposal to reserve the container
                    }
                    // Other outcomes are also possible:
                    // - A container is released, e.g. an unnecessary reserved container
                    // - No node can be found: return NOTHING_ALLOCATED
                }
            }
        }
    }
}
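
The filter step above could look like the following sketch for a hard-node-locality constraint. getRequiredHost() and getNodeName() are illustrative accessors, and PlacementSet.nodeSet is assumed to be initialized to an empty map; none of this is the real API:

import java.util.Map;

// Keep only the node(s) a hard-node-locality request names; other constraints
// (affinity, anti-affinity, partitions) would filter the same way.
static PlacementSet filterByHardLocality(PlacementSet candidates, ResourceRequest request) {
    PlacementSet filtered = new PlacementSet();
    for (Map.Entry<NodeId, SchedulerNode> e : candidates.nodeSet.entrySet()) {
        if (e.getValue().getNodeName().equals(request.getRequiredHost())) {
            filtered.nodeSet.put(e.getKey(), e.getValue());
        }
    }
    return filtered;
}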

Application & Queue Level Changes For Accept / Apply Resource Allocation Proposal

Proposals are vetted and then applied from the bottom up.

Application

class SchedulerApplicationAttempt {
    boolean accept(ResourceAllocationCommitRequest proposal) {
        readLock {
            // Check the following:
            // - Does the node still have available resource for the container?
            // - Is the node reserved by another application?
            // - Does the application still need the resource?
            if (canAccept(proposal) && parent.accept(proposal)) {
                return true;
            } else {
                return false;
            }
        }
    }

    void apply(ResourceAllocationCommitRequest proposal) {
        writeLock {
            // Update the following:
            // - Deduct pending resource
            // - Update the live containers map
            // - Update metrics, etc.
        }
        parent.apply(proposal);
    }
}

Leaf/ParentQueue

class LeafQueue/ParentQueue {
    boolean accept(ResourceAllocationCommitRequest proposal) {
        readLock {
            // Check the following:
            // - queue-limit / user-limit
            if (canAccept(proposal) && parent.accept(proposal)) {
                return true;
            } else {
                return false;
            }
        }
    }

    void apply(ResourceAllocationCommitRequest proposal) {
        writeLock {
            // Update the following:
            // - resource usage for the queue / user
        }
        parent.apply(proposal);
    }
}

Comparison

The existing algorithm fixes the node first and then picks an application; the Global Scheduler first fixes the application's resource request and then orders the nodes according to that request, so the application gets the best-ranked node rather than whichever node happens to heartbeat first.

This idea resembles the default algorithm in CloudSim, where the scheduling policy takes a Cloudlet (the analogue of a YARN application) and searches for a suitable VM (the analogue of a YARN node).

References

  • YARN-5139. Most of this article is a free translation of that issue together with my own understanding.