| #!/usr/bin/python | 
 | # | 
 | # rt-mutex tester | 
 | # | 
 | # (C) 2006 Thomas Gleixner <tglx@linutronix.de> | 
 | # | 
 | # This program is free software; you can redistribute it and/or modify | 
 | # it under the terms of the GNU General Public License version 2 as | 
 | # published by the Free Software Foundation. | 
 | # | 
 | import os | 
 | import sys | 
 | import getopt | 
 | import shutil | 
 | import string | 
 |  | 
 | # Globals | 
 | quiet = 0 | 
 | test = 0 | 
 | comments = 0 | 
 |  | 
 | sysfsprefix = "/sys/devices/system/rttest/rttest" | 
 | statusfile = "/status" | 
 | commandfile = "/command" | 
 |  | 
 | # Command opcodes | 
 | cmd_opcodes = { | 
 |     "schedother"    : "1", | 
 |     "schedfifo"     : "2", | 
 |     "lock"          : "3", | 
 |     "locknowait"    : "4", | 
 |     "lockint"       : "5", | 
 |     "lockintnowait" : "6", | 
 |     "lockcont"      : "7", | 
 |     "unlock"        : "8", | 
 |     "signal"        : "11", | 
 |     "resetevent"    : "98", | 
 |     "reset"         : "99", | 
 |     } | 
 |  | 
 | test_opcodes = { | 
 |     "prioeq"        : ["P" , "eq" , None], | 
 |     "priolt"        : ["P" , "lt" , None], | 
 |     "priogt"        : ["P" , "gt" , None], | 
 |     "nprioeq"       : ["N" , "eq" , None], | 
 |     "npriolt"       : ["N" , "lt" , None], | 
 |     "npriogt"       : ["N" , "gt" , None], | 
 |     "unlocked"      : ["M" , "eq" , 0], | 
 |     "trylock"       : ["M" , "eq" , 1], | 
 |     "blocked"       : ["M" , "eq" , 2], | 
 |     "blockedwake"   : ["M" , "eq" , 3], | 
 |     "locked"        : ["M" , "eq" , 4], | 
 |     "opcodeeq"      : ["O" , "eq" , None], | 
 |     "opcodelt"      : ["O" , "lt" , None], | 
 |     "opcodegt"      : ["O" , "gt" , None], | 
 |     "eventeq"       : ["E" , "eq" , None], | 
 |     "eventlt"       : ["E" , "lt" , None], | 
 |     "eventgt"       : ["E" , "gt" , None], | 
 |     } | 
 |  | 
 | # Print usage information | 
 | def usage(): | 
 |     print "rt-tester.py <-c -h -q -t> <testfile>" | 
 |     print " -c    display comments after first command" | 
 |     print " -h    help" | 
 |     print " -q    quiet mode" | 
 |     print " -t    test mode (syntax check)" | 
 |     print " testfile: read test specification from testfile" | 
 |     print " otherwise from stdin" | 
 |     return | 
 |  | 
 | # Print progress when not in quiet mode | 
 | def progress(str): | 
 |     if not quiet: | 
 |         print str | 
 |  | 
 | # Analyse a status value | 
 | def analyse(val, top, arg): | 
 |  | 
 |     intval = int(val) | 
 |  | 
 |     if top[0] == "M": | 
 |         intval = intval / (10 ** int(arg)) | 
 | 	intval = intval % 10 | 
 |         argval = top[2] | 
 |     elif top[0] == "O": | 
 |         argval = int(cmd_opcodes.get(arg, arg)) | 
 |     else: | 
 |         argval = int(arg) | 
 |  | 
 |     # progress("%d %s %d" %(intval, top[1], argval)) | 
 |  | 
 |     if top[1] == "eq" and intval == argval: | 
 | 	return 1 | 
 |     if top[1] == "lt" and intval < argval: | 
 |         return 1 | 
 |     if top[1] == "gt" and intval > argval: | 
 | 	return 1 | 
 |     return 0 | 
 |  | 
 | # Parse the commandline | 
 | try: | 
 |     (options, arguments) = getopt.getopt(sys.argv[1:],'chqt') | 
 | except getopt.GetoptError, ex: | 
 |     usage() | 
 |     sys.exit(1) | 
 |  | 
 | # Parse commandline options | 
 | for option, value in options: | 
 |     if option == "-c": | 
 |         comments = 1 | 
 |     elif option == "-q": | 
 |         quiet = 1 | 
 |     elif option == "-t": | 
 |         test = 1 | 
 |     elif option == '-h': | 
 |         usage() | 
 |         sys.exit(0) | 
 |  | 
 | # Select the input source | 
 | if arguments: | 
 |     try: | 
 |         fd = open(arguments[0]) | 
 |     except Exception,ex: | 
 |         sys.stderr.write("File not found %s\n" %(arguments[0])) | 
 |         sys.exit(1) | 
 | else: | 
 |     fd = sys.stdin | 
 |  | 
 | linenr = 0 | 
 |  | 
 | # Read the test patterns | 
 | while 1: | 
 |  | 
 |     linenr = linenr + 1 | 
 |     line = fd.readline() | 
 |     if not len(line): | 
 |         break | 
 |  | 
 |     line = line.strip() | 
 |     parts = line.split(":") | 
 |  | 
 |     if not parts or len(parts) < 1: | 
 |         continue | 
 |  | 
 |     if len(parts[0]) == 0: | 
 |         continue | 
 |  | 
 |     if parts[0].startswith("#"): | 
 | 	if comments > 1: | 
 | 	    progress(line) | 
 | 	continue | 
 |  | 
 |     if comments == 1: | 
 | 	comments = 2 | 
 |  | 
 |     progress(line) | 
 |  | 
 |     cmd = parts[0].strip().lower() | 
 |     opc = parts[1].strip().lower() | 
 |     tid = parts[2].strip() | 
 |     dat = parts[3].strip() | 
 |  | 
 |     try: | 
 |         # Test or wait for a status value | 
 |         if cmd == "t" or cmd == "w": | 
 |             testop = test_opcodes[opc] | 
 |  | 
 |             fname = "%s%s%s" %(sysfsprefix, tid, statusfile) | 
 |             if test: | 
 | 		print fname | 
 |                 continue | 
 |  | 
 |             while 1: | 
 |                 query = 1 | 
 |                 fsta = open(fname, 'r') | 
 |                 status = fsta.readline().strip() | 
 |                 fsta.close() | 
 |                 stat = status.split(",") | 
 |                 for s in stat: | 
 | 		    s = s.strip() | 
 |                     if s.startswith(testop[0]): | 
 |                         # Separate status value | 
 |                         val = s[2:].strip() | 
 |                         query = analyse(val, testop, dat) | 
 |                         break | 
 |                 if query or cmd == "t": | 
 |                     break | 
 |  | 
 |             progress("   " + status) | 
 |  | 
 |             if not query: | 
 |                 sys.stderr.write("Test failed in line %d\n" %(linenr)) | 
 | 		sys.exit(1) | 
 |  | 
 |         # Issue a command to the tester | 
 |         elif cmd == "c": | 
 |             cmdnr = cmd_opcodes[opc] | 
 |             # Build command string and sys filename | 
 |             cmdstr = "%s:%s" %(cmdnr, dat) | 
 |             fname = "%s%s%s" %(sysfsprefix, tid, commandfile) | 
 |             if test: | 
 | 		print fname | 
 |                 continue | 
 |             fcmd = open(fname, 'w') | 
 |             fcmd.write(cmdstr) | 
 |             fcmd.close() | 
 |  | 
 |     except Exception,ex: | 
 |     	sys.stderr.write(str(ex)) | 
 |         sys.stderr.write("\nSyntax error in line %d\n" %(linenr)) | 
 |         if not test: | 
 |             fd.close() | 
 |             sys.exit(1) | 
 |  | 
 | # Normal exit pass | 
 | print "Pass" | 
 | sys.exit(0) | 
 |  | 
 |  |