diff --git a/fusil/python/write_python_code.py b/fusil/python/write_python_code.py index 8034658..ca4a152 100644 --- a/fusil/python/write_python_code.py +++ b/fusil/python/write_python_code.py @@ -733,25 +733,24 @@ def _fuzz_one_class(self, class_idx: int, class_name_str: str, class_type: type) num_constructor_args = class_arg_number(class_name_str, class_type) self.write(0, f"{instance_var_name} = None # Initialize instance variable") + # PoC of the indentation context manager: `with self.indented():` replaces the + # addLevel(1)/restoreLevel(self.base_level - 1) bookkeeping -- the block's nesting now + # mirrors the generated code's nesting and the level can't leak. Output is unchanged + # (guarded by tests/python/test_golden_output.py). self.write(0, "try:") - self.addLevel(1) - self.write( - 0, f"{instance_var_name} = callFunc('{prefix}_init', '{class_name_str}'," - ) # prefix was from original _fuzz_one_class - self._write_arguments_for_call_lines(num_constructor_args, 1) # Indent args by 1 - self.write(0, " )") # Close callFunc - self.restoreLevel(self.base_level - 1) # Exit try's indentation (level 1) + with self.indented(): + self.write(0, f"{instance_var_name} = callFunc('{prefix}_init', '{class_name_str}',") + self._write_arguments_for_call_lines(num_constructor_args, 1) # Indent args by 1 + self.write(0, " )") # Close callFunc self.write(0, "except Exception as e_instantiate:") - self.addLevel(1) # Indent for except block contents - self.write(0, f"{instance_var_name} = None") - self.write_print_to_stderr( - 0, # This 0 is relative to current base_level (which is parent's level + 1) - f'"[{prefix}] Failed to instantiate {class_name_str}: {{e_instantiate.__class__.__name__}} {{e_instantiate}}"', - ) - # instance_var_name remains None if already set, or if callFunc returned None - # If callFunc might not set instance_var_name on error, set it explicitly: - self.write(0, f"{instance_var_name} = None") - self.restoreLevel(self.base_level - 1) # Exit except's indentation + with self.indented(): + self.write(0, f"{instance_var_name} = None") + self.write_print_to_stderr( + 0, + f'"[{prefix}] Failed to instantiate {class_name_str}: {{e_instantiate.__class__.__name__}} {{e_instantiate}}"', + ) + # callFunc may not set instance_var_name on error, so set it explicitly. + self.write(0, f"{instance_var_name} = None") self.emptyLine() self._dispatch_fuzz_on_instance( @@ -765,21 +764,20 @@ def _fuzz_one_class(self, class_idx: int, class_name_str: str, class_type: type) self.write( 0, f"if {instance_var_name} is not None and {instance_var_name} is not SENTINEL_VALUE:" ) - current_level = self.addLevel(1) - # class_type is the type object of the class that was "instantiated" - self._fuzz_methods_on_object_or_specific_types( - current_prefix=f"{prefix}m", # prefix from _fuzz_one_class context, e.g., "o1m", "c1m" - target_obj_expr_str=instance_var_name, - target_obj_class_name=class_name_str, # Original class name string - target_obj_actual_type_obj=class_type, # The actual type object - num_method_calls_to_make=self.options.methods_number, - ) - self.write(0, f"del {instance_var_name} # Cleanup instance") - self.write_print_to_stderr( - 0, f'"[{prefix}] -explicit garbage collection for class instance-"' - ) - self.write(0, "collect()") - self.restoreLevel(current_level) + with self.indented(): + # class_type is the type object of the class that was "instantiated" + self._fuzz_methods_on_object_or_specific_types( + current_prefix=f"{prefix}m", # e.g. "o1m", "c1m" + target_obj_expr_str=instance_var_name, + target_obj_class_name=class_name_str, # Original class name string + target_obj_actual_type_obj=class_type, # The actual type object + num_method_calls_to_make=self.options.methods_number, + ) + self.write(0, f"del {instance_var_name} # Cleanup instance") + self.write_print_to_stderr( + 0, f'"[{prefix}] -explicit garbage collection for class instance-"' + ) + self.write(0, "collect()") self.emptyLine() MAX_FUZZ_GENERATION_DEPTH = 2 # Adjust as needed; 3-5 is usually a good start diff --git a/fusil/write_code.py b/fusil/write_code.py index 112aaa7..10f6ba5 100644 --- a/fusil/write_code.py +++ b/fusil/write_code.py @@ -1,5 +1,6 @@ import re import textwrap +from contextlib import contextmanager from os import chmod from textwrap import dedent @@ -76,6 +77,22 @@ def restoreLevel(self, level): raise ValueError("Negative indentation level in restoreLevel()") self.base_level = level + @contextmanager + def indented(self, delta=1): + """Scoped indentation: emit lines `delta` levels deeper inside the block, then + restore the previous level on exit (including on exception). + + Replaces the manual ``saved = self.addLevel(1); ...; self.restoreLevel(saved)`` + bookkeeping (and the error-prone ``restoreLevel(self.base_level - 1)`` form) with a + block whose nesting matches the generated code's nesting and can't leak a level. + Named ``indented`` (not ``indent``) because ``self.indent`` is the indent string. + """ + saved = self.addLevel(delta) + try: + yield + finally: + self.restoreLevel(saved) + def indentLine(self, level, text): if not isinstance(text, str): text = str(text, "ASCII") diff --git a/pyproject.toml b/pyproject.toml index 00bb742..0e5b26f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,8 +60,15 @@ namespaces = false [tool.ruff] line-length = 100 target-version = "py313" -# Legacy / parked code is not maintained to the lint/format standard. -extend-exclude = ["fusil/notworking", "fuzzers/notworking", "tools/notworking"] +# Legacy / parked code is not maintained to the lint/format standard; the golden/ +# directory holds generated fuzzer output (snapshots) that must stay byte-for-byte as +# emitted -- linting/formatting it is meaningless and would break the snapshots. +extend-exclude = [ + "fusil/notworking", + "fuzzers/notworking", + "tools/notworking", + "tests/python/golden", +] [tool.ruff.lint] # Default rule set (pyflakes F + a subset of pycodestyle E) plus import sorting (I). diff --git a/tests/python/golden/fakemod_seed1234.py b/tests/python/golden/fakemod_seed1234.py new file mode 100644 index 0000000..6a78b54 --- /dev/null +++ b/tests/python/golden/fakemod_seed1234.py @@ -0,0 +1,543 @@ +# FUSIL_BOILERPLATE_START + +from gc import collect +from random import choice, randint, random, sample, seed +from sys import stderr, path as sys_path +from os.path import dirname +import ast +import inspect +import io +import math +import operator +import time +import sys +from threading import Thread +from unittest.mock import MagicMock +import asyncio +seed(946466063) + +print("Importing target module: fakemod", file=stderr) +import fakemod + +TRIVIAL_TYPES = {int, str, float, bool, bytes, tuple, list, dict, set, type(None),} +def skip_trivial_type(obj_instance_or_class): + if type(obj_instance_or_class) in TRIVIAL_TYPES: + return True + return False + + +import sys +from _collections import OrderedDict, deque +from abc import ABCMeta +from collections import Counter +from queue import Queue +from random import randint +from string import printable + +try: + from _decimal import Decimal + + has__decimal = True +except ImportError: + from decimal import Decimal + + has__decimal = False + +sequences = [Queue, deque, frozenset, list, set, str, tuple] +bytes_ = [bytearray, bytes] +numbers = [Decimal, complex, float, int] +dicts = [Counter, OrderedDict, dict] +# dicts = [OrderedDict, dict] +bases = sequences + bytes_ + numbers + dicts + [object] + +large_num = 2**64 + + +class WeirdBase(ABCMeta): + def __hash__(self): + return randint(0, large_num) + + def __eq__(self, other): + return False + + +weird_instances = dict() +weird_classes = dict() +for cls in bases: + + class weird_cls(cls, metaclass=WeirdBase): + def add(self, *args, **kwargs): + pass + + append = clear = close = write = sort = reversed = add + + def encode(self, *args, **kwargs): + return b"" + + def decode(self, *args, **kwargs): + return "" + + format = getvalue = join = read = replace = strip = rstrip = decode + + def get(self, *args, **kwargs): + return self + + open = pop = update = get + + def readlines(self, *args, **kwargs): + return [""] + + rsplit = split = partition = rpartition = readlines + + def items(self): + return {}.items() + + def keys(self): + return {}.keys() + + def values(self): + return {}.values() + + weird_cls.__name__ = f"weird_{cls.__name__}" + weird_instances[f"weird_{cls.__name__}_empty"] = weird_cls() + weird_classes[f"weird_{cls.__name__}"] = weird_cls + +tricky_strs = ( + chr(0), + chr(127), + chr(255), + chr(0x10FFFF), + "𝒜", + "\\x00" * 10, + "A" * (2**16), + "💻" * 2**10, +) + +# We cannot create a Decimal larger than 10 ** 4300 with _pydecimal, only with _decimal +max_str_digits_adjustment = 1 if has__decimal else -1 +big_int_for_decimal = 10 ** (sys.int_info.default_max_str_digits + max_str_digits_adjustment) + +for cls in sequences: + weird_instances[f"weird_{cls.__name__}_single"] = weird_classes[f"weird_{cls.__name__}"]("a") + weird_instances[f"weird_{cls.__name__}_range"] = weird_classes[f"weird_{cls.__name__}"]( + range(20) + ) + weird_instances[f"weird_{cls.__name__}_types"] = weird_classes[f"weird_{cls.__name__}"](bases) + weird_instances[f"weird_{cls.__name__}_printable"] = weird_classes[f"weird_{cls.__name__}"]( + printable + ) + weird_instances[f"weird_{cls.__name__}_special"] = weird_classes[f"weird_{cls.__name__}"]( + tricky_strs + ) +for cls in bytes_: + weird_instances[f"weird_{cls.__name__}_bytes"] = weird_classes[f"weird_{cls.__name__}"]( + b"abcdefgh_" * 10 + ) +for cls in numbers: + weird_instances[f"weird_{cls.__name__}_sys_maxsize"] = weird_classes[f"weird_{cls.__name__}"]( + sys.maxsize + ) + weird_instances[f"weird_{cls.__name__}_sys_maxsize_minus_one"] = weird_classes[ + f"weird_{cls.__name__}" + ](sys.maxsize - 1) + weird_instances[f"weird_{cls.__name__}_sys_maxsize_plus_one"] = weird_classes[ + f"weird_{cls.__name__}" + ](sys.maxsize + 1) + weird_instances[f"weird_{cls.__name__}_neg_sys_maxsize"] = weird_classes[ + f"weird_{cls.__name__}" + ](-sys.maxsize) + weird_instances[f"weird_{cls.__name__}_2**63-1"] = weird_classes[f"weird_{cls.__name__}"]( + 2**63 - 1 + ) + weird_instances[f"weird_{cls.__name__}_2**63"] = weird_classes[f"weird_{cls.__name__}"](2**63) + weird_instances[f"weird_{cls.__name__}_2**63+1"] = weird_classes[f"weird_{cls.__name__}"]( + 2**63 + 1 + ) + weird_instances[f"weird_{cls.__name__}_-2**63+1"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**63) + 1 + ) + weird_instances[f"weird_{cls.__name__}_-2**63"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**63) + ) + weird_instances[f"weird_{cls.__name__}_-2**63-1"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**63) - 1 + ) + weird_instances[f"weird_{cls.__name__}_2**31-1"] = weird_classes[f"weird_{cls.__name__}"]( + 2**31 - 1 + ) + weird_instances[f"weird_{cls.__name__}_2**31"] = weird_classes[f"weird_{cls.__name__}"](2**31) + weird_instances[f"weird_{cls.__name__}_2**31+1"] = weird_classes[f"weird_{cls.__name__}"]( + 2**31 + 1 + ) + weird_instances[f"weird_{cls.__name__}_-2**31+1"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**31) + 1 + ) + weird_instances[f"weird_{cls.__name__}_-2**31"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**31) + ) + weird_instances[f"weird_{cls.__name__}_-2**31-1"] = weird_classes[f"weird_{cls.__name__}"]( + -(2**31) - 1 + ) + if cls not in (float, complex) and hasattr(sys, "int_info"): + weird_instances[f"weird_{cls.__name__}_10**default_max_str_digits+1"] = weird_classes[ + f"weird_{cls.__name__}" + ](big_int_for_decimal) +for cls in dicts: + weird_instances[f"weird_{cls.__name__}_basic"] = weird_classes[f"weird_{cls.__name__}"]( + {a: a for a in range(100)} + ) + weird_instances[f"weird_{cls.__name__}_tricky_strs"] = weird_classes[f"weird_{cls.__name__}"]( + {a: a for a in tricky_strs} + ) + + +# Class with a __del__ side effect to attack the JIT optimizer +class FrameModifier: + def __init__(self, var_name, new_value): + # Store the name of the variable to target and its new value. + self.var_name = var_name + self.new_value = new_value + # Announce creation for debugging the generated script + print(f" [FrameModifier created to target '{self.var_name}']", file=sys.stderr) + + def __del__(self): + try: + # On destruction, get the calling frame (1 level up). + frame = sys._getframe(1) + # Maliciously modify the local variable in that frame. + print( + f" [Side Effect] In __del__: Modifying '{self.var_name}' to {self.new_value!r}", + file=sys.stderr, + ) + if self.var_name in frame.f_locals: + frame.f_locals[self.var_name] = self.new_value + elif ( + self.var_name.split(".")[0] in frame.f_locals and self.var_name.count(".") == 1 + ): # instance_or_class.attribute + instance_or_class_str, attr_str = self.var_name.split(".") + setattr(frame.f_locals[instance_or_class_str], attr_str, self.new_value) + else: # module.instance_or_class.attribute + module_str, instance_or_class_str, attr_str = self.var_name.split(".") + instance_or_class = getattr(frame.f_locals[module_str], instance_or_class_str) + setattr(instance_or_class, attr_str, self.new_value) + except Exception as e: + # Frame inspection can be tricky; don't crash in __del__. + print(f" [Side Effect] Error in FrameModifier.__del__: {e}", file=sys.stderr) + + +import abc +import builtins +import collections.abc +import itertools +import types +import typing +from functools import reduce +from operator import or_ + +abc_types = [cls for cls in abc.__dict__.values() if isinstance(cls, type)] +builtins_types = [cls for cls in builtins.__dict__.values() if isinstance(cls, type)] +collections_abc_types = [cls for cls in collections.abc.__dict__.values() if isinstance(cls, type)] +collections_types = [cls for cls in collections.__dict__.values() if isinstance(cls, type)] +itertools_types = [cls for cls in itertools.__dict__.values() if isinstance(cls, type)] +types_types = [cls for cls in types.__dict__.values() if isinstance(cls, type)] +typing_types = [cls for cls in typing.__dict__.values() if isinstance(cls, type)] + +all_types = ( + abc_types + + builtins_types + + collections_abc_types + + collections_types + + itertools_types + + types_types + + typing_types +) +all_types = [t for t in all_types if not (isinstance(t, type) and issubclass(t, BaseException))] +big_union = reduce(or_, all_types, int) + + +import inspect +import types + +tricky_cell = types.CellType(None) +tricky_simplenamespace = types.SimpleNamespace(dummy=None, cell=tricky_cell) +tricky_simplenamespace.dummy = tricky_simplenamespace +tricky_capsule = types.CapsuleType +tricky_module = types.ModuleType("tricky_module", "docs") +tricky_module2 = types.ModuleType("tricky_module2\\x00", "docs\\x00") +try: + tricky_genericalias = types.GenericAlias(list, (int,)) +except AttributeError: + tricky_genericalias = None + +tricky_dict = {} +if tricky_capsule: + tricky_dict[tricky_capsule] = tricky_cell +if tricky_module: + tricky_dict[tricky_module] = tricky_genericalias +tricky_dict["tricky_dict"] = tricky_dict +tricky_mappingproxy = types.MappingProxyType(tricky_dict) + + +def tricky_function(*args, **kwargs): + if len(args) > 150: + raise RecursionError("Fuzzer controlled depth") + a = 1 + + def b(x=a): + v = x + return v + + return tricky_function(*(args + (1,)), **kwargs) + + +tricky_lambda = lambda *args, **kwargs: tricky_lambda(*args, **kwargs) +tricky_classmethod = classmethod(tricky_lambda) +tricky_staticmethod = staticmethod(tricky_lambda) +tricky_property = property(tricky_lambda) +tricky_code = tricky_lambda.__code__ +tricky_closure = tricky_function.__code__.co_freevars +tricky_classmethod_descriptor = types.ClassMethodDescriptorType # This is the type itself + + +class TrickyDescriptor: + def __get__(self, obj, objtype=None): + return self + + def __set__(self, obj, value): + try: + obj.__dict__["_value_descriptor"] = value + except AttributeError: + pass + + def __delete__(self, obj): + try: + del obj.__dict__["_value_descriptor"] + except (AttributeError, KeyError): + pass + + +class TrickyMeta(type): + @property + def __signature__(self): + raise AttributeError("Signature denied by TrickyMeta") + + def __mro_entries__(self, bases): + return (object,) + # return super().__mro_entries__(bases) + + +class TrickyClass(metaclass=TrickyMeta): + tricky_descriptor = TrickyDescriptor() + + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, *args, **kwargs): + self._value_init = None + + def __getattr__(self, name): + if name == "crash_on_getattr": + raise ValueError("getattr manipulated") + return self + + +tricky_instance = TrickyClass() +try: + tricky_frame = inspect.currentframe() + if tricky_frame: # currentframe() can be None + # tricky_frame.f_builtins.update(tricky_dict) + tricky_frame.f_globals.update(tricky_dict) + tricky_frame.f_locals.update(tricky_dict) +except RuntimeError: + tricky_frame = None + + +try: + 1 / 0 +except ZeroDivisionError as e: + tricky_traceback = e.__traceback__ +else: + tricky_traceback = None + + +# tricky_generator = (x for x in itertools.count()) +tricky_list_with_cycle = [[]] * 6 + [] +tricky_list_with_cycle[0].append(tricky_list_with_cycle) +tricky_list_with_cycle[-1].append(tricky_list_with_cycle) +tricky_list_with_cycle.append(tricky_list_with_cycle) +if tricky_list_with_cycle[0] and tricky_list_with_cycle[0][0] is tricky_list_with_cycle: + tricky_list_with_cycle[0][0].append(tricky_list_with_cycle) + + +def errback(*args, **kw): + raise ValueError('errback called') + + +class Liar1: + def __eq__(self, other): + return True + +class Liar2: + def __eq__(self, other): + return False + +liar1, liar2 = Liar1(), Liar2() + +class Evil: + def __eq__(self, other): + for attr in dir(other): + try: other.__dict__[attr] = errback + except: pass + +evil = Evil() + + +# Define a custom exception to distinguish our check from others. +class JITCorrectnessError(AssertionError): pass + +# Helper for correctness testing that handles NaN, lambdas, and complex numbers. +import math +import types +def compare_results(a, b): + if isinstance(a, types.FunctionType) and a.__name__ == '' and \ + isinstance(b, types.FunctionType) and b.__name__ == '': + return True # Treat two lambdas as equal for our purposes + if isinstance(a, complex) and isinstance(b, complex): + a_real_nan = math.isnan(a.real) + b_real_nan = math.isnan(b.real) + a_imag_nan = math.isnan(a.imag) + b_imag_nan = math.isnan(b.imag) + real_match = (a.real == b.real) or (a_real_nan and b_real_nan) + imag_match = (a.imag == b.imag) or (a_imag_nan and b_imag_nan) + return real_match and imag_match + if isinstance(a, float) and isinstance(b, float) and math.isnan(a) and math.isnan(b): + return True + if isinstance(a, object) and isinstance(b, object): + return True + if isinstance(a, tuple) and isinstance(b, tuple) and len(a) == len(b): + return all(compare_results(x, y) for x, y in zip(a, b)) + return a == b + +SENTINEL_VALUE = object() + +def callMethod(prefix, obj_to_call, method_name, *arguments, verbose=True): + func_display_name = f"fakemod.{method_name}()" if obj_to_call is fakemod else f"{obj_to_call.__class__.__name__}.{method_name}()" + message = f"[{prefix}] {func_display_name}" + if verbose: + print(message, file=stderr) + result = SENTINEL_VALUE + try: + func_to_run = getattr(obj_to_call, method_name) + for _ in range(int(3)): + result = func_to_run(*arguments) + except (Exception, SystemExit, KeyboardInterrupt) as err: + try: + errmsg = repr(err) + except Exception as e_repr: + errmsg = f'Error during repr: {e_repr.__class__.__name__}' + errmsg = errmsg.encode('ASCII', 'replace').decode('ASCII') + if verbose: + print(f"[{prefix}] {func_display_name} => EXCEPTION: {err.__class__.__name__}: {errmsg}", file=stderr) + result = SENTINEL_VALUE + if verbose: + print(f"[{prefix}] -explicit garbage collection-", file=stderr) + collect() + return result + +def callFunc(prefix, func_name_str, *arguments, verbose=True): + return callMethod(prefix, fakemod, func_name_str, *arguments, verbose=verbose) + +fuzz_target_module = fakemod + + + +# FUSIL_BOILERPLATE_END + + +import sys +from random import choice, randint, random, sample +from sys import stderr, path as sys_path + + +print("--- Fuzzing 2 functions in fakemod ---", file=stderr) +res_f1 = callFunc("f1", "func_a", +verbose=True) + + +res_f2 = callFunc("f2", "func_a", +verbose=True) + + +res_f3 = callFunc("f3", "func_a", + None, +verbose=True) + + + +print("--- Fuzzing 1 classes in fakemod ---", file=stderr) +print("[c1] Attempting to instantiate class: Widget", file=stderr) +instance_c1_widget = None # Initialize instance variable +try: + instance_c1_widget = callFunc('c1_init', 'Widget', + ) +except Exception as e_instantiate: + instance_c1_widget = None + print("[c1] Failed to instantiate Widget: {e_instantiate.__class__.__name__} {e_instantiate}", file=stderr) + instance_c1_widget = None + +try: + print(f"--- (Depth 0) Dispatching Fuzz for: { instance_c1_widget!r } (hint: Widget, prefix: c1_widget_ops) ---", file=stderr) +except Exception as e: + print(f"--- (Depth 0) Error calling repr() prefix: c1_widget_ops) ---", file=stderr) +# self.base_level=0 +if instance_c1_widget is not None: + # self.base_level=1 + # L_main_if_target_not_none=0 + if skip_trivial_type(instance_c1_widget): + print(f'Skipping deep diving on instance_c1_widget {type(instance_c1_widget)}', file=stderr) + try: + print(f'Instance { instance_c1_widget!r } (actual type {type(instance_c1_widget).__name__}) has no specific fuzzer type, doing generic calls.', file=stderr) + except Exception as e: + print(f'Error printing instance repr() { e } (actual type {type(instance_c1_widget).__name__}) has no specific fuzzer type, doing generic calls.', file=stderr) + if skip_trivial_type(instance_c1_widget): + print(f'Skipping deep diving on instance_c1_widget {type(instance_c1_widget)}', file=stderr) + else: + print(f'Instance instance_c1_widget (type {type(instance_c1_widget).__name__}) has no specific fuzzer, doing generic calls.', file=stderr) + c1_widget_ops_generic_methods = [] + try: + for c1_widget_ops_generic_attr_name in dir(instance_c1_widget): + if c1_widget_ops_generic_attr_name.startswith('_'): continue + try: + c1_widget_ops_generic_attr_val = getattr(instance_c1_widget, c1_widget_ops_generic_attr_name) + if callable(c1_widget_ops_generic_attr_val) and not c1_widget_ops_generic_attr_val.__name__ in ('wait', '_rehash'): c1_widget_ops_generic_methods.append((c1_widget_ops_generic_attr_name, c1_widget_ops_generic_attr_val)) + except Exception: pass + except Exception: c1_widget_ops_generic_methods = [] # Failed to get methods + if c1_widget_ops_generic_methods: + print(f'Found {len(c1_widget_ops_generic_methods)} callable methods for generic fuzzing of instance_c1_widget', file=stderr) + for _i_c1_widget_ops_generic in range(min(len(c1_widget_ops_generic_methods), 2)): + c1_widget_ops_generic_method_name_to_call, c1_widget_ops_generic_method_obj_to_call = choice(c1_widget_ops_generic_methods) + # Conceptual call to generic method fuzzer + if c1_widget_ops_generic_method_name_to_call not in ('wait', '_rehash'): callMethod(f'c1_widget_ops_generic_gen{_i_c1_widget_ops_generic}', instance_c1_widget, c1_widget_ops_generic_method_name_to_call) + +if instance_c1_widget is not None and instance_c1_widget is not SENTINEL_VALUE: + print(f"--- Fuzzing instance: instance_c1_widget (type hint: Widget, prefix: c1m) ---", file=stderr) + if skip_trivial_type(instance_c1_widget): + print(f'Skipping deep diving on instance_c1_widget {type(instance_c1_widget)}', file=stderr) + # General method fuzzing for instance_c1_widget + res_c1m1 = callMethod("c1m1", instance_c1_widget, "method_two", + list[weird_classes['weird_OrderedDict']] | weird_classes['weird_set'] | big_union, + verbose=True) + + + res_c1m2 = callMethod("c1m2", instance_c1_widget, "method_one", + verbose=True) + + + print(f"--- Finished fuzzing instance: instance_c1_widget ---", file=stderr) + + del instance_c1_widget # Cleanup instance + print("[c1] -explicit garbage collection for class instance-", file=stderr) + collect() + + + diff --git a/tests/python/test_golden_output.py b/tests/python/test_golden_output.py new file mode 100644 index 0000000..4bb32f3 --- /dev/null +++ b/tests/python/test_golden_output.py @@ -0,0 +1,181 @@ +"""Golden-output tests for the code generator. + +``WritePythonCode`` is a pure function of (RNG seed, options, target module), so we can +pin all three and snapshot the generated ``source.py``. This is the safety net for +refactors of the generator's construction style (e.g. the indentation API -> ``with +self.indent():`` migration): a behaviour-preserving change must leave the output +byte-identical. + +Determinism is engineered, not assumed: +- a fixed ``random.seed`` before each generation, +- a fully-pinned options object (concrete values -- no MagicMock, whose repr embeds a + non-deterministic id that would leak into output), +- a fake target module with a fixed, public-only surface (``test_private=False`` makes the + generator skip all dunders, so method introspection doesn't drift across Python versions), +- ``no_numpy``/``no_tstrings`` so output doesn't depend on whether numpy/h5py are installed. + +To regenerate the committed snapshot after an *intentional* generator change: + python -m tests.python.test_golden_output --update +""" + +import ast +import os +import pathlib +import random +import sys +import tempfile +import types +import unittest + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +PROJECT_ROOT = os.path.join(SCRIPT_DIR, "..", "..") +sys.path.insert(0, PROJECT_ROOT) + +from fusil.python.write_python_code import WritePythonCode + +GOLDEN_DIR = pathlib.Path(SCRIPT_DIR) / "golden" +GOLDEN_FILE = GOLDEN_DIR / "fakemod_seed1234.py" +SEED = 1234 + + +class _Options: + """A fully-pinned options stand-in (every attribute the generator reads).""" + + functions_number = 3 + classes_number = 1 + objects_number = 0 + methods_number = 2 + deep_dive = False + fuzz_exceptions = False + test_private = False + no_numpy = True + no_tstrings = True + external_references = True + oom_fuzz = False + oom_max_start = 50 + oom_calls = 3 + oom_classes = 0 + oom_methods = 0 + oom_verbose = False + oom_seq = False + oom_seq_len = 3 + oom_window = 1 + + +class _Parent: + """Minimal stand-in for the PythonSource parent the writer reads from.""" + + def __init__(self): + self.options = _Options() + self.filenames = ["/tmp/fuzz_fixture"] + + def warning(self, *args, **kwargs): + pass + + +def _fake_module(): + """A target module with a fixed, public-only surface (version-stable introspection).""" + module = types.ModuleType("fakemod") + + def func_a(x=0): + return x + + def func_b(a, b): + return a + b + + class Widget: + def method_one(self): + return 1 + + def method_two(self, n): + return n + + module.func_a = func_a + module.func_b = func_b + module.Widget = Widget + module.CONST = 42 # trivial type -> skipped by the member filter + return module + + +def generate(seed=SEED): + """Generate the fuzzing script for the fake module; deterministic for a given seed.""" + random.seed(seed) + parent = _Parent() + fd, path = tempfile.mkstemp(suffix="_golden.py") + os.close(fd) + try: + writer = WritePythonCode( + parent, + path, + _fake_module(), + "fakemod", + threads=False, + _async=False, + plugin_manager=None, + ) + writer.generate_fuzzing_script() + return pathlib.Path(path).read_text(encoding="utf-8") + finally: + os.unlink(path) + + +# The exact block emitted by the class-instantiation method under refactor +# (_fuzz_one_class). Version-independent (pure generator text + the fake class name), so it +# guards the indentation-API -> context-manager conversion on every interpreter, not just +# the canonical one the full snapshot is pinned to. +EXPECTED_CLASS_INSTANTIATION_BLOCK = """\ +instance_c1_widget = None # Initialize instance variable +try: + instance_c1_widget = callFunc('c1_init', 'Widget', + ) +except Exception as e_instantiate: + instance_c1_widget = None + print("[c1] Failed to instantiate Widget: {e_instantiate.__class__.__name__} {e_instantiate}", file=stderr) + instance_c1_widget = None +""" + + +class TestGoldenOutput(unittest.TestCase): + def test_generation_is_deterministic_and_valid(self): + """Same seed -> byte-identical output, and the output is valid Python.""" + first = generate() + second = generate() + self.assertEqual(first, second, "generation is not deterministic for a fixed seed") + ast.parse(first) + + def test_class_instantiation_block_unchanged(self): + """The class-instantiation block (the refactored method) is emitted verbatim.""" + out = generate() + self.assertIn(EXPECTED_CLASS_INSTANTIATION_BLOCK, out) + + @unittest.skipUnless( + sys.version_info[:2] >= (3, 14), + "full-script snapshot is pinned to the canonical interpreter (>=3.14); the " + "determinism + block tests cover earlier versions", + ) + def test_full_script_matches_snapshot(self): + """The whole generated script matches the committed golden snapshot.""" + self.assertTrue( + GOLDEN_FILE.is_file(), + f"missing golden snapshot {GOLDEN_FILE}; regenerate with --update", + ) + expected = GOLDEN_FILE.read_text(encoding="utf-8") + self.assertEqual( + generate(), + expected, + "generated output drifted from the golden snapshot; if intentional, " + "regenerate with: python -m tests.python.test_golden_output --update", + ) + + +def _update_snapshot(): + GOLDEN_DIR.mkdir(parents=True, exist_ok=True) + GOLDEN_FILE.write_text(generate(), encoding="utf-8") + print(f"wrote {GOLDEN_FILE}") + + +if __name__ == "__main__": + if "--update" in sys.argv: + _update_snapshot() + else: + unittest.main() diff --git a/tests/test_write_code.py b/tests/test_write_code.py new file mode 100644 index 0000000..ba52195 --- /dev/null +++ b/tests/test_write_code.py @@ -0,0 +1,69 @@ +"""Tests for the WriteCode base (the code-generation primitives).""" + +import io +import unittest + +from fusil.write_code import WriteCode + + +class TestIndentedContextManager(unittest.TestCase): + """`with self.indented():` -- the enabling change for the generator refactor.""" + + def _writer(self): + writer = WriteCode() + writer.useStream(io.StringIO()) + return writer + + def test_indents_body_and_restores_level(self): + writer = self._writer() + writer.write(0, "a") + with writer.indented(): + writer.write(0, "b") + writer.write(0, "c") + self.assertEqual(writer.output.getvalue(), "a\n b\nc\n") + self.assertEqual(writer.base_level, 0) + + def test_restores_level_on_exception(self): + writer = self._writer() + with self.assertRaises(ValueError): + with writer.indented(): + self.assertEqual(writer.base_level, 1) + raise ValueError("boom") + self.assertEqual(writer.base_level, 0) + + def test_nesting(self): + writer = self._writer() + with writer.indented(): + with writer.indented(): + writer.write(0, "deep") + self.assertEqual(writer.output.getvalue(), " deep\n") + self.assertEqual(writer.base_level, 0) + + def test_delta(self): + writer = self._writer() + with writer.indented(2): + writer.write(0, "x") + self.assertEqual(writer.output.getvalue(), " x\n") + + def test_equivalent_to_manual_addlevel_restorelevel(self): + """The context manager must emit exactly what the manual dance emitted.""" + manual = WriteCode() + manual.useStream(io.StringIO()) + manual.write(0, "head") + saved = manual.addLevel(1) + manual.write(0, "body") + manual.restoreLevel(saved) + manual.write(0, "tail") + + cm = WriteCode() + cm.useStream(io.StringIO()) + cm.write(0, "head") + with cm.indented(): + cm.write(0, "body") + cm.write(0, "tail") + + self.assertEqual(cm.output.getvalue(), manual.output.getvalue()) + + +if __name__ == "__main__": + unittest.main()