diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-05-09 18:01:23 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-05-09 18:01:23 -0400 |
commit | f42050c91ede1ea63a23eb2e0489933023ee8f1c (patch) | |
tree | 5c297879049ca91733010b06644144659c189127 | |
parent | 694855d7f5c1abd4509fa977618dfc75e8899470 (diff) |
tighten engine blocking terms for few-task case
-rw-r--r-- | rtss14/gpusync.py | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/rtss14/gpusync.py b/rtss14/gpusync.py index 3f6242a..1a67e16 100644 --- a/rtss14/gpusync.py +++ b/rtss14/gpusync.py | |||
@@ -7,7 +7,7 @@ import sets | |||
7 | import itertools | 7 | import itertools |
8 | import numpy as np | 8 | import numpy as np |
9 | 9 | ||
10 | from math import ceil | 10 | from math import ceil, floor |
11 | from copy import deepcopy | 11 | from copy import deepcopy |
12 | from collections import defaultdict | 12 | from collections import defaultdict |
13 | 13 | ||
@@ -95,7 +95,9 @@ class FifoExecEngineLock(FifoEngineLock): | |||
95 | sz = sum([len(o) for o in others]) | 95 | sz = sum([len(o) for o in others]) |
96 | # collapse into a single list | 96 | # collapse into a single list |
97 | iters = itertools.chain.from_iterable(others) | 97 | iters = itertools.chain.from_iterable(others) |
98 | take = (self.ntokens-1)*nrequests | 98 | |
99 | terms = min(self.ntokens-1, len(self.users)-1) | ||
100 | take = terms*nrequests | ||
99 | b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) | 101 | b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) |
100 | return b | 102 | return b |
101 | 103 | ||
@@ -112,7 +114,8 @@ class FifoCopyEngineLockNoMigrate(FifoEngineLock): | |||
112 | for u in self.users if t is not u and 0 in u.resmodel[self].requests] | 114 | for u in self.users if t is not u and 0 in u.resmodel[self].requests] |
113 | sz = sum([len(o) for o in others]) | 115 | sz = sum([len(o) for o in others]) |
114 | iters = itertools.chain.from_iterable(others) | 116 | iters = itertools.chain.from_iterable(others) |
115 | take = (self.ntokens-1)*nrequests | 117 | terms = min(self.ntokens-1, len(self.users)-1) |
118 | take = terms*nrequests | ||
116 | b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) | 119 | b = np.sum(nmax(np.fromiter(iters, np.float32, count=sz), take)) |
117 | return b | 120 | return b |
118 | 121 | ||
@@ -153,11 +156,16 @@ class FifoCopyEngineLockP2P(FifoEngineLock): | |||
153 | 156 | ||
154 | assert self.alltokens >= 2 | 157 | assert self.alltokens >= 2 |
155 | 158 | ||
159 | normal_terms = min(self.ntokens - 1, len(self.users) - 1) | ||
160 | trans_terms = min(self.alltokens - 1, len(self.users) - 1) | ||
161 | |||
156 | # hard: take (rho*g-2 normal) and 1 migration, per nhard request | 162 | # hard: take (rho*g-2 normal) and 1 migration, per nhard request |
157 | # easy: take rho-1 normal, per neasy request | 163 | nnormal = (trans_terms - 1) * nhard |
158 | nnormal = (self.alltokens - 2) * nhard + (self.ntokens - 1) * neasy | ||
159 | nmigs = nhard | 164 | nmigs = nhard |
160 | 165 | ||
166 | # easy: take rho-1 normal, per neasy request | ||
167 | nnormal += normal_terms * neasy | ||
168 | |||
161 | if len(normal) < nnorm: | 169 | if len(normal) < nnorm: |
162 | # not enough normal requests. take underflow from migrations | 170 | # not enough normal requests. take underflow from migrations |
163 | nmigs += nnorm - len(normal) | 171 | nmigs += nnorm - len(normal) |
@@ -214,7 +222,8 @@ class FifoCopyEngineLockP2P(FifoEngineLock): | |||
214 | assert len(migs) == 0 | 222 | assert len(migs) == 0 |
215 | assert nhard == 0 | 223 | assert nhard == 0 |
216 | 224 | ||
217 | ntake = (self.ntokens - 1) * n | 225 | terms = min(self.ntokens - 1, len(self.users) - 1) |
226 | ntake = terms * n | ||
218 | sz = sum([len(l) for l in _local]) | 227 | sz = sum([len(l) for l in _local]) |
219 | local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sz) | 228 | local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sz) |
220 | b = np.sum(nmax(local, ntake)) | 229 | b = np.sum(nmax(local, ntake)) |
@@ -238,9 +247,21 @@ class FifoCopyEngineLockP2P(FifoEngineLock): | |||
238 | normal_from_cache = True | 247 | normal_from_cache = True |
239 | normal = self.normal_cache[t] | 248 | normal = self.normal_cache[t] |
240 | 249 | ||
241 | nnorm = (self.alltokens - 2)*nhard + (self.ntokens - 1) * neasy | 250 | |
251 | assert self.alltokens >= 2 | ||
252 | |||
253 | # FIFO queueing. blocked by at most one request from each unique other task | ||
254 | # take len(tasks)-1 terms instead of rho-1 / rho*g - 1 tasks if that is smaller. | ||
255 | normal_terms = min(self.ntokens - 1, len(self.users) - 1) | ||
256 | trans_terms = min(self.alltokens - 1, len(self.users) - 1) | ||
257 | |||
258 | # hard: take (rho*g-2 normal) and 1 migration, per nhard request | ||
259 | nnorm = (trans_terms - 1) * nhard | ||
242 | nmigs = nhard | 260 | nmigs = nhard |
243 | 261 | ||
262 | # easy: take rho-1 normal, per neasy request | ||
263 | nnorm += normal_terms * neasy | ||
264 | |||
244 | if len(normal) < nnorm: | 265 | if len(normal) < nnorm: |
245 | # not enough normal requests. take underflow from migrations | 266 | # not enough normal requests. take underflow from migrations |
246 | nmigs += nnorm - len(normal) | 267 | nmigs += nnorm - len(normal) |
@@ -300,17 +321,20 @@ class FifoCopyEngineLockAlternateP2P(FifoEngineLock): | |||
300 | # every migration request is blocked by: | 321 | # every migration request is blocked by: |
301 | # (rho*g-1) normal requests and (rho*g-1) migration requests | 322 | # (rho*g-1) normal requests and (rho*g-1) migration requests |
302 | 323 | ||
324 | normal_terms = min(self.ntokens - 1, len(self.users) - 1) | ||
325 | trans_terms = min(self.alltokens - 1, len(self.users) - 1) | ||
326 | |||
303 | # compute blocking for normal requests, adjusting for overflow | 327 | # compute blocking for normal requests, adjusting for overflow |
304 | nnorm = min((self.ntokens-1)*nrequests, len(normal)) | 328 | nnorm = min(normal_terms*nrequests, len(normal)) |
305 | nmig = min(min((self.ntokens-1)*nrequests, len(migs)), nnorm) | 329 | nmig = min(min(normal_terms*nrequests, len(migs)), nnorm) |
306 | 330 | ||
307 | # number of interfering requests remaining in each category | 331 | # number of interfering requests remaining in each category |
308 | nremain = max(0, len(normal) - nnorm) | 332 | nremain = max(0, len(normal) - nnorm) |
309 | nmigremain = max(0, len(migs) - nmig) | 333 | nmigremain = max(0, len(migs) - nmig) |
310 | 334 | ||
311 | # compute blocking for migration requests | 335 | # compute blocking for migration requests |
312 | nmig_extra = min((self.alltokens-1)*nmigrequests, nmigremain) | 336 | nmig_extra = min(trans_terms*nmigrequests, nmigremain) |
313 | nnorm_extra = min(min((self.alltokens-1)*nmigrequests, nmig_extra), nremain) | 337 | nnorm_extra = min(min(trans_terms*nmigrequests, nmig_extra), nremain) |
314 | 338 | ||
315 | # accumulate total blocking | 339 | # accumulate total blocking |
316 | nnorm += nnorm_extra | 340 | nnorm += nnorm_extra |
@@ -354,17 +378,20 @@ class FifoCopyEngineLockAlternateP2P(FifoEngineLock): | |||
354 | local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sum([len(n) for n in _local])) | 378 | local = np.fromiter(itertools.chain.from_iterable(_local), np.float32, count=sum([len(n) for n in _local])) |
355 | remote = np.fromiter(itertools.chain.from_iterable(_remote), np.float32, count=sum([len(n) for n in _remote])) | 379 | remote = np.fromiter(itertools.chain.from_iterable(_remote), np.float32, count=sum([len(n) for n in _remote])) |
356 | 380 | ||
381 | normal_terms = min(self.ntokens - 1, len(self.users) - 1) | ||
382 | trans_terms = min(self.alltokens - 1, len(self.users) - 1) | ||
383 | |||
357 | # compute blocking for local requests, adjusting for overflow | 384 | # compute blocking for local requests, adjusting for overflow |
358 | nlocal = min((self.ntokens-1)*nrequests, len(local)) | 385 | nlocal = min(normal_terms*nrequests, len(local)) |
359 | nmig = min(min((self.ntokens-1)*nrequests, len(migs)), nlocal) | 386 | nmig = min(min(normal_terms*nrequests, len(migs)), nlocal) |
360 | 387 | ||
361 | # number of interfering requests remaining in each category | 388 | # number of interfering requests remaining in each category |
362 | nremain = max(0, len(local) + len(remote) - nlocal) | 389 | nremain = max(0, len(local) + len(remote) - nlocal) |
363 | nmigremain = max(0, len(migs) - nmig) | 390 | nmigremain = max(0, len(migs) - nmig) |
364 | 391 | ||
365 | # compute blocking for migration requests | 392 | # compute blocking for migration requests |
366 | nmig_extra = min((self.alltokens-1)*nmigrequests, nmigremain) | 393 | nmig_extra = min(trans_terms*nmigrequests, nmigremain) |
367 | nnorm_extra = min(min((self.alltokens-1)*nmigrequests, nmig_extra), nremain) | 394 | nnorm_extra = min(min(trans_terms*nmigrequests, nmig_extra), nremain) |
368 | 395 | ||
369 | nmig += nmig_extra | 396 | nmig += nmig_extra |
370 | 397 | ||
@@ -431,11 +458,18 @@ class KfmlpTokenLock(TokenLock): | |||
431 | class R2dglpTokenLock(KfmlpTokenLock): | 458 | class R2dglpTokenLock(KfmlpTokenLock): |
432 | def __init__(self, tasks, ncpu, ngpu, ntokens): | 459 | def __init__(self, tasks, ncpu, ngpu, ntokens): |
433 | super(R2dglpTokenLock,self).__init__(tasks, ngpu, ntokens) | 460 | super(R2dglpTokenLock,self).__init__(tasks, ngpu, ntokens) |
434 | fifo_len = int(ceil(float(ncpu)/self.k)) | 461 | if self.k > 0: |
435 | self.nterms = 2*fifo_len - 1 | 462 | fifo_len = int(ceil(float(ncpu)/self.k)) |
436 | self.fifo_capacity = fifo_len * self.k | 463 | self.nterms = 2*fifo_len - 1 |
464 | self.fifo_capacity = fifo_len * self.k | ||
465 | else: | ||
466 | self.nterms = None | ||
467 | self.fifo_capacity = None | ||
437 | 468 | ||
438 | def blocking(self, t): | 469 | def blocking(self, t): |
470 | assert self.nterms is not None | ||
471 | assert self.fifo_capacity is not None | ||
472 | |||
439 | if len(self.users) <= self.fifo_capacity: | 473 | if len(self.users) <= self.fifo_capacity: |
440 | # Few enough users that R2DGLP operates like k-FMLP | 474 | # Few enough users that R2DGLP operates like k-FMLP |
441 | return super(R2dglpTokenLock,self).blocking(t) | 475 | return super(R2dglpTokenLock,self).blocking(t) |
@@ -457,7 +491,7 @@ class CkomlpTokenLock(TokenLock): | |||
457 | super(CkomlpTokenLock,self).__init__(tasks, ngpu, ntokens) | 491 | super(CkomlpTokenLock,self).__init__(tasks, ngpu, ntokens) |
458 | self.k = ngpu*ntokens | 492 | self.k = ngpu*ntokens |
459 | self.ncpu = ncpu | 493 | self.ncpu = ncpu |
460 | self.nterms = int(ceil(float(ncpu)/self.k)) - 1 | 494 | self.nterms = int(ceil(float(ncpu)/self.k)) - 1 if self.k > 0 else None |
461 | self.other_locks = sets.Set() | 495 | self.other_locks = sets.Set() |
462 | # array of unique cluster ids | 496 | # array of unique cluster ids |
463 | self.clusters = set([u.partition for u in self.users]) | 497 | self.clusters = set([u.partition for u in self.users]) |
@@ -466,6 +500,7 @@ class CkomlpTokenLock(TokenLock): | |||
466 | 500 | ||
467 | def request_blocking(self, t): | 501 | def request_blocking(self, t): |
468 | assert t in self.users | 502 | assert t in self.users |
503 | assert self.nterms is not None | ||
469 | 504 | ||
470 | # return previously computed blocking term if it exists | 505 | # return previously computed blocking term if it exists |
471 | if t in self.cache: | 506 | if t in self.cache: |