| #!/usr/bin/env python3 | 
 | # SPDX-License-Identifier: GPL-2.0 | 
 | # | 
 | # Program to allow users to fuzz test Hyper-V drivers | 
 | # by interfacing with Hyper-V debugfs attributes. | 
 | # Current test methods available: | 
 | #       1. delay testing | 
 | # | 
 | # Current file/directory structure of hyper-V debugfs: | 
 | #       /sys/kernel/debug/hyperv/UUID | 
 | #       /sys/kernel/debug/hyperv/UUID/<test-state filename> | 
 | #       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory> | 
 | # | 
 | # author: Branden Bonaby <brandonbonaby94@gmail.com> | 
 |  | 
 | import os | 
 | import cmd | 
 | import argparse | 
 | import glob | 
 | from argparse import RawDescriptionHelpFormatter | 
 | from argparse import RawTextHelpFormatter | 
 | from enum import Enum | 
 |  | 
 | # Do not change unless, you change the debugfs attributes | 
 | # in /drivers/hv/debugfs.c. All fuzz testing | 
 | # attributes will start with "fuzz_test". | 
 |  | 
 | # debugfs path for hyperv must exist before proceeding | 
 | debugfs_hyperv_path = "/sys/kernel/debug/hyperv" | 
 | if not os.path.isdir(debugfs_hyperv_path): | 
 |         print("{} doesn't exist/check permissions".format(debugfs_hyperv_path)) | 
 |         exit(-1) | 
 |  | 
 | class dev_state(Enum): | 
 |         off = 0 | 
 |         on = 1 | 
 |  | 
 | # File names, that correspond to the files created in | 
 | # /drivers/hv/debugfs.c | 
 | class f_names(Enum): | 
 |         state_f = "fuzz_test_state" | 
 |         buff_f =  "fuzz_test_buffer_interrupt_delay" | 
 |         mess_f =  "fuzz_test_message_delay" | 
 |  | 
 | # Both single_actions and all_actions are used | 
 | # for error checking and to allow for some subparser | 
 | # names to be abbreviated. Do not abbreviate the | 
 | # test method names, as it will become less intuitive | 
 | # as to what the user can do. If you do decide to | 
 | # abbreviate the test method name, make sure the main | 
 | # function reflects this change. | 
 |  | 
 | all_actions = [ | 
 |         "disable_all", | 
 |         "D", | 
 |         "enable_all", | 
 |         "view_all", | 
 |         "V" | 
 | ] | 
 |  | 
 | single_actions = [ | 
 |         "disable_single", | 
 |         "d", | 
 |         "enable_single", | 
 |         "view_single", | 
 |         "v" | 
 | ] | 
 |  | 
 | def main(): | 
 |  | 
 |         file_map = recursive_file_lookup(debugfs_hyperv_path, dict()) | 
 |         args = parse_args() | 
 |         if (not args.action): | 
 |                 print ("Error, no options selected...exiting") | 
 |                 exit(-1) | 
 |         arg_set = { k for (k,v) in vars(args).items() if v and k != "action" } | 
 |         arg_set.add(args.action) | 
 |         path = args.path if "path" in arg_set else None | 
 |         if (path and path[-1] == "/"): | 
 |                 path = path[:-1] | 
 |         validate_args_path(path, arg_set, file_map) | 
 |         if (path and "enable_single" in arg_set): | 
 |             state_path = locate_state(path, file_map) | 
 |             set_test_state(state_path, dev_state.on.value, args.quiet) | 
 |  | 
 |         # Use subparsers as the key for different actions | 
 |         if ("delay" in arg_set): | 
 |                 validate_delay_values(args.delay_time) | 
 |                 if (args.enable_all): | 
 |                         set_delay_all_devices(file_map, args.delay_time, | 
 |                                               args.quiet) | 
 |                 else: | 
 |                         set_delay_values(path, file_map, args.delay_time, | 
 |                                          args.quiet) | 
 |         elif ("disable_all" in arg_set or "D" in arg_set): | 
 |                 disable_all_testing(file_map) | 
 |         elif ("disable_single" in arg_set or "d" in arg_set): | 
 |                 disable_testing_single_device(path, file_map) | 
 |         elif ("view_all" in arg_set or "V" in arg_set): | 
 |                 get_all_devices_test_status(file_map) | 
 |         elif ("view_single" in arg_set or  "v" in arg_set): | 
 |                 get_device_test_values(path, file_map) | 
 |  | 
 | # Get the state location | 
 | def locate_state(device, file_map): | 
 |         return file_map[device][f_names.state_f.value] | 
 |  | 
 | # Validate delay values to make sure they are acceptable to | 
 | # enable delays on a device | 
 | def validate_delay_values(delay): | 
 |  | 
 |         if (delay[0]  == -1 and delay[1] == -1): | 
 |                 print("\nError, At least 1 value must be greater than 0") | 
 |                 exit(-1) | 
 |         for i in delay: | 
 |                 if (i < -1 or i == 0 or i > 1000): | 
 |                         print("\nError, Values must be  equal to -1 " | 
 |                               "or be > 0 and <= 1000") | 
 |                         exit(-1) | 
 |  | 
 | # Validate argument path | 
 | def validate_args_path(path, arg_set, file_map): | 
 |  | 
 |         if (not path and any(element in arg_set for element in single_actions)): | 
 |                 print("Error, path (-p) REQUIRED for the specified option. " | 
 |                       "Use (-h) to check usage.") | 
 |                 exit(-1) | 
 |         elif (path and any(item in arg_set for item in all_actions)): | 
 |                 print("Error, path (-p) NOT REQUIRED for the specified option. " | 
 |                       "Use (-h) to check usage." ) | 
 |                 exit(-1) | 
 |         elif (path not in file_map and any(item in arg_set | 
 |                                            for item in single_actions)): | 
 |                 print("Error, path '{}' not a valid vmbus device".format(path)) | 
 |                 exit(-1) | 
 |  | 
 | # display Testing status of single device | 
 | def get_device_test_values(path, file_map): | 
 |  | 
 |         for name in file_map[path]: | 
 |                 file_location = file_map[path][name] | 
 |                 print( name + " = " + str(read_test_files(file_location))) | 
 |  | 
 | # Create a map of the vmbus devices and their associated files | 
 | # [key=device, value = [key = filename, value = file path]] | 
 | def recursive_file_lookup(path, file_map): | 
 |  | 
 |         for f_path in glob.iglob(path + '**/*'): | 
 |                 if (os.path.isfile(f_path)): | 
 |                         if (f_path.rsplit("/",2)[0] == debugfs_hyperv_path): | 
 |                                 directory = f_path.rsplit("/",1)[0] | 
 |                         else: | 
 |                                 directory = f_path.rsplit("/",2)[0] | 
 |                         f_name = f_path.split("/")[-1] | 
 |                         if (file_map.get(directory)): | 
 |                                 file_map[directory].update({f_name:f_path}) | 
 |                         else: | 
 |                                 file_map[directory] = {f_name:f_path} | 
 |                 elif (os.path.isdir(f_path)): | 
 |                         recursive_file_lookup(f_path,file_map) | 
 |         return file_map | 
 |  | 
 | # display Testing state of devices | 
 | def get_all_devices_test_status(file_map): | 
 |  | 
 |         for device in file_map: | 
 |                 if (get_test_state(locate_state(device, file_map)) is 1): | 
 |                         print("Testing = ON for: {}" | 
 |                               .format(device.split("/")[5])) | 
 |                 else: | 
 |                         print("Testing = OFF for: {}" | 
 |                               .format(device.split("/")[5])) | 
 |  | 
 | # read the vmbus device files, path must be absolute path before calling | 
 | def read_test_files(path): | 
 |         try: | 
 |                 with open(path,"r") as f: | 
 |                         file_value = f.readline().strip() | 
 |                 return int(file_value) | 
 |  | 
 |         except IOError as e: | 
 |                 errno, strerror = e.args | 
 |                 print("I/O error({0}): {1} on file {2}" | 
 |                       .format(errno, strerror, path)) | 
 |                 exit(-1) | 
 |         except ValueError: | 
 |                 print ("Element to int conversion error in: \n{}".format(path)) | 
 |                 exit(-1) | 
 |  | 
 | # writing to vmbus device files, path must be absolute path before calling | 
 | def write_test_files(path, value): | 
 |  | 
 |         try: | 
 |                 with open(path,"w") as f: | 
 |                         f.write("{}".format(value)) | 
 |         except IOError as e: | 
 |                 errno, strerror = e.args | 
 |                 print("I/O error({0}): {1} on file {2}" | 
 |                       .format(errno, strerror, path)) | 
 |                 exit(-1) | 
 |  | 
 | # set testing state of device | 
 | def set_test_state(state_path, state_value, quiet): | 
 |  | 
 |         write_test_files(state_path, state_value) | 
 |         if (get_test_state(state_path) is 1): | 
 |                 if (not quiet): | 
 |                         print("Testing = ON for device: {}" | 
 |                               .format(state_path.split("/")[5])) | 
 |         else: | 
 |                 if (not quiet): | 
 |                         print("Testing = OFF for device: {}" | 
 |                               .format(state_path.split("/")[5])) | 
 |  | 
 | # get testing state of device | 
 | def get_test_state(state_path): | 
 |         #state == 1 - test = ON | 
 |         #state == 0 - test = OFF | 
 |         return  read_test_files(state_path) | 
 |  | 
 | # write 1 - 1000 microseconds, into a single device using the | 
 | # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay | 
 | # debugfs attributes | 
 | def set_delay_values(device, file_map, delay_length, quiet): | 
 |  | 
 |         try: | 
 |                 interrupt = file_map[device][f_names.buff_f.value] | 
 |                 message = file_map[device][f_names.mess_f.value] | 
 |  | 
 |                 # delay[0]- buffer interrupt delay, delay[1]- message delay | 
 |                 if (delay_length[0] >= 0 and delay_length[0] <= 1000): | 
 |                         write_test_files(interrupt, delay_length[0]) | 
 |                 if (delay_length[1] >= 0 and delay_length[1] <= 1000): | 
 |                         write_test_files(message, delay_length[1]) | 
 |                 if (not quiet): | 
 |                         print("Buffer delay testing = {} for: {}" | 
 |                               .format(read_test_files(interrupt), | 
 |                                       interrupt.split("/")[5])) | 
 |                         print("Message delay testing = {} for: {}" | 
 |                               .format(read_test_files(message), | 
 |                                       message.split("/")[5])) | 
 |         except IOError as e: | 
 |                 errno, strerror = e.args | 
 |                 print("I/O error({0}): {1} on files {2}{3}" | 
 |                       .format(errno, strerror, interrupt, message)) | 
 |                 exit(-1) | 
 |  | 
 | # enabling delay testing on all devices | 
 | def set_delay_all_devices(file_map, delay, quiet): | 
 |  | 
 |         for device in (file_map): | 
 |                 set_test_state(locate_state(device, file_map), | 
 |                                dev_state.on.value, | 
 |                                quiet) | 
 |                 set_delay_values(device, file_map, delay, quiet) | 
 |  | 
 | # disable all testing on a SINGLE device. | 
 | def disable_testing_single_device(device, file_map): | 
 |  | 
 |         for name in file_map[device]: | 
 |                 file_location = file_map[device][name] | 
 |                 write_test_files(file_location, dev_state.off.value) | 
 |         print("ALL testing now OFF for {}".format(device.split("/")[-1])) | 
 |  | 
 | # disable all testing on ALL devices | 
 | def disable_all_testing(file_map): | 
 |  | 
 |         for device in file_map: | 
 |                 disable_testing_single_device(device, file_map) | 
 |  | 
 | def parse_args(): | 
 |         parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n" | 
 |                 "%(prog)s [delay]   [-h] [-e|-E] -t [-p]\n" | 
 |                 "%(prog)s [view_all       | V]      [-h]\n" | 
 |                 "%(prog)s [disable_all    | D]      [-h]\n" | 
 |                 "%(prog)s [disable_single | d]      [-h|-p]\n" | 
 |                 "%(prog)s [view_single    | v]      [-h|-p]\n" | 
 |                 "%(prog)s --version\n", | 
 |                 description = "\nUse lsvmbus to get vmbus device type " | 
 |                 "information.\n" "\nThe debugfs root path is " | 
 |                 "/sys/kernel/debug/hyperv", | 
 |                 formatter_class = RawDescriptionHelpFormatter) | 
 |         subparsers = parser.add_subparsers(dest = "action") | 
 |         parser.add_argument("--version", action = "version", | 
 |                 version = '%(prog)s 0.1.0') | 
 |         parser.add_argument("-q","--quiet", action = "store_true", | 
 |                 help = "silence none important test messages." | 
 |                        " This will only work when enabling testing" | 
 |                        " on a device.") | 
 |         # Use the path parser to hold the --path attribute so it can | 
 |         # be shared between subparsers. Also do the same for the state | 
 |         # parser, as all testing methods will use --enable_all and | 
 |         # enable_single. | 
 |         path_parser = argparse.ArgumentParser(add_help=False) | 
 |         path_parser.add_argument("-p","--path", metavar = "", | 
 |                 help = "Debugfs path to a vmbus device. The path " | 
 |                 "must be the absolute path to the device.") | 
 |         state_parser = argparse.ArgumentParser(add_help=False) | 
 |         state_group = state_parser.add_mutually_exclusive_group(required = True) | 
 |         state_group.add_argument("-E", "--enable_all", action = "store_const", | 
 |                                  const = "enable_all", | 
 |                                  help = "Enable the specified test type " | 
 |                                  "on ALL vmbus devices.") | 
 |         state_group.add_argument("-e", "--enable_single", | 
 |                                  action = "store_const", | 
 |                                  const = "enable_single", | 
 |                                  help = "Enable the specified test type on a " | 
 |                                  "SINGLE vmbus device.") | 
 |         parser_delay = subparsers.add_parser("delay", | 
 |                         parents = [state_parser, path_parser], | 
 |                         help = "Delay the ring buffer interrupt or the " | 
 |                         "ring buffer message reads in microseconds.", | 
 |                         prog = "vmbus_testing", | 
 |                         usage = "%(prog)s [-h]\n" | 
 |                         "%(prog)s -E -t [value] [value]\n" | 
 |                         "%(prog)s -e -t [value] [value] -p", | 
 |                         description = "Delay the ring buffer interrupt for " | 
 |                         "vmbus devices, or delay the ring buffer message " | 
 |                         "reads for vmbus devices (both in microseconds). This " | 
 |                         "is only on the host to guest channel.") | 
 |         parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2, | 
 |                         type = check_range, default =[0,0], required = (True), | 
 |                         help = "Set [buffer] & [message] delay time. " | 
 |                         "Value constraints: -1 == value " | 
 |                         "or 0 < value <= 1000.\n" | 
 |                         "Use -1 to keep the previous value for that delay " | 
 |                         "type, or a value > 0 <= 1000 to change the delay " | 
 |                         "time.") | 
 |         parser_dis_all = subparsers.add_parser("disable_all", | 
 |                         aliases = ['D'], prog = "vmbus_testing", | 
 |                         usage = "%(prog)s [disable_all | D] -h\n" | 
 |                         "%(prog)s [disable_all | D]\n", | 
 |                         help = "Disable ALL testing on ALL vmbus devices.", | 
 |                         description = "Disable ALL testing on ALL vmbus " | 
 |                         "devices.") | 
 |         parser_dis_single = subparsers.add_parser("disable_single", | 
 |                         aliases = ['d'], | 
 |                         parents = [path_parser], prog = "vmbus_testing", | 
 |                         usage = "%(prog)s [disable_single | d] -h\n" | 
 |                         "%(prog)s [disable_single | d] -p\n", | 
 |                         help = "Disable ALL testing on a SINGLE vmbus device.", | 
 |                         description = "Disable ALL testing on a SINGLE vmbus " | 
 |                         "device.") | 
 |         parser_view_all = subparsers.add_parser("view_all", aliases = ['V'], | 
 |                         help = "View the test state for ALL vmbus devices.", | 
 |                         prog = "vmbus_testing", | 
 |                         usage = "%(prog)s [view_all | V] -h\n" | 
 |                         "%(prog)s [view_all | V]\n", | 
 |                         description = "This shows the test state for ALL the " | 
 |                         "vmbus devices.") | 
 |         parser_view_single = subparsers.add_parser("view_single", | 
 |                         aliases = ['v'],parents = [path_parser], | 
 |                         help = "View the test values for a SINGLE vmbus " | 
 |                         "device.", | 
 |                         description = "This shows the test values for a SINGLE " | 
 |                         "vmbus device.", prog = "vmbus_testing", | 
 |                         usage = "%(prog)s [view_single | v] -h\n" | 
 |                         "%(prog)s [view_single | v] -p") | 
 |  | 
 |         return  parser.parse_args() | 
 |  | 
 | # value checking for range checking input in parser | 
 | def check_range(arg1): | 
 |  | 
 |         try: | 
 |                 val = int(arg1) | 
 |         except ValueError as err: | 
 |                 raise argparse.ArgumentTypeError(str(err)) | 
 |         if val < -1 or val > 1000: | 
 |                 message = ("\n\nvalue must be -1 or  0 < value <= 1000. " | 
 |                            "Value program received: {}\n").format(val) | 
 |                 raise argparse.ArgumentTypeError(message) | 
 |         return val | 
 |  | 
 | if __name__ == "__main__": | 
 |         main() |