diff --git a/fusil/python/write_python_code.py b/fusil/python/write_python_code.py index b670e6f..afba85a 100644 --- a/fusil/python/write_python_code.py +++ b/fusil/python/write_python_code.py @@ -408,37 +408,31 @@ def _write_helper_call_functions(self) -> None: ) self.write(0, "import math") self.write(0, "import types") - self.write(0, "def compare_results(a, b):") - with self.indented(): - self.write( - 0, "if isinstance(a, types.FunctionType) and a.__name__ == '' and \\" - ) - self.write(0, " isinstance(b, types.FunctionType) and b.__name__ == '':") - self.write(1, "return True # Treat two lambdas as equal for our purposes") - - self.write(0, "if isinstance(a, complex) and isinstance(b, complex):") - with self.indented(): - self.write(0, "a_real_nan = math.isnan(a.real)") - self.write(0, "b_real_nan = math.isnan(b.real)") - self.write(0, "a_imag_nan = math.isnan(a.imag)") - self.write(0, "b_imag_nan = math.isnan(b.imag)") - self.write(0, "real_match = (a.real == b.real) or (a_real_nan and b_real_nan)") - self.write(0, "imag_match = (a.imag == b.imag) or (a_imag_nan and b_imag_nan)") - self.write(0, "return real_match and imag_match") - - self.write( - 0, - "if isinstance(a, float) and isinstance(b, float) and math.isnan(a) and math.isnan(b):", - ) - self.write(1, "return True") - - self.write(0, "if isinstance(a, object) and isinstance(b, object):") - self.write(1, "return True") - - self.write(0, "if isinstance(a, tuple) and isinstance(b, tuple) and len(a) == len(b):") - self.write(1, "return all(compare_results(x, y) for x, y in zip(a, b))") - - self.write(0, "return a == b") + # Fixed-shape helper: emit it as one block rather than line-by-line self.write calls. + self.write_block( + 0, + """ + def compare_results(a, b): + if isinstance(a, types.FunctionType) and a.__name__ == '' and \\ + isinstance(b, types.FunctionType) and b.__name__ == '': + return True # Treat two lambdas as equal for our purposes + if isinstance(a, complex) and isinstance(b, complex): + a_real_nan = math.isnan(a.real) + b_real_nan = math.isnan(b.real) + a_imag_nan = math.isnan(a.imag) + b_imag_nan = math.isnan(b.imag) + real_match = (a.real == b.real) or (a_real_nan and b_real_nan) + imag_match = (a.imag == b.imag) or (a_imag_nan and b_imag_nan) + return real_match and imag_match + if isinstance(a, float) and isinstance(b, float) and math.isnan(a) and math.isnan(b): + return True + if isinstance(a, object) and isinstance(b, object): + return True + if isinstance(a, tuple) and isinstance(b, tuple) and len(a) == len(b): + return all(compare_results(x, y) for x, y in zip(a, b)) + return a == b + """, + ) self.emptyLine() self.write(0, "SENTINEL_VALUE = object()") @@ -582,6 +576,26 @@ def oom_run(label, thunk, window=_OOM_WINDOW): def _write_main_fuzzing_logic(self) -> None: """Writes the core fuzzing loops for functions, classes, and objects.""" + self._write_fuzzing_logic_preamble() + + self._write_function_fuzzing_loop() + self.emptyLine() + + # OOM mode (Phase 2): after the module-level function sweeps, sweep class + # constructors and methods -- where the high-value unchecked-allocation bugs + # live -- then stop (the non-OOM class/object blocks below are skipped). + if self.options.oom_fuzz: + self._write_oom_class_fuzzing_loop() + return + + self._write_class_fuzzing_loop() + self.emptyLine() + + self._write_object_fuzzing_loop() + self.emptyLine() + + def _write_fuzzing_logic_preamble(self) -> None: + """Emits the module alias, thread/async registries, and the runtime imports.""" self.write(0, f"fuzz_target_module = {self.module_name}") self.emptyLine() @@ -604,113 +618,114 @@ def _write_main_fuzzing_logic(self) -> None: ) self.emptyLine() - if self.module_functions: - self.write_print_to_stderr( - 0, - f'"--- Fuzzing {len(self.module_functions)} functions in {self.module_name} ---"', - ) - n_calls = ( - self.options.oom_calls if self.options.oom_fuzz else self.options.functions_number - ) - for i in range(n_calls): - prefix = f"f{i + 1}" - - # --- SIMPLIFIED LOGIC --- - if self.options.oom_fuzz: - if self.options.oom_seq: - # OOM sequence: several functions under one failure window so a - # failure in one can corrupt state a later one trips over. - self._generate_oom_function_sequence(prefix) - continue - # OOM injection: wrap a function call in a dense set_nomemory sweep - func_name = choice(self.module_functions) - try: - func_obj = getattr(self.module, func_name) - except AttributeError: - continue - self._generate_oom_function_call(prefix, func_name, func_obj) - else: - # Standard (non-JIT) function call fuzzing - func_name = choice(self.module_functions) - try: - func_obj = getattr(self.module, func_name) - except AttributeError: - continue - self._generate_and_write_call( - prefix=prefix, - callable_name=func_name, - callable_obj=func_obj, - min_arg_count=1, - target_obj_expr="fuzz_target_module", - is_method_call=False, - generation_depth=0, - ) - - self.emptyLine() + def _write_function_fuzzing_loop(self) -> None: + """Fuzzes module-level functions (OOM sweep or standard calls).""" + if not self.module_functions: + return + self.write_print_to_stderr( + 0, + f'"--- Fuzzing {len(self.module_functions)} functions in {self.module_name} ---"', + ) + n_calls = self.options.oom_calls if self.options.oom_fuzz else self.options.functions_number + for i in range(n_calls): + self._write_one_function_fuzz(f"f{i + 1}") - # OOM mode (Phase 2): after module-level function sweeps, sweep class - # constructors and methods -- where the high-value unchecked-allocation bugs - # live -- then stop (the non-OOM class/object blocks below are skipped). + def _write_one_function_fuzz(self, prefix: str) -> None: + """Emits one function-fuzzing call for `prefix` (OOM sequence/sweep or standard).""" if self.options.oom_fuzz: - if self.module_classes and self.options.oom_classes > 0: - self.write_print_to_stderr( - 0, - f'"--- OOM-fuzzing {len(self.module_classes)} classes in {self.module_name} ---"', - ) - for i in range(self.options.oom_classes): - class_name = choice(self.module_classes) - if class_name in OBJECT_BLACKLIST: - continue - try: - class_obj = getattr(self.module, class_name) - except AttributeError: - continue - self._generate_oom_class_fuzzing(f"oc{i + 1}", class_name, class_obj) + if self.options.oom_seq: + # OOM sequence: several functions under one failure window so a + # failure in one can corrupt state a later one trips over. + self._generate_oom_function_sequence(prefix) + return + # OOM injection: wrap a function call in a dense set_nomemory sweep + func_name = choice(self.module_functions) + try: + func_obj = getattr(self.module, func_name) + except AttributeError: + return + self._generate_oom_function_call(prefix, func_name, func_obj) return - # Fuzz classes (instantiate and call methods) - if self.module_classes: - self.write_print_to_stderr( - 0, - f'"--- Fuzzing {len(self.module_classes)} classes in {self.module_name} ---"', - ) - for i in range(self.options.classes_number): - if not self.module_classes: - break - class_name = choice(self.module_classes) - if class_name in OBJECT_BLACKLIST: - continue - try: - class_obj = getattr(self.module, class_name) - except AttributeError: - continue + # Standard (non-JIT) function call fuzzing + func_name = choice(self.module_functions) + try: + func_obj = getattr(self.module, func_name) + except AttributeError: + return + self._generate_and_write_call( + prefix=prefix, + callable_name=func_name, + callable_obj=func_obj, + min_arg_count=1, + target_obj_expr="fuzz_target_module", + is_method_call=False, + generation_depth=0, + ) - self._fuzz_one_class(class_idx=i, class_name_str=class_name, class_type=class_obj) - self.emptyLine() + def _write_oom_class_fuzzing_loop(self) -> None: + """OOM Phase 2: sweep class constructors + methods for allocation-failure bugs.""" + if not (self.module_classes and self.options.oom_classes > 0): + return + self.write_print_to_stderr( + 0, + f'"--- OOM-fuzzing {len(self.module_classes)} classes in {self.module_name} ---"', + ) + for i in range(self.options.oom_classes): + class_name = choice(self.module_classes) + if class_name in OBJECT_BLACKLIST: + continue + try: + class_obj = getattr(self.module, class_name) + except AttributeError: + continue + self._generate_oom_class_fuzzing(f"oc{i + 1}", class_name, class_obj) - if self.module_objects: - self.write_print_to_stderr( - 0, - f'"--- Fuzzing {len(self.module_objects)} objects in {self.module_name} ---"', - ) - for i in range(self.options.objects_number): - if not self.module_objects: - break - obj_name = choice(self.module_objects) - if obj_name in OBJECT_BLACKLIST: - continue - try: - obj_instance = getattr(self.module, obj_name) - except AttributeError: - continue - if isinstance(obj_instance, ModuleType): - continue + def _write_class_fuzzing_loop(self) -> None: + """Fuzzes module classes: instantiate each and call its methods.""" + if not self.module_classes: + return + self.write_print_to_stderr( + 0, + f'"--- Fuzzing {len(self.module_classes)} classes in {self.module_name} ---"', + ) + for i in range(self.options.classes_number): + if not self.module_classes: + break + class_name = choice(self.module_classes) + if class_name in OBJECT_BLACKLIST: + continue + try: + class_obj = getattr(self.module, class_name) + except AttributeError: + continue - self._fuzz_one_module_object( - obj_idx=i, obj_name_str=obj_name, obj_instance=obj_instance - ) + self._fuzz_one_class(class_idx=i, class_name_str=class_name, class_type=class_obj) - self.emptyLine() + def _write_object_fuzzing_loop(self) -> None: + """Fuzzes module-level objects (skipping submodules and blacklisted names).""" + if not self.module_objects: + return + self.write_print_to_stderr( + 0, + f'"--- Fuzzing {len(self.module_objects)} objects in {self.module_name} ---"', + ) + for i in range(self.options.objects_number): + if not self.module_objects: + break + obj_name = choice(self.module_objects) + if obj_name in OBJECT_BLACKLIST: + continue + try: + obj_instance = getattr(self.module, obj_name) + except AttributeError: + continue + if isinstance(obj_instance, ModuleType): + continue + + self._fuzz_one_module_object( + obj_idx=i, obj_name_str=obj_name, obj_instance=obj_instance + ) def _fuzz_one_class(self, class_idx: int, class_name_str: str, class_type: type) -> None: """Generates code to instantiate a class and fuzz its methods.""" @@ -1241,18 +1256,7 @@ def _generate_and_write_call( return min_arg, max_arg = get_arg_number(callable_obj, callable_name, min_arg_count) - - rand_choice = randint(0, 19) - if rand_choice < 1: # 0 (5%) - num_args = 0 - elif rand_choice < 2: # 1 (5%) - num_args = 1 - elif rand_choice < 3: # max_arg + 1 (5%) - num_args = max_arg + 1 - elif min_arg == max_arg: - num_args = min_arg - else: # (remaining 85%) - num_args = randint(min_arg, max_arg) + num_args = self._pick_call_arg_count(min_arg, max_arg) call_prefix = ( f'callMethod("{prefix}", {target_obj_expr}, "{callable_name}"' @@ -1265,143 +1269,179 @@ def _generate_and_write_call( self.write(0, f"verbose={verbose})") self.emptyLine() - # Deep dive on the result: recursively fuzz the method's return value. This is - # opt-in (--deep-dive, default off) -- it is multiplicative (every returning call - # spawns another round of fuzzing on the result) and has not historically paid off. - if self.options.deep_dive: - self.write(0, f"# Deep dive on result of {callable_name}") - self.write( - 0, - f"if 'res_{prefix}' in locals() and res_{prefix} is not None and res_{prefix} is not SENTINEL_VALUE:", - ) + self._write_result_deep_dive(prefix, callable_name, generation_depth) + + self._write_target_func_fetch(prefix, callable_name, target_obj_expr) + self.emptyLine() + + self._write_thread_call_wrapper(prefix, callable_name, num_args) + self._write_async_call_wrapper(prefix, callable_name, num_args) + + def _pick_call_arg_count(self, min_arg: int, max_arg: int) -> int: + """Picks how many arguments to pass: mostly in-range, with edge cases (0, 1, max+1).""" + rand_choice = randint(0, 19) + if rand_choice < 1: # 0 (5%) + return 0 + elif rand_choice < 2: # 1 (5%) + return 1 + elif rand_choice < 3: # max_arg + 1 (5%) + return max_arg + 1 + elif min_arg == max_arg: + return min_arg + else: # (remaining 85%) + return randint(min_arg, max_arg) + + def _write_result_deep_dive( + self, prefix: str, callable_name: str, generation_depth: int + ) -> None: + """Opt-in (--deep-dive): recursively fuzz the call's return value. + + Multiplicative (every returning call spawns another round of fuzzing on the result) + and has not historically paid off, so it is off by default. + """ + if not self.options.deep_dive: + return + self.write(0, f"# Deep dive on result of {callable_name}") + self.write( + 0, + f"if 'res_{prefix}' in locals() and res_{prefix} is not None and res_{prefix} is not SENTINEL_VALUE:", + ) + with self.indented(): + self.write(0, f"{prefix}_res_type_name = type(res_{prefix}).__name__") + self.write(0, "try:") with self.indented(): - self.write(0, f"{prefix}_res_type_name = type(res_{prefix}).__name__") - self.write(0, "try:") - with self.indented(): - self.write_print_to_stderr( - 0, - f"f'CALL_RESULT ({prefix}): Method {callable_name} returned {{res_{prefix}!r}} of type {{{prefix}_res_type_name}}. Attempting deep dive.'", - ) - self.write(0, "except Exception as e:") - with self.indented(): - self.write_print_to_stderr( - 0, - "f'EXCEPTION printing CALL_RESULT: { e }'", - ) - self._dispatch_fuzz_on_instance( - current_prefix=f"{prefix}_res_dive", - target_obj_expr_str=f"res_{prefix}", # The variable holding the result - class_name_hint=f"{prefix}_res_type_name", # Runtime type name - generation_depth=generation_depth + 1, # Incremented depth + self.write_print_to_stderr( + 0, + f"f'CALL_RESULT ({prefix}): Method {callable_name} returned {{res_{prefix}!r}} of type {{{prefix}_res_type_name}}. Attempting deep dive.'", ) + self.write(0, "except Exception as e:") + with self.indented(): + self.write_print_to_stderr( + 0, + "f'EXCEPTION printing CALL_RESULT: { e }'", + ) + self._dispatch_fuzz_on_instance( + current_prefix=f"{prefix}_res_dive", + target_obj_expr_str=f"res_{prefix}", # The variable holding the result + class_name_hint=f"{prefix}_res_type_name", # Runtime type name + generation_depth=generation_depth + 1, # Incremented depth + ) + + def _write_target_func_fetch( + self, prefix: str, callable_name: str, target_obj_expr: str + ) -> None: + """Fetches the bound callable into `target_func` for the thread/async wrappers.""" + if not (self.enable_threads or self.enable_async): + return + self.write(0, "target_func = None") + self.write(0, "try:") + self.write(1, f"target_func = getattr({target_obj_expr}, '{callable_name}')") + self.write(0, "except Exception as e_get_target_func:") + self.write_print_to_stderr( + 1, + f'f"[{prefix}] Failed to get attribute {callable_name} from {target_obj_expr}: ' + f'{{e_get_target_func.__class__.__name__}} {{e_get_target_func}}"', + ) - if self.enable_threads or self.enable_async: - self.write(0, "target_func = None") + def _write_thread_call_wrapper(self, prefix: str, callable_name: str, num_args: int) -> None: + """Wraps the call in a Thread appended to fuzzer_threads_alive (when --threads).""" + if not self.enable_threads: + return + self.write(0, "if target_func is not None:") + with self.indented(): self.write(0, "try:") - self.write(1, f"target_func = getattr({target_obj_expr}, '{callable_name}')") - self.write(0, "except Exception as e_get_target_func:") + with self.indented(): + arg_expr_list = [] + for _ in range(num_args): + arg_lines = self.arg_generator.create_simple_argument() + arg_expr_list.append(" ".join(arg_lines)) + + args_tuple_str = f"({', '.join(arg_expr_list)}{',' if len(arg_expr_list) == 1 and num_args == 1 else ''})" + + self.write( + 0, + f"thread_obj = Thread(target=target_func, args={args_tuple_str}, name='{prefix}_{callable_name}')", + ) + self.write(0, "fuzzer_threads_alive.append(thread_obj)") + self.write(0, "except Exception as e_thread_create:") self.write_print_to_stderr( 1, - f'f"[{prefix}] Failed to get attribute {callable_name} from {target_obj_expr}: ' - f'{{e_get_target_func.__class__.__name__}} {{e_get_target_func}}"', + f'f"[{prefix}] Failed to create thread for {callable_name}: {{e_thread_create.__class__.__name__}}"', ) self.emptyLine() - if self.enable_threads: - self.write(0, "if target_func is not None:") + def _write_async_call_wrapper(self, prefix: str, callable_name: str, num_args: int) -> None: + """Wraps the call in an async task appended to fuzzer_async_tasks (when --async).""" + if not self.enable_async: + return + async_func_name = f"async_call_{prefix}_{callable_name}" + self.write(0, "if target_func is not None:") + with self.indented(): + self.write(0, f"def {async_func_name}(target_func=target_func):") with self.indented(): + self.write_print_to_stderr(0, f'"Starting async task: {async_func_name}"') + self.write(0, f"time.sleep({random() / 1000:.6f}) # Small delay") self.write(0, "try:") with self.indented(): - arg_expr_list = [] + arg_expr_list_async = [] for _ in range(num_args): arg_lines = self.arg_generator.create_simple_argument() - arg_expr_list.append(" ".join(arg_lines)) + arg_expr_list_async.append(" ".join(arg_lines)) - args_tuple_str = f"({', '.join(arg_expr_list)}{',' if len(arg_expr_list) == 1 and num_args == 1 else ''})" + args_str_async = ", ".join(arg_expr_list_async) - self.write( - 0, - f"thread_obj = Thread(target=target_func, args={args_tuple_str}, name='{prefix}_{callable_name}')", - ) - self.write(0, "fuzzer_threads_alive.append(thread_obj)") - self.write(0, "except Exception as e_thread_create:") + self.write(0, f"target_func({args_str_async})") + self.write(0, "except Exception as e_async_call:") self.write_print_to_stderr( 1, - f'f"[{prefix}] Failed to create thread for {callable_name}: {{e_thread_create.__class__.__name__}}"', + f'f"[{prefix}] Exception in async task {async_func_name}: {{e_async_call.__class__.__name__}} {{e_async_call}}"', ) - self.emptyLine() - - if self.enable_async: - async_func_name = f"async_call_{prefix}_{callable_name}" - self.write(0, "if target_func is not None:") - with self.indented(): - self.write(0, f"def {async_func_name}(target_func=target_func):") - with self.indented(): - self.write_print_to_stderr(0, f'"Starting async task: {async_func_name}"') - self.write(0, f"time.sleep({random() / 1000:.6f}) # Small delay") - self.write(0, "try:") - with self.indented(): - arg_expr_list_async = [] - for _ in range(num_args): - arg_lines = self.arg_generator.create_simple_argument() - arg_expr_list_async.append(" ".join(arg_lines)) - - args_str_async = ", ".join(arg_expr_list_async) - - self.write(0, f"target_func({args_str_async})") - self.write(0, "except Exception as e_async_call:") - self.write_print_to_stderr( - 1, - f'f"[{prefix}] Exception in async task {async_func_name}: {{e_async_call.__class__.__name__}} {{e_async_call}}"', - ) - self.write_print_to_stderr(0, f'"Ending async task: {async_func_name}"') - self.write(0, f"fuzzer_async_tasks.append({async_func_name})") - self.emptyLine() + self.write_print_to_stderr(0, f'"Ending async task: {async_func_name}"') + self.write(0, f"fuzzer_async_tasks.append({async_func_name})") + self.emptyLine() def _write_concurrency_finalization(self) -> None: """Writes code to start/join threads and run asyncio tasks.""" if self.enable_threads: - self.write_print_to_stderr(0, '"--- Starting and Joining Fuzzer Threads ---"') - self.write(0, "for t_obj in fuzzer_threads_alive:") - self.write(1, "try:") - self.write_print_to_stderr(2, 'f"Starting thread: {t_obj.name}"') - self.write(2, "t_obj.start()") - self.write(1, "except Exception as e_thread_start:") - self.write_print_to_stderr( - 2, - 'f"Failed to start thread {t_obj.name}: {e_thread_start.__class__.__name__}"', - ) - self.write(0, "for t_obj in fuzzer_threads_alive:") - self.write(1, "try:") - self.write_print_to_stderr(2, 'f"Joining thread: {t_obj.name}"') - self.write(2, "t_obj.join(timeout=1.0) # Add timeout to join") - self.write(1, "except Exception as e_thread_join:") - self.write_print_to_stderr( - 2, - 'f"Failed to join thread {t_obj.name}: {e_thread_join.__class__.__name__}"', + self.write_block( + 0, + """ + print("--- Starting and Joining Fuzzer Threads ---", file=stderr) + for t_obj in fuzzer_threads_alive: + try: + print(f"Starting thread: {t_obj.name}", file=stderr) + t_obj.start() + except Exception as e_thread_start: + print(f"Failed to start thread {t_obj.name}: {e_thread_start.__class__.__name__}", file=stderr) + for t_obj in fuzzer_threads_alive: + try: + print(f"Joining thread: {t_obj.name}", file=stderr) + t_obj.join(timeout=1.0) # Add timeout to join + except Exception as e_thread_join: + print(f"Failed to join thread {t_obj.name}: {e_thread_join.__class__.__name__}", file=stderr) + """, ) self.emptyLine() if self.enable_async: - self.write_print_to_stderr(0, '"--- Running Fuzzer Async Tasks ---"') - self.write(0, "async def main_async_fuzzer_tasks():") - self.write(1, "if not fuzzer_async_tasks: return") - self.write( - 1, - "task_objects = [asyncio.to_thread(func) for func in fuzzer_async_tasks]", - ) - self.write(1, "await asyncio.gather(*task_objects, return_exceptions=True)") - self.emptyLine() - self.write(0, "runner = asyncio.Runner()") - self.write(0, "try:") - self.write(1, "runner.run(main_async_fuzzer_tasks())") - self.write(0, "except Exception as e_async_runner_run:") - self.write( - 1, - "print(f'Exception in async runner: {e_async_runner_run.__class__.__name__} {e_async_runner_run}')", + self.write_block( + 0, + """ + print("--- Running Fuzzer Async Tasks ---", file=stderr) + async def main_async_fuzzer_tasks(): + if not fuzzer_async_tasks: return + task_objects = [asyncio.to_thread(func) for func in fuzzer_async_tasks] + await asyncio.gather(*task_objects, return_exceptions=True) + + runner = asyncio.Runner() + try: + runner.run(main_async_fuzzer_tasks()) + except Exception as e_async_runner_run: + print(f'Exception in async runner: {e_async_runner_run.__class__.__name__} {e_async_runner_run}') + finally: + runner.close() + """, ) - self.write(0, "finally:") - self.write(1, "runner.close()") self.emptyLine() def generate_fuzzing_script(self) -> None: