1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
import re
import time
import subprocess
import os
import stat
import config.config as conf
def num_cpus():
    '''Count the CPUs listed in /proc/cpuinfo.

    Every line of the file that begins with "processor" or "online" is
    counted as one CPU; the total is returned as an int (0 if no line
    matches).
    '''
    cpu_line = re.compile(r'^(processor|online)')

    with open('/proc/cpuinfo', 'r') as cpuinfo:
        # One tick per matching line; the file is streamed, never loaded whole.
        return sum(1 for entry in cpuinfo if cpu_line.match(entry))
def ft_freq():
    '''Return the timer frequency for this machine's architecture.

    The architecture is taken from `uname -m`:
      * names starting with "armv7" yield 1000.0 (millisecond timer);
      * names starting with "x86" yield the integer part of the first
        "cpu MHz" entry in /proc/cpuinfo (timer equals processor clock);
      * anything else yields 0, i.e. the frequency is unknown.

    Raises Exception if an x86 machine has no parsable "cpu MHz" entry.
    '''
    # universal_newlines=True makes check_output return text on both
    # Python 2 and 3; matching a str pattern against bytes raises TypeError
    # on Python 3.
    umachine = subprocess.check_output(["uname", "-m"],
                                       universal_newlines=True)

    if re.match("armv7", umachine):
        # Arm V7s use a millisecond timer
        freq = 1000.0
    elif re.match("x86", umachine):
        # X86 timer is equal to processor clock
        reg = re.compile(r'^cpu MHz\s*:\s*(?P<FREQ>\d+)', re.M)
        with open('/proc/cpuinfo', 'r') as f:
            data = f.read()

        match = reg.search(data)
        if not match:
            raise Exception("Cannot parse CPU frequency from x86 CPU!")
        freq = int(match.group('FREQ'))
    else:
        # You're on your own
        freq = 0
    return freq
def switch_scheduler(switch_to_in):
    '''Switch the scheduler to whatever is passed in.

    The plugin name (str() of the argument, stripped of surrounding
    whitespace) is echoed into /proc/litmus/active_plugin. This method then
    sleeps for two seconds to give Linux the chance to execute schedule
    switching code, and reads the file back to verify the result.

    Raises an Exception if the active plugin read back does not equal the
    requested one.
    '''
    switch_to = str(switch_to_in).strip()

    with open('/proc/litmus/active_plugin', 'w') as active_plugin:
        # call() waits for echo to exit, so the child is reaped and has
        # finished writing before the file is closed. Its exit status is
        # deliberately ignored: success is verified by the read-back below.
        subprocess.call(["echo", switch_to], stdout=active_plugin)

    # it takes a bit to do the switch, sleep an arbitrary amount of time
    time.sleep(2)

    with open('/proc/litmus/active_plugin', 'r') as active_plugin:
        cur_plugin = active_plugin.read().strip()

    if switch_to != cur_plugin:
        raise Exception("Could not switch to plugin: %s" % switch_to)
def uname_matches(reg):
    '''Return whether the output of `uname -r` matches reg.

    reg is a pattern string or compiled pattern as accepted by re.match, so
    the match is anchored at the start of the output. Always returns a bool.
    '''
    # Decode to text so str patterns work on Python 3 as well as Python 2;
    # check_output otherwise returns bytes there and re.match raises TypeError.
    release = subprocess.check_output(["uname", "-r"],
                                      universal_newlines=True)
    return bool(re.match(reg, release))
def is_executable(fname):
    '''Return whether the file passed in is executable.

    Only the owner permission bits are inspected: the result is True when
    both the owner-read and owner-execute bits are set in the file's mode,
    False otherwise. The effective user of the calling process is not
    considered.

    Raises OSError if fname cannot be stat'ed (e.g. it does not exist).
    '''
    mode = os.stat(fname)[stat.ST_MODE]
    required = stat.S_IXUSR | stat.S_IRUSR
    # Compare against the full mask so the result is a real bool rather
    # than a leftover permission bitmask.
    return (mode & required) == required
def is_device(dev):
    '''Return whether dev exists and is a device node.

    Both character and block devices count as devices. Symbolic links are
    followed, since os.stat is used. A path that cannot be stat'ed is
    reported as not a device.
    '''
    try:
        # Stat directly instead of checking existence first, so the path
        # cannot disappear between the check and the stat.
        mode = os.stat(dev)[stat.ST_MODE]
    except (OSError, ValueError):
        return False
    # Use the stat predicates rather than bit-testing the file-type field
    # with S_IFCHR, which is not a standalone flag.
    return stat.S_ISCHR(mode) or stat.S_ISBLK(mode)
def waiting_tasks():
    '''Return the count reported on the "ready" line of /proc/litmus/stats.

    The value is the trailing integer of the first line that starts with
    "ready" (presumably the number of tasks waiting for release). Returns 0
    when the file contains no such line.

    Raises IOError/OSError if /proc/litmus/stats cannot be read.
    '''
    reg = re.compile(r'^ready.*?(?P<READY>\d+)$', re.M)
    with open('/proc/litmus/stats', 'r') as f:
        data = f.read()

    match = reg.search(data)
    # Ignore if no tasks are waiting for release: without a "ready" line
    # there is no match object to read the count from.
    if not match:
        return 0
    return int(match.group("READY"))
def release_tasks():
    '''Run the configured release binary and return the count it reports.

    Executes conf.BINS['release'] with no arguments and parses the first
    "<N> real-time" occurrence in its output, returning N as an int.

    Raises Exception if the binary exits with a non-zero status or if its
    output contains no "<N> real-time" count.
    '''
    try:
        # Decode to text so the str regex below works on Python 3 as well
        # as Python 2 (check_output otherwise returns bytes there).
        data = subprocess.check_output([conf.BINS['release']],
                                       universal_newlines=True)
    except subprocess.CalledProcessError:
        raise Exception('Something went wrong in release_ts')

    match = re.search(r"([0-9]+) real-time", data)
    if not match:
        # Report unparsable output explicitly instead of failing with a
        # bare IndexError.
        raise Exception('Could not parse released task count from release_ts')
    return int(match.group(1))
|