1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
import re
import time
import subprocess
import config.config as conf
def scheduler():
with open('/proc/litmus/active_plugin', 'r') as active_plugin:
cur_plugin = active_plugin.read().strip()
return cur_plugin
def switch_scheduler(switch_to_in):
'''Switch the scheduler to whatever is passed in.
This methods sleeps for two seconds to give Linux the chance to execute
schedule switching code. Raises an exception if the switch does not work.
'''
switch_to = str(switch_to_in).strip()
with open('/proc/litmus/active_plugin', 'w') as active_plugin:
subprocess.Popen(["echo", switch_to], stdout=active_plugin)
# It takes a bit to do the switch, sleep an arbitrary amount of time
time.sleep(2)
cur_plugin = scheduler()
if switch_to != cur_plugin:
raise Exception("Could not switch to '%s' (check dmesg), current: %s" %\
(switch_to, cur_plugin))
def waiting_tasks():
reg = re.compile(r'^ready.*?(?P<WAITING>\d+)$', re.M)
with open('/proc/litmus/stats', 'r') as f:
data = f.read()
# Ignore if no tasks are waiting for release
waiting = re.search(reg, data).group("WAITING")
return 0 if not waiting else int(waiting)
def all_tasks():
reg = re.compile(r'^real-time.*?(?P<TASKS>\d+)$', re.M)
with open('/proc/litmus/stats', 'r') as f:
data = f.read()
ready = re.search(reg, data).group("TASKS")
return 0 if not ready else int(ready)
def release_tasks():
try:
data = subprocess.check_output([conf.BINS['release']])
except subprocess.CalledProcessError:
raise Exception('Something went wrong in release_ts')
released = re.findall(r"([0-9]+) real-time", data)[0]
return int(released)
|