aboutsummaryrefslogtreecommitdiffstats
path: root/run/litmus_util.py
blob: 8a7f87d3718beb075c2361ae07138d37f581ec57 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import re
import time
import subprocess
import os
import stat
import config.config as conf

def num_cpus():
    '''Count the CPU entries listed in /proc/cpuinfo.

    Every line that begins with "processor" or "online" is counted as one
    CPU; the total number of such lines is returned.
    '''
    cpu_line = re.compile(r'^(processor|online)')

    with open('/proc/cpuinfo', 'r') as cpuinfo:
        # One matching line per CPU; sum() yields 0 when nothing matches.
        return sum(1 for entry in cpuinfo if cpu_line.match(entry))

def ft_freq():
    '''Return the timer frequency for the current machine type.

    The machine type is taken from `uname -m`:
      * names starting with "armv7" yield 1000.0 (millisecond timer),
      * names starting with "x86" yield the integer part of the first
        "cpu MHz" entry in /proc/cpuinfo (timer equals processor clock),
      * anything else yields 0.

    Raises Exception if an x86 machine has no parsable "cpu MHz" entry, and
    subprocess.CalledProcessError if `uname` itself fails.
    '''
    # universal_newlines=True makes check_output return text rather than
    # bytes, so the str patterns below work on both Python 2.7 and 3.
    umachine = subprocess.check_output(["uname", "-m"],
                                       universal_newlines=True)

    if re.match("armv7", umachine):
        # Arm V7s use a millisecond timer
        return 1000.0

    if re.match("x86", umachine):
        # X86 timer is equal to processor clock
        # NOTE(review): 32-bit x86 reports e.g. "i686" and falls through to
        # the 0 case below -- confirm whether that is intended.
        reg = re.compile(r'^cpu MHz\s*:\s*(?P<FREQ>\d+)', re.M)
        with open('/proc/cpuinfo', 'r') as f:
            match = reg.search(f.read())

        if not match:
            raise Exception("Cannot parse CPU frequency from x86 CPU!")
        return int(match.group('FREQ'))

    # You're on your own
    return 0

def switch_scheduler(switch_to_in):
    '''Switch the scheduler to whatever is passed in.

    The plugin name (str() of the argument, stripped of surrounding
    whitespace) is echoed into /proc/litmus/active_plugin. This method then
    sleeps for two seconds to give Linux the chance to execute schedule
    switching code, and reads the file back. Raises an exception if the
    plugin read back does not equal the requested one.

    '''

    switch_to = str(switch_to_in).strip()

    with open('/proc/litmus/active_plugin', 'w') as active_plugin:
        # call() waits for echo to exit, so the child is reaped and the
        # write has finished before the file is closed. Its exit status is
        # ignored on purpose: the read-back below detects a failed switch.
        subprocess.call(["echo", switch_to], stdout=active_plugin)

    # it takes a bit to do the switch, sleep an arbitrary amount of time
    time.sleep(2)

    with open('/proc/litmus/active_plugin', 'r') as active_plugin:
        cur_plugin = active_plugin.read().strip()

    if switch_to != cur_plugin:
        raise Exception("Could not switch to plugin: %s" % switch_to)

def uname_matches(reg):
    '''Return whether the output of `uname -r` matches the regex reg.

    The pattern is applied with re.match, so it is anchored at the start
    of the release string. Raises subprocess.CalledProcessError if `uname`
    fails.
    '''
    # universal_newlines=True yields text instead of bytes, so a str pattern
    # works on both Python 2.7 and 3.
    data = subprocess.check_output(["uname", "-r"], universal_newlines=True)
    return bool(re.match(reg, data))

def is_executable(fname):
    '''Return whether the file passed in is executable.

    A file counts as executable only when its owner has both execute and
    read permission. Always returns a bool. Raises OSError if fname cannot
    be stat'ed (e.g. it does not exist).
    '''
    mode = os.stat(fname)[stat.ST_MODE]
    required = stat.S_IXUSR | stat.S_IRUSR
    # Compare against the full mask so the result is a real boolean rather
    # than a leftover permission bit.
    return (mode & required) == required

def is_device(dev):
    '''Return True if dev exists and is a character or block device.

    Returns False when the path is missing or cannot be stat'ed. Symbolic
    links are followed, since os.stat is used.
    '''
    try:
        # Stat directly instead of checking existence first: the path could
        # vanish between the two calls and raise.
        mode = os.stat(dev)[stat.ST_MODE]
    except (OSError, ValueError):
        return False
    # The file type is a multi-bit field, so use the stat predicates rather
    # than testing a single bit of it.
    return stat.S_ISCHR(mode) or stat.S_ISBLK(mode)

def waiting_tasks(stats_path='/proc/litmus/stats'):
    '''Return the number reported on the "ready" line of the stats file.

    stats_path is the file to parse; it defaults to /proc/litmus/stats.
    The value is the trailing integer of the first line that starts with
    "ready". Returns 0 when the file contains no such line.
    '''
    reg = re.compile(r'^ready.*?(?P<READY>\d+)$', re.M)
    with open(stats_path, 'r') as f:
        data = f.read()

    # Ignore if no tasks are waiting for release
    match = reg.search(data)
    if match is None:
        return 0

    return int(match.group("READY"))

def release_tasks():
    '''Run the release binary and return the count it reports.

    Executes the program configured as conf.BINS['release'] and returns,
    as an int, the first number that is followed by " real-time" in its
    output. Raises Exception if the program exits with a non-zero status
    or if its output contains no such number.
    '''
    try:
        # universal_newlines=True yields text instead of bytes, so the str
        # pattern below works on both Python 2.7 and 3.
        data = subprocess.check_output([conf.BINS['release']],
                                       universal_newlines=True)
    except subprocess.CalledProcessError:
        raise Exception('Something went wrong in release_ts')

    match = re.search(r"([0-9]+) real-time", data)
    if match is None:
        # Fail with a clear message instead of an IndexError on empty results.
        raise Exception('Could not parse released task count from '
                        'release_ts output')

    return int(match.group(1))