aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-05-09 18:01:23 -0400
committerGlenn Elliott <gelliott@cs.unc.edu>2014-05-09 18:01:23 -0400
commitf42050c91ede1ea63a23eb2e0489933023ee8f1c (patch)
tree5c297879049ca91733010b06644144659c189127
parent694855d7f5c1abd4509fa977618dfc75e8899470 (diff)
tighten engine blocking terms for few-task case
-rw-r--r--rtss14/gpusync.py73
1 files changed, 54 insertions, 19 deletions
diff --git a/rtss14/gpusync.py b/rtss14/gpusync.py
index 3f6242a..1a67e16 100644
--- a/rtss14/gpusync.py
+++ b/rtss14/gpusync.py
@@ -7,7 +7,7 @@ import sets
7import itertools 7import itertools
8import numpy as np 8import numpy as np
9 9
10from math import ceil 10from math import ceil, floor
11from copy import deepcopy 11from copy import deepcopy
12from collections import defaultdict 12from collections import defaultdict
13 13
@@ -95,7 +95,9 @@ class FifoExecEngineLock(FifoEngineLock):
95 sz = sum([len(o) for o in others]) 95 sz = sum([len(o) for o in others])
96 # collapse into a single list 96 # collapse into a single list
97 iters = itertools.chain.from_iterable(others) 97 iters = itertools.chain.from_iterable(others)
98 take = (self.ntokens-1)*nrequests 98
99 terms = min(self.ntokens-1, len(self.users)-1)
100 take = terms*nrequests
99 b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) 101 b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take))
100 return b 102 return b
101 103
@@ -112,7 +114,8 @@ class FifoCopyEngineLockNoMigrate(FifoEngineLock):
112 for u in self.users if t is not u and 0 in u.resmodel[self].requests] 114 for u in self.users if t is not u and 0 in u.resmodel[self].requests]
113 sz = sum([len(o) for o in others]) 115 sz = sum([len(o) for o in others])
114 iters = itertools.chain.from_iterable(others) 116 iters = itertools.chain.from_iterable(others)
115 take = (self.ntokens-1)*nrequests 117 terms = min(self.ntokens-1, len(self.users)-1)
118 take = terms*nrequests
116 b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) 119 b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take))
117 return b 120 return b
118 121
@@ -153,11 +156,16 @@ class FifoCopyEngineLockP2P(FifoEngineLock):
153 156
154 assert self.alltokens >= 2 157 assert self.alltokens >= 2
155 158
159 normal_terms = min(self.ntokens - 1, len(self.users) - 1)
160 trans_terms = min(self.alltokens - 1, len(self.users) - 1)
161
156 # hard: take (rho*g-2 normal) and 1 migration, per nhard request 162 # hard: take (rho*g-2 normal) and 1 migration, per nhard request
157 # easy: take rho-1 normal, per neasy request 163 nnormal = (trans_terms - 1) * nhard
158 nnormal = (self.alltokens - 2) * nhard + (self.ntokens - 1) * neasy
159 nmigs = nhard 164 nmigs = nhard
160 165
166 # easy: take rho-1 normal, per neasy request
167 nnormal += normal_terms * neasy
168
161 if len(normal) < nnorm: 169 if len(normal) < nnorm:
162 # not enough normal requests. take underflow from migrations 170 # not enough normal requests. take underflow from migrations
163 nmigs += nnorm - len(normal) 171 nmigs += nnorm - len(normal)
@@ -214,7 +222,8 @@ class FifoCopyEngineLockP2P(FifoEngineLock):
214 assert len(migs) == 0 222 assert len(migs) == 0
215 assert nhard == 0 223 assert nhard == 0
216 224
217 ntake = (self.ntokens - 1) * n 225 terms = min(self.ntokens - 1, len(self.users) - 1)
226 ntake = terms * n
218 sz = sum([len(l) for l in _local]) 227 sz = sum([len(l) for l in _local])
219 local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sz) 228 local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sz)
220 b = np.sum(nmax(local, ntake)) 229 b = np.sum(nmax(local, ntake))
@@ -238,9 +247,21 @@ class FifoCopyEngineLockP2P(FifoEngineLock):
238 normal_from_cache = True 247 normal_from_cache = True
239 normal = self.normal_cache[t] 248 normal = self.normal_cache[t]
240 249
241 nnorm = (self.alltokens - 2)*nhard + (self.ntokens - 1) * neasy 250
251 assert self.alltokens >= 2
252
253 # FIFO queueing. blocked by at most one request from each unique other task
254 # take len(tasks)-1 terms instead of rho-1 / rho*g - 1 tasks if that is smaller.
255 normal_terms = min(self.ntokens - 1, len(self.users) - 1)
256 trans_terms = min(self.alltokens - 1, len(self.users) - 1)
257
258 # hard: take (rho*g-2 normal) and 1 migration, per nhard request
259 nnorm = (trans_terms - 1) * nhard
242 nmigs = nhard 260 nmigs = nhard
243 261
262 # easy: take rho-1 normal, per neasy request
263 nnorm += normal_terms * neasy
264
244 if len(normal) < nnorm: 265 if len(normal) < nnorm:
245 # not enough normal requests. take underflow from migrations 266 # not enough normal requests. take underflow from migrations
246 nmigs += nnorm - len(normal) 267 nmigs += nnorm - len(normal)
@@ -300,17 +321,20 @@ class FifoCopyEngineLockAlternateP2P(FifoEngineLock):
300 # every migration request is blocked by: 321 # every migration request is blocked by:
301 # (rho*g-1) normal requests and (rho*g-1) migration requests 322 # (rho*g-1) normal requests and (rho*g-1) migration requests
302 323
324 normal_terms = min(self.ntokens - 1, len(self.users) - 1)
325 trans_terms = min(self.alltokens - 1, len(self.users) - 1)
326
303 # compute blocking for normal requests, adjusting for overflow 327 # compute blocking for normal requests, adjusting for overflow
304 nnorm = min((self.ntokens-1)*nrequests, len(normal)) 328 nnorm = min(normal_terms*nrequests, len(normal))
305 nmig = min(min((self.ntokens-1)*nrequests, len(migs)), nnorm) 329 nmig = min(min(normal_terms*nrequests, len(migs)), nnorm)
306 330
307 # number of interfering requests remaining in each category 331 # number of interfering requests remaining in each category
308 nremain = max(0, len(normal) - nnorm) 332 nremain = max(0, len(normal) - nnorm)
309 nmigremain = max(0, len(migs) - nmig) 333 nmigremain = max(0, len(migs) - nmig)
310 334
311 # compute blocking for migration requests 335 # compute blocking for migration requests
312 nmig_extra = min((self.alltokens-1)*nmigrequests, nmigremain) 336 nmig_extra = min(trans_terms*nmigrequests, nmigremain)
313 nnorm_extra = min(min((self.alltokens-1)*nmigrequests, nmig_extra), nremain) 337 nnorm_extra = min(min(trans_terms*nmigrequests, nmig_extra), nremain)
314 338
315 # accumulate total blocking 339 # accumulate total blocking
316 nnorm += nnorm_extra 340 nnorm += nnorm_extra
@@ -354,17 +378,20 @@ class FifoCopyEngineLockAlternateP2P(FifoEngineLock):
354 local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sum([len(n) for n in _local])) 378 local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sum([len(n) for n in _local]))
355 remote = np.fromiter(itertools.chain.from_iterable(_remote), np.float32, count=sum([len(n) for n in _remote])) 379 remote = np.fromiter(itertools.chain.from_iterable(_remote), np.float32, count=sum([len(n) for n in _remote]))
356 380
381 normal_terms = min(self.ntokens - 1, len(self.users) - 1)
382 trans_terms = min(self.alltokens - 1, len(self.users) - 1)
383
357 # compute blocking for local requests, adjusting for overflow 384 # compute blocking for local requests, adjusting for overflow
358 nlocal = min((self.ntokens-1)*nrequests, len(local)) 385 nlocal = min(normal_terms*nrequests, len(local))
359 nmig = min(min((self.ntokens-1)*nrequests, len(migs)), nlocal) 386 nmig = min(min(normal_terms*nrequests, len(migs)), nlocal)
360 387
361 # number of interfering requests remaining in each category 388 # number of interfering requests remaining in each category
362 nremain = max(0, len(local) + len(remote) - nlocal) 389 nremain = max(0, len(local) + len(remote) - nlocal)
363 nmigremain = max(0, len(migs) - nmig) 390 nmigremain = max(0, len(migs) - nmig)
364 391
365 # compute blocking for migration requests 392 # compute blocking for migration requests
366 nmig_extra = min((self.alltokens-1)*nmigrequests, nmigremain) 393 nmig_extra = min(trans_terms*nmigrequests, nmigremain)
367 nnorm_extra = min(min((self.alltokens-1)*nmigrequests, nmig_extra), nremain) 394 nnorm_extra = min(min(trans_terms*nmigrequests, nmig_extra), nremain)
368 395
369 nmig += nmig_extra 396 nmig += nmig_extra
370 397
@@ -431,11 +458,18 @@ class KfmlpTokenLock(TokenLock):
431class R2dglpTokenLock(KfmlpTokenLock): 458class R2dglpTokenLock(KfmlpTokenLock):
432 def __init__(self, tasks, ncpu, ngpu, ntokens): 459 def __init__(self, tasks, ncpu, ngpu, ntokens):
433 super(R2dglpTokenLock,self).__init__(tasks, ngpu, ntokens) 460 super(R2dglpTokenLock,self).__init__(tasks, ngpu, ntokens)
434 fifo_len = int(ceil(float(ncpu)/self.k)) 461 if self.k > 0:
435 self.nterms = 2*fifo_len - 1 462 fifo_len = int(ceil(float(ncpu)/self.k))
436 self.fifo_capacity = fifo_len * self.k 463 self.nterms = 2*fifo_len - 1
464 self.fifo_capacity = fifo_len * self.k
465 else:
466 self.nterms = None
467 self.fifo_capacity = None
437 468
438 def blocking(self, t): 469 def blocking(self, t):
470 assert self.nterms is not None
471 assert self.fifo_capacity is not None
472
439 if len(self.users) <= self.fifo_capacity: 473 if len(self.users) <= self.fifo_capacity:
440 # Few enough users that R2DGLP operates like k-FMLP 474 # Few enough users that R2DGLP operates like k-FMLP
441 return super(R2dglpTokenLock,self).blocking(t) 475 return super(R2dglpTokenLock,self).blocking(t)
@@ -457,7 +491,7 @@ class CkomlpTokenLock(TokenLock):
457 super(CkomlpTokenLock,self).__init__(tasks, ngpu, ntokens) 491 super(CkomlpTokenLock,self).__init__(tasks, ngpu, ntokens)
458 self.k = ngpu*ntokens 492 self.k = ngpu*ntokens
459 self.ncpu = ncpu 493 self.ncpu = ncpu
460 self.nterms = int(ceil(float(ncpu)/self.k)) - 1 494 self.nterms = int(ceil(float(ncpu)/self.k)) - 1 if self.k > 0 else None
461 self.other_locks = sets.Set() 495 self.other_locks = sets.Set()
462 # array of unique cluster ids 496 # array of unique cluster ids
463 self.clusters = set([u.partition for u in self.users]) 497 self.clusters = set([u.partition for u in self.users])
@@ -466,6 +500,7 @@ class CkomlpTokenLock(TokenLock):
466 500
467 def request_blocking(self, t): 501 def request_blocking(self, t):
468 assert t in self.users 502 assert t in self.users
503 assert self.nterms is not None
469 504
470 # return previously computed blocking term if it exists 505 # return previously computed blocking term if it exists
471 if t in self.cache: 506 if t in self.cache: