diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py index 64141dc9293..0737f18d62d 100644 --- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py +++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py @@ -20,16 +20,19 @@ # ------------------------------------------------------------- from typing import Dict, List, Tuple, Any, Optional import os -from skopt.space import Real, Integer, Categorical import numpy as np import logging from dataclasses import dataclass import time import copy from joblib import Parallel, delayed -from skopt import Optimizer +import itertools +import math +import random from systemds.scuro.drsearch.representation_dag import ( RepresentationDAGBuilder, + RepresentationDag, + RepresentationNode, ) from systemds.scuro.modality.modality import Modality from systemds.scuro.drsearch.task import PerformanceMeasure @@ -110,13 +113,16 @@ def __init__( tasks, optimization_results, k: int = 2, - n_jobs: int = -1, + n_jobs: int = 1, scoring_metric: str = "accuracy", maximize_metric: bool = True, save_results: bool = False, debug: bool = False, checkpoint_every: Optional[int] = None, resume: bool = True, + random_state: int = 42, + exhaustive_threshold: int = 256, + local_search_patience: int = 3, ): self.tasks = tasks self.unimodal_optimization_results = optimization_results @@ -136,6 +142,10 @@ def __init__( self.logger = logging.getLogger(__name__) self.checkpoint_every = checkpoint_every self.resume = resume + self.random_state = random_state + self.exhaustive_threshold = max(1, exhaustive_threshold) + self.local_search_patience = max(1, local_search_patience) + self._rng = random.Random(self.random_state) self._checkpoint_manager = CheckpointManager( os.getcwd(), "hyperparam_checkpoint_", @@ -209,11 +219,12 @@ def tune_unimodal_representations(self, max_eval_per_rep: Optional[int] = None): self.resume_from_checkpoint() for task in self.tasks: reps = self.k_best_representations[task.model.name] - skip_remaining = self._checkpoint_manager.skip_remaining_by_key.get( - task.model.name, 0 - ) - if skip_remaining >= len(reps): - continue + skip_remaining = 0 + # skip_remaining = self._checkpoint_manager.skip_remaining_by_key.get( + # task.model.name, 0 + # ) + # if skip_remaining >= len(reps): + # continue chunk_size = self.checkpoint_every or len(reps) for start_idx in range(skip_remaining, len(reps), chunk_size): @@ -258,8 +269,9 @@ def visit_node(node_id): visit_node(input_id) visited.add(node_id) if node.operation is not None: - if node.operation().parameters: - hyperparams[node_id] = node.operation().parameters + params = self._get_params_for_node(node) + if params: + hyperparams[node_id] = params reps.append(node.operation) node_order.append(node_id) if node.modality_id is not None: @@ -267,85 +279,39 @@ def visit_node(node_id): visit_node(root_node_id) - if not hyperparams: - return None - start_time = time.time() rep_name = "-".join([rep.__name__ for rep in reps]) - - search_space = [] - param_names = [] - for op_id, op_params in hyperparams.items(): - for param_name, param_values in op_params.items(): - param_names.append(op_id + "-" + param_name) - if isinstance(param_values, list): - search_space.append( - Categorical(param_values, name=op_id + "-" + param_name) - ) - elif isinstance(param_values, tuple) and len(param_values) == 2: - if isinstance(param_values[0], int) and isinstance( - param_values[1], int - ): - search_space.append( - Integer( - param_values[0], - param_values[1], - name=op_id + "-" + param_name, - ) - ) - else: - search_space.append( - Real( - param_values[0], - param_values[1], - name=op_id + "-" + param_name, - ) - ) - else: - search_space.append( - Categorical([param_values], name=op_id + "-" + param_name) - ) - - n_calls = max_evals if max_evals else 50 - - all_results = [] - - def evaluate_point(point): - params = dict(zip(param_names, point)) - result = self.evaluate_dag_config( + modalities_override = ( + self._get_cached_modalities_for_task(task, modality_ids) if mm_opt else None + ) + if not hyperparams: + # TODO: extract the information from the unimodal optimization results + baseline = self.evaluate_dag_config( dag, - params, + {}, node_order, modality_ids, task, - modalities_override=( - self._get_cached_modalities_for_task(task, modality_ids) - if mm_opt - else None - ), + modalities_override=modalities_override, + ) + all_results = [baseline] + else: + n_calls = max_evals if max_evals else 50 + param_specs = self._build_param_specs(hyperparams) + default_config = {} + all_results = self._search_best_configs( + dag=dag, + task=task, + node_order=node_order, + modality_ids=modality_ids, + modalities_override=modalities_override, + param_specs=param_specs, + budget=n_calls, + initial_config=None, ) - score = result[1] - if isinstance(score, PerformanceMeasure): - score = score.average_scores[self.scoring_metric] - if self.maximize_metric: - objective_value = -score - else: - objective_value = score - return objective_value, result - opt = Optimizer( - search_space, random_state=42, n_initial_points=min(10, n_calls // 2) - ) - self.n_jobs = 2 - n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1 - for _ in range(0, n_calls, n_batch): - points = opt.ask(n_points=n_batch) - results = Parallel( - n_jobs=self.n_jobs, max_nbytes=None, mmap_mode=None, backend="threading" - )(delayed(evaluate_point)(p) for p in points) - objective_values = [result[0] for result in results] - all_results.extend(result[1] for result in results) - opt.tell(points, objective_values) + if not all_results: + return None def get_score(result): score = result[1] @@ -359,7 +325,21 @@ def get_score(result): best_params, best_score = min(all_results, key=get_score) tuning_time = time.time() - start_time - + # results = self.unimodal_optimization_results.results[self.modalities[0].modality_id][task.model.name] + + # default_result = sorted( + # results, + # key=lambda r: r.val_score[self.scoring_metric], + # reverse=True, + # )[0] + # pm = PerformanceMeasure(name=self.scoring_metric, metrics=self.scoring_metric, higher_is_better=self.maximize_metric) + # pm.add_scores({self.scoring_metric: default_result.val_score[self.scoring_metric]}) + # default_params = self._get_default_params(dag) + # def_par ={} + # for k, v in default_params.items(): + # for k_v, v_v in v.items(): + # def_par[k+"-"+k_v] = v_v + # all_results.append((def_par, pm)) best_result = HyperparamResult( representation_name=rep_name, best_params=best_params, @@ -374,6 +354,320 @@ def get_score(result): return best_result + def _get_params_for_node(self, node: RepresentationNode) -> Dict[str, Any]: + if not node.operation().parameters: + return None + + params = copy.deepcopy(node.operation().parameters) + return params + + def _build_param_specs( + self, hyperparams: Dict[str, Dict[str, Any]] + ) -> List[Dict[str, Any]]: + param_specs = [] + for op_id, op_params in hyperparams.items(): + for param_name, param_values in op_params.items(): + full_name = op_id + "-" + param_name + if isinstance(param_values, list): + param_type = "categorical" + domain = list(param_values) + elif isinstance(param_values, tuple) and len(param_values) == 2: + lo, hi = param_values + if isinstance(lo, int) and isinstance(hi, int): + param_type = "integer" + else: + param_type = "real" + domain = (lo, hi) + else: + param_type = "categorical" + domain = [param_values] + param_specs.append( + {"name": full_name, "type": param_type, "domain": domain} + ) + return param_specs + + def _config_key(self, params: Dict[str, Any]) -> Tuple[Tuple[str, Any], ...]: + key_items = [] + for name, value in sorted(params.items()): + if isinstance(value, float): + value = round(value, 10) + key_items.append((name, value)) + return tuple(key_items) + + def _score_value(self, score: Any) -> float: + if isinstance(score, PerformanceMeasure): + return score.average_scores.get(self.scoring_metric, np.nan) + return score + + def _is_better(self, candidate_score: float, best_score: float) -> bool: + if np.isnan(candidate_score): + return False + if np.isnan(best_score): + return True + return ( + candidate_score > best_score + if self.maximize_metric + else candidate_score < best_score + ) + + def _sample_random_config( + self, param_specs: List[Dict[str, Any]] + ) -> Dict[str, Any]: + config = {} + for spec in param_specs: + name = spec["name"] + domain = spec["domain"] + if spec["type"] == "categorical": + config[name] = self._rng.choice(domain) + elif spec["type"] == "integer": + config[name] = self._rng.randint(int(domain[0]), int(domain[1])) + else: + config[name] = self._rng.uniform(float(domain[0]), float(domain[1])) + return config + + def _estimate_discrete_search_size( + self, param_specs: List[Dict[str, Any]] + ) -> Optional[int]: + size = 1 + for spec in param_specs: + if spec["type"] == "real": + return None + if spec["type"] == "integer": + size *= max(0, int(spec["domain"][1]) - int(spec["domain"][0]) + 1) + else: + size *= len(spec["domain"]) + if size > self.exhaustive_threshold: + return size + return size + + def _enumerate_configs( + self, param_specs: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + domains = [] + names = [] + for spec in param_specs: + names.append(spec["name"]) + if spec["type"] == "integer": + lo, hi = int(spec["domain"][0]), int(spec["domain"][1]) + domains.append(list(range(lo, hi + 1))) + else: + domains.append(list(spec["domain"])) + return [dict(zip(names, values)) for values in itertools.product(*domains)] + + def _generate_neighbor_config( + self, + base_config: Dict[str, Any], + param_specs: List[Dict[str, Any]], + step_scale: float, + ) -> Dict[str, Any]: + candidate = dict(base_config) + if not param_specs: + return candidate + + n_mutations = 1 if len(param_specs) == 1 else self._rng.randint(1, 2) + mutated_specs = self._rng.sample( + param_specs, k=min(n_mutations, len(param_specs)) + ) + for spec in mutated_specs: + name = spec["name"] + current = candidate[name] + if spec["type"] == "categorical": + values = [value for value in spec["domain"] if value != current] + if values: + candidate[name] = self._rng.choice(values) + elif spec["type"] == "integer": + lo, hi = int(spec["domain"][0]), int(spec["domain"][1]) + width = max(1, hi - lo) + step = max(1, int(math.ceil(width * step_scale))) + delta = self._rng.choice([-step, step]) + candidate[name] = max(lo, min(hi, int(current) + delta)) + else: + lo, hi = float(spec["domain"][0]), float(spec["domain"][1]) + span = max(1e-9, hi - lo) + delta = self._rng.uniform(-span * step_scale, span * step_scale) + candidate[name] = max(lo, min(hi, float(current) + delta)) + return candidate + + def _evaluate_configs( + self, + dag, + task, + node_order, + modality_ids, + modalities_override, + candidate_configs: List[Dict[str, Any]], + seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any], Any]], + ) -> List[Tuple[Dict[str, Any], Any]]: + ordered_unique_configs = [] + unique_keys_in_order = [] + unique_keys_set = set() + for config in candidate_configs: + key = self._config_key(config) + if key not in unique_keys_set: + unique_keys_set.add(key) + unique_keys_in_order.append(key) + ordered_unique_configs.append(config) + + pending_configs = [] + for config in ordered_unique_configs: + key = self._config_key(config) + if key not in seen_configs: + pending_configs.append(config) + + if pending_configs: + n_jobs = self.n_jobs if self.n_jobs != 0 else 1 + evaluated = Parallel( + n_jobs=n_jobs, max_nbytes=None, mmap_mode=None, backend="threading" + )( + delayed(self.evaluate_dag_config)( + dag, + config, + node_order, + modality_ids, + task, + modalities_override=modalities_override, + ) + for config in pending_configs + ) + for result in evaluated: + seen_configs[self._config_key(result[0])] = result + + return [ + seen_configs[key] for key in unique_keys_in_order if key in seen_configs + ] + + def _search_best_configs( + self, + dag, + task, + node_order, + modality_ids, + modalities_override, + param_specs: List[Dict[str, Any]], + budget: int, + initial_config: Dict[str, Any], + ) -> List[Tuple[Dict[str, Any], Any]]: + budget = max(1, budget) + seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any], Any]] = {} + all_results: List[Tuple[Dict[str, Any], Any]] = [] + best_score = np.nan + best_config = None + if initial_config is not None and budget > 0: + initial_results = self._evaluate_configs( + dag, + task, + node_order, + modality_ids, + modalities_override, + [initial_config], + seen_configs, + ) + all_results.extend(initial_results) + if initial_results: + p, s = initial_results[0] + best_config = p + best_score = self._score_value(s) + budget -= 1 + + discrete_size = self._estimate_discrete_search_size(param_specs) + if discrete_size is not None and discrete_size <= min( + self.exhaustive_threshold, budget + ): + candidates = self._enumerate_configs(param_specs) + self._rng.shuffle(candidates) + candidates = candidates[:budget] + batch_results = self._evaluate_configs( + dag, + task, + node_order, + modality_ids, + modalities_override, + candidates, + seen_configs, + ) + all_results.extend(batch_results) + return all_results + + initial_budget = min(budget, max(8, len(param_specs) * 4)) + initial_candidates = [ + self._sample_random_config(param_specs) for _ in range(initial_budget) + ] + initial_results = self._evaluate_configs( + dag, + task, + node_order, + modality_ids, + modalities_override, + initial_candidates, + seen_configs, + ) + all_results.extend(initial_results) + + for params, score in initial_results: + numeric_score = self._score_value(score) + if self._is_better(numeric_score, best_score): + best_score = numeric_score + best_config = params + + eval_count = len(seen_configs) + no_improvement_rounds = 0 + step_scale = 0.5 + + while eval_count < budget: + if best_config is None: + candidate_batch = [self._sample_random_config(param_specs)] + else: + candidate_batch = [] + batch_size = min( + max(2, abs(self.n_jobs) if self.n_jobs != 0 else 1), + budget - eval_count, + ) + for _ in range(batch_size): + candidate_batch.append( + self._generate_neighbor_config( + best_config, param_specs, step_scale + ) + ) + + if budget - eval_count > 3: + candidate_batch.append(self._sample_random_config(param_specs)) + + batch_results = self._evaluate_configs( + dag, + task, + node_order, + modality_ids, + modalities_override, + candidate_batch, + seen_configs, + ) + if not batch_results: + step_scale = max(0.05, step_scale * 0.5) + if step_scale <= 0.05: + break + continue + + improved = False + for params, score in batch_results: + numeric_score = self._score_value(score) + if self._is_better(numeric_score, best_score): + best_score = numeric_score + best_config = params + improved = True + all_results.extend(batch_results) + eval_count = len(seen_configs) + + if improved: + no_improvement_rounds = 0 + step_scale = min(0.5, step_scale * 1.1) + else: + no_improvement_rounds += 1 + step_scale = max(0.05, step_scale * 0.7) + if no_improvement_rounds >= self.local_search_patience: + break + + return all_results + def _get_cached_modalities_for_task(self, task, modality_ids): if not self.k_best_cache_by_modality: return self.get_modalities_by_id(modality_ids) @@ -402,12 +696,13 @@ def evaluate_dag_config( else self.get_modalities_by_id(modality_ids) ) modified_modality = dag_copy.execute(modalities, task) - score = task.run( - modified_modality[list(modified_modality.keys())[-1]].data - )[1] + score = task.run(modified_modality.data)[1] return params, score except Exception as e: + import traceback + + traceback.print_exc() self.logger.error(f"Error evaluating DAG with params {params}: {e}") return params, np.nan @@ -485,11 +780,11 @@ def _get_metric_value(result): self.optimization_results.add_result([result]) self._checkpoint_manager.increment(task.model.name, 1) self._checkpoint_manager.checkpoint_if_due( - self.optimization_results.results, "eval_count_by_task" + self.optimization_results.results ) except Exception: self._checkpoint_manager.save_checkpoint( - self.optimization_results.results, "eval_count_by_task", {} + self.optimization_results.results, {} ) raise if self.save_results: diff --git a/src/main/python/systemds/scuro/drsearch/node_executor.py b/src/main/python/systemds/scuro/drsearch/node_executor.py index bf51dfde857..418b11fe700 100644 --- a/src/main/python/systemds/scuro/drsearch/node_executor.py +++ b/src/main/python/systemds/scuro/drsearch/node_executor.py @@ -271,7 +271,9 @@ def __init__( checkpoint_manager: Optional[CheckpointManager] = None, max_num_workers: int = -1, result_path: Optional[str] = None, + enable_checkpointing: bool = True, ): + self.enable_checkpointing = enable_checkpointing available_total_cpu = ( float(psutil.virtual_memory().available) - float(psutil.virtual_memory().available) * 0.30 @@ -316,14 +318,17 @@ def run(self) -> None: def submit_node(node_id: str): node = self.scheduler.mapping[node_id] gpu_id = node.gpu_id - parent_result = None - parent_node_id = self.scheduler.get_valid_parent(node_id) - if parent_node_id is not None: - parent_result = self.result_cache.get(parent_node_id) + parent_node_ids = self.scheduler.get_valid_parents(node_id) + parent_results = None + if parent_node_ids: + parent_results = [ + self.result_cache.get(parent_node_id) + for parent_node_id in parent_node_ids + ] if self._is_task_node(node): task_result = ResultEntry( dag=self._get_dag_from_node_ids(node_id), - representation_time=parent_result.transform_time, + representation_time=parent_results[0].transform_time, ) task_results[node_id] = task_result task_idx = int(node.parameters.get("_task_idx", 0)) @@ -333,8 +338,8 @@ def submit_node(node_id: str): self.tasks[task_idx], ( self.modalities[0].data - if parent_result is None - else parent_result.data + if parent_results is None + else parent_results[0].data ), gpu_id, ) @@ -342,7 +347,7 @@ def submit_node(node_id: str): future = executor.submit( _execute_node_worker, node, - self.modalities if parent_result is None else [parent_result], + self.modalities if parent_results is None else parent_results, None, None, gpu_id, @@ -393,25 +398,27 @@ def submit_new_ready_nodes(): task_results[node_id].test_score = result["scores"][ 2 ].average_scores - self.checkpoint_manager.increment(node_id) - self.checkpoint_manager.checkpoint_if_due(task_results) - self._checkpoint_memory_usage( - node_id, - peak_bytes, - gpu_peak_bytes, - "task", - memory_usage_data, - None, - ) + if self.enable_checkpointing: + self.checkpoint_manager.increment(node_id) + self.checkpoint_manager.checkpoint_if_due(task_results) + self._checkpoint_memory_usage( + node_id, + peak_bytes, + gpu_peak_bytes, + "task", + memory_usage_data, + None, + ) - parent_node_id = self.scheduler.get_valid_parent(node_id) - if parent_node_id is not None: - self.result_cache.dec_ref(parent_node_id) - if ( - parent_node_id in self.result_cache.ref_count - and self.result_cache.ref_count[parent_node_id] == 0 - ): - self.result_cache.clear(parent_node_id) + parent_node_ids = self.scheduler.get_valid_parents(node_id) + if len(parent_node_ids) > 0: + for parent_node_id in parent_node_ids: + self.result_cache.dec_ref(parent_node_id) + if ( + parent_node_id in self.result_cache.ref_count + and self.result_cache.ref_count[parent_node_id] == 0 + ): + self.result_cache.clear(parent_node_id) self.scheduler.complete_node(node_id) else: @@ -430,14 +437,15 @@ def submit_new_ready_nodes(): self.scheduler.update_node_stats_and_reestimate_descendants( node_id, actual_stats ) - self._checkpoint_memory_usage( - node_id, - peak_bytes, - gpu_peak_bytes, - result["operation_name"], - memory_usage_data, - transformed_modality.data, - ) + if self.enable_checkpointing: + self._checkpoint_memory_usage( + node_id, + peak_bytes, + gpu_peak_bytes, + result["operation_name"], + memory_usage_data, + transformed_modality.data, + ) before_bytes = self.result_cache.get_memory_total_memory_usage() self._manage_result_cache(node_id, transformed_modality) after_bytes = self.result_cache.get_memory_total_memory_usage() @@ -539,6 +547,9 @@ def _print_node_stats(self, node_id: str, result: Any, operation_name: str): assert ( len(result) == node_stats.num_instances ), f"Node {node_id} {operation_name} should have {node_stats.num_instances} instances, actual: {len(result)}" + # assert ( + # shape == node_stats.output_shape + # ), f"Node {node_id} {operation_name} should have shape of {node_stats.output_shape}, actual shape: {shape}" return shape def _infer_actual_output_stats( @@ -575,20 +586,22 @@ def _infer_actual_output_stats( return None def _manage_result_cache(self, node_id: str, result: Any): - parent_node_id = self.scheduler.get_valid_parent(node_id) - if parent_node_id is not None: - self.result_cache.dec_ref(parent_node_id) + parent_node_ids = self.scheduler.get_valid_parents(node_id) + if len(parent_node_ids) > 0: + for parent_node_id in parent_node_ids: + self.result_cache.dec_ref(parent_node_id) if self.scheduler.get_children(node_id): for _ in self.scheduler.get_children(node_id): self.result_cache.inc_ref(node_id) self.result_cache.add_result(node_id, result) - if ( - parent_node_id in self.result_cache.ref_count - and self.result_cache.ref_count[parent_node_id] == 0 - ): - self.result_cache.clear(parent_node_id) + for parent_node_id in parent_node_ids: + if ( + parent_node_id in self.result_cache.ref_count + and self.result_cache.ref_count[parent_node_id] == 0 + ): + self.result_cache.clear(parent_node_id) def _get_nodes_by_ids(self, nodes_ids: List[str]) -> List[RepresentationNode]: return [self.scheduler.mapping[node_id] for node_id in nodes_ids] diff --git a/src/main/python/systemds/scuro/drsearch/node_scheduler.py b/src/main/python/systemds/scuro/drsearch/node_scheduler.py index 4fe3ac1d039..ef3ccc844e5 100644 --- a/src/main/python/systemds/scuro/drsearch/node_scheduler.py +++ b/src/main/python/systemds/scuro/drsearch/node_scheduler.py @@ -166,11 +166,12 @@ def is_finished(self) -> bool: return False - def get_valid_parent(self, node_id: str) -> bool: + def get_valid_parents(self, node_id: str) -> bool: + parents = [] for parent_id in self.parents[node_id]: if parent_id not in self.leaves: - return parent_id - return None + parents.append(parent_id) + return parents def get_children(self, node_id: str) -> List[str]: return list(self.children[node_id]) @@ -207,8 +208,9 @@ def update_node_stats_and_reestimate_descendants( params=self.mapping[desc_id].parameters ) peak_memory = operation.estimate_peak_memory_bytes(input_stats) + input_stats_for_overhead = self._stats_for_overhead(input_stats) peak_memory["cpu_peak_bytes"] += ( - 64 * 1024 + 512 * input_stats.num_instances + 64 * 1024 + 512 * input_stats_for_overhead.num_instances ) output_stats = operation.get_output_stats(input_stats) @@ -395,14 +397,17 @@ def _estimate_node_resources(self): else: parent_ids = list(self.parents.get(node, set())) parent_stats = [node_stats[parent_id] for parent_id in parent_ids] - input_stats = parent_stats[0] if parent_stats else None + input_stats = ( + parent_stats[0] if len(parent_stats) == 1 else parent_stats + ) if node not in self.roots: operation = self.mapping[node].operation( params=self.mapping[node].parameters ) peak_memory = operation.estimate_peak_memory_bytes(input_stats) + input_stats_for_overhead = self._stats_for_overhead(input_stats) peak_memory["cpu_peak_bytes"] += ( - 64 * 1024 + 512 * input_stats.num_instances + 64 * 1024 + 512 * input_stats_for_overhead.num_instances ) # Placeholder for transformed modality creation overhead peak_memory["cpu_peak_bytes"] *= 1 output_stats = operation.get_output_stats(input_stats) @@ -429,6 +434,12 @@ def _estimate_node_resources(self): self.node_stats = node_stats return node_resources + @staticmethod + def _stats_for_overhead(input_stats: Any) -> Any: + if isinstance(input_stats, list) and len(input_stats) > 0: + return input_stats[0] + return input_stats + @staticmethod def _stats_to_bytes(stats: Optional[Any], dtype_size: int = 4) -> int: if stats is None: diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py index b2911dbe7a8..099732e46df 100644 --- a/src/main/python/systemds/scuro/drsearch/representation_dag.py +++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py @@ -74,7 +74,8 @@ class RepresentationNode: @dataclass class RepresentationDag: - def __init__(self, nodes: List[Any], root_node_id): + def __init__(self, nodes: List[Any], root_node_id, dag_id: int = None): + self.dag_id = dag_id self.root_node_id = root_node_id self.nodes = self.filter_connected_nodes(nodes) @@ -399,6 +400,7 @@ class RepresentationDAGBuilder: def __init__(self): self.nodes = [] self.node_counter = 0 + self.dag_counter = 0 def create_leaf_node( self, modality_id: str, representation_index: int = -1, operation=None @@ -431,10 +433,13 @@ def create_operation_node( self.nodes.append(node) return node_id - def build(self, root_node_id: str) -> RepresentationDag: + def build(self, root_node_id: str, dag_id: int = None) -> RepresentationDag: dag = RepresentationDag( - nodes=copy.deepcopy(self.nodes), root_node_id=root_node_id + nodes=copy.deepcopy(self.nodes), + root_node_id=root_node_id, + dag_id=dag_id if dag_id is not None else self.dag_counter + 1, ) + self.dag_counter += 1 if not dag.validate(): raise ValueError("Invalid DAG construction") return dag @@ -524,6 +529,7 @@ def __init__(self): self.signature_to_node: Dict[Hashable, str] = {} self.node_to_signature: Dict[str, Hashable] = {} self.node_counter = 0 + self.dag_counter = 0 def _compute_node_signature( self, operation: Any, inputs: List[str], parameters: Dict[str, Any] = None @@ -602,8 +608,13 @@ def create_operation_node( operation=operation, inputs=inputs, parameters=parameters, is_leaf=False ) - def build(self, root_node_id: str) -> RepresentationDag: - dag = RepresentationDag(nodes=self.global_nodes, root_node_id=root_node_id) + def build(self, root_node_id: str, dag_id: int = None) -> RepresentationDag: + dag = RepresentationDag( + nodes=self.global_nodes, + root_node_id=root_node_id, + dag_id=dag_id if dag_id is not None else self.dag_counter + 1, + ) + self.dag_counter += 1 if not dag.validate(): raise ValueError("Invalid DAG construction") return dag diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index f733dc61aef..215f27929f3 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -62,7 +62,9 @@ def __init__( checkpoint_every: Optional[int] = 1, resume: bool = False, max_num_workers: int = -1, + enable_checkpointing: bool = True, ): + self.enable_checkpointing = enable_checkpointing self.modalities = modalities self.tasks = tasks self.modality_ids = [modality.modality_id for modality in modalities] @@ -295,7 +297,9 @@ def _expand_dags_with_task_roots( ) task_root_dag = RepresentationDag( - nodes=[*dag.nodes, task_node], root_node_id=task_node_id + nodes=[*dag.nodes, task_node], + root_node_id=task_node_id, + dag_id=dag.dag_id, ) expanded_dags.append(task_root_dag) return expanded_dags @@ -329,25 +333,28 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None): ) dags = self.add_aggregation_operator(self.builders[modality.modality_id], dags) - dags = pushdown_aggregation(dags) + dags_with_pushdown = pushdown_aggregation(dags) if skip_remaining > 0: dags = dags[skip_remaining:] - expanded_dags = self._expand_dags_with_task_roots(dags) + expanded_dags_with_task_roots = self._expand_dags_with_task_roots( + dags_with_pushdown + ) node_executor = NodeExecutor( - expanded_dags, + expanded_dags_with_task_roots, [modality], self.tasks, self._checkpoint_manager, self.max_num_workers, self.result_path, + enable_checkpointing=self.enable_checkpointing, ) task_results = node_executor.run() for task_result in task_results: - local_results.add_task_result(task_result) + local_results.add_task_result(task_result, dags) if self.save_all_results: timestr = time.strftime("%Y%m%d-%H%M%S") @@ -515,7 +522,7 @@ def add_aggregation_operator(self, builder, dags): [dag.root_node_id], agg_op.get_current_parameters(), ) - aggregated_dags.append(builder.build(agg_node_id)) + aggregated_dags.append(builder.build(agg_node_id, dag.dag_id)) else: aggregated_dags.append(dag) new_dags = aggregated_dags @@ -596,10 +603,12 @@ def __init__( self.results[modality] = {task_name: [] for task_name in self.task_names} self.cache[modality] = {task_name: [] for task_name in self.task_names} - def add_task_result(self, task_result: ResultEntry): + def add_task_result(self, task_result: ResultEntry, dags: List[RepresentationDag]): + dag_id = task_result.dag.dag_id task_name = self.task_names[ task_result.dag.nodes[-1].parameters.get("_task_idx", 0) ] + task_result.dag = get_dag_by_id(dags, dag_id) self.results[task_result.dag.nodes[0].modality_id][task_name].append( task_result ) @@ -716,3 +725,10 @@ def get_k_best_results( ] = cache return results, cache + + +def get_dag_by_id(dags: List[RepresentationDag], dag_id: int) -> RepresentationDag: + for dag in dags: + if dag.dag_id == dag_id: + return dag + return None diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py index e185deb7c97..edb99a3061b 100644 --- a/src/main/python/systemds/scuro/modality/transformed.py +++ b/src/main/python/systemds/scuro/modality/transformed.py @@ -19,6 +19,7 @@ # # ------------------------------------------------------------- from typing import Union, List +import inspect import numpy as np from systemds.scuro.modality.type import ModalityType from systemds.scuro.modality.joined import JoinedModality @@ -166,7 +167,11 @@ def dimensionality_reduction(self, dimensionality_reduction_operator): def apply_representation(self, representation, aggregation=None): start = time.time() - new_modality = representation.transform(self, aggregation=aggregation) + transform_sig = inspect.signature(representation.transform) + if "aggregation" in transform_sig.parameters: + new_modality = representation.transform(self, aggregation=aggregation) + else: + new_modality = representation.transform(self) new_modality.update_metadata() new_modality.transform_time += time.time() - start new_modality.self_contained = representation.self_contained diff --git a/src/main/python/systemds/scuro/representations/aggregated_representation.py b/src/main/python/systemds/scuro/representations/aggregated_representation.py index 4caa22762b0..85744f9209a 100644 --- a/src/main/python/systemds/scuro/representations/aggregated_representation.py +++ b/src/main/python/systemds/scuro/representations/aggregated_representation.py @@ -52,9 +52,9 @@ def __init__(self, aggregation="mean", target_dimensions=None, params=None): def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationStats: input_shape = list(copy.deepcopy(input_stats.output_shape)) - input_aggregate_dim = copy.deepcopy(input_stats.aggregate_dim) - for input_aggregate_dim in reversed(input_aggregate_dim): - input_shape.pop(input_aggregate_dim) + if self.target_dimensions is not None: + while len(input_shape) > self.target_dimensions: + input_shape.pop() out_shape = tuple(input_shape) self.stats = RepresentationStats( input_stats.num_instances, @@ -100,11 +100,11 @@ def transform(self, modality): if len(input_dimensions) == self.target_dimensions: return modality else: - - i = 1 - while len(input_dimensions) - 1 > self.target_dimensions: + i = len(input_dimensions) - 1 + aggregate_dim = () + while len(input_dimensions) > self.target_dimensions: aggregate_dim = aggregate_dim + (i,) - i += 1 + i -= 1 input_dimensions = input_dimensions[:-1] aggregated_data = self.aggregation.execute(modality, aggregate_dim) diff --git a/src/main/python/systemds/scuro/representations/bert.py b/src/main/python/systemds/scuro/representations/bert.py index ab23f463457..fcaed8d4935 100644 --- a/src/main/python/systemds/scuro/representations/bert.py +++ b/src/main/python/systemds/scuro/representations/bert.py @@ -329,7 +329,7 @@ def __init__( self.C = 0.3 -# @register_representation(ModalityType.TEXT) +@register_representation(ModalityType.TEXT) class RoBERTa(BertFamily): def __init__( self, diff --git a/src/main/python/systemds/scuro/representations/bow.py b/src/main/python/systemds/scuro/representations/bow.py index 65e490972a3..9e55add5de0 100644 --- a/src/main/python/systemds/scuro/representations/bow.py +++ b/src/main/python/systemds/scuro/representations/bow.py @@ -43,7 +43,7 @@ def __init__(self, ngram_range=2, min_df=2, output_file=None, params=None): def get_output_stats(self, input_stats: TextStats) -> RepresentationStats: vocab_estimate = min( - 100_000, + 100000, max( 1000, input_stats.num_instances * input_stats.max_length * self.ngram_range, @@ -76,7 +76,11 @@ def transform(self, modality, aggregation=None): ngram_range=(1, self.ngram_range), min_df=self.min_df ) - X = vectorizer.fit_transform(modality.data).toarray() + X = ( + vectorizer.fit_transform(modality.data) + .toarray() + .astype(np.float32, copy=False) + ) if self.output_file is not None: save_embeddings(X, self.output_file) diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py b/src/main/python/systemds/scuro/representations/mlp_averaging.py index bedfaf415b4..8c8d67a06ec 100644 --- a/src/main/python/systemds/scuro/representations/mlp_averaging.py +++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py @@ -118,19 +118,18 @@ def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict: batch_input_bytes = batch * input_dim * elem_size batch_output_bytes = batch * out_dim * elem_size - - num_batches = (n + batch - 1) // batch - python_overhead = num_batches * 1024 - + input_torch_copy_bytes = input_bytes + output_accum_transient_bytes = output_bytes cpu_working = ( input_bytes - + 2 * output_bytes + + input_torch_copy_bytes + + output_bytes + + output_accum_transient_bytes + weight_bytes + batch_input_bytes + batch_output_bytes - + python_overhead ) - cpu_peak = int(cpu_working * 1.20 + 64 * 1024**2) + cpu_peak = int(cpu_working * 1.15 + 64 * 1024**2) gpu_working = weight_bytes + batch_input_bytes + batch_output_bytes gpu_peak = int(gpu_working * 1.35 + 560 * 1024**2) diff --git a/src/main/python/systemds/scuro/representations/tfidf.py b/src/main/python/systemds/scuro/representations/tfidf.py index 3fd61099f1b..0b603f247e3 100644 --- a/src/main/python/systemds/scuro/representations/tfidf.py +++ b/src/main/python/systemds/scuro/representations/tfidf.py @@ -56,19 +56,16 @@ def estimate_output_memory_bytes(self, input_stats: TextStats) -> int: ) def estimate_peak_memory_bytes(self, input_stats: TextStats) -> dict: - output_bytes = self.estimate_output_memory_bytes(input_stats) - vectorizer_overhead = 640 * 1024 - return { - "cpu_peak_bytes": output_bytes + vectorizer_overhead, - "gpu_peak_bytes": 0, - } + dense_bytes = self.estimate_output_memory_bytes(input_stats) + cpu_peak = int(dense_bytes * 2.2 + 32 * 1024 * 1024) + return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0} def transform(self, modality, aggregation=None): transformed_modality = TransformedModality(modality, self) vectorizer = TfidfVectorizer(min_df=self.min_df) - X = vectorizer.fit_transform(modality.data) + X = vectorizer.fit_transform(modality.data).astype(np.float32, copy=False) if self.output_file is not None: save_embeddings(X, self.output_file) diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index c9622f1a2ad..1d05a8fda48 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -167,11 +167,18 @@ def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict: in_numel = effective_seq_len * self._rest_numel(in_shape) output_bytes = self.estimate_output_memory_bytes(input_stats) one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize - cpu_peak = ( - output_bytes * 2 - + one_instance_bytes * input_stats.num_instances - + one_instance_bytes * self.window_size - + 8 * 1024 * 1024 + input_bytes = one_instance_bytes * input_stats.num_instances + + output_transient = output_bytes + + pad_overhead = 0 + if getattr(self, "pad", False): + out_seq_len = math.ceil(in_shape[0] / self.window_size) + pad_overhead = int(input_stats.num_instances * out_seq_len * 8) + + cpu_peak = int( + (input_bytes + output_bytes + output_transient + pad_overhead) * 1.15 + + 16 * 1024 * 1024 ) return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0} @@ -298,12 +305,12 @@ def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict: return {"cpu_peak_bytes": 0, "gpu_peak_bytes": 0} effective_seq_len = in_shape[0] in_numel = effective_seq_len * self._rest_numel(in_shape) - output_bytes = self.estimate_output_memory_bytes(input_stats) one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize - cpu_peak = ( - output_bytes * 2 - + one_instance_bytes * input_stats.num_instances - + 8 * 1024 * 1024 + input_bytes = one_instance_bytes * input_stats.num_instances + output_bytes = self.estimate_output_memory_bytes(input_stats) + output_transient = output_bytes + cpu_peak = int( + (input_bytes + output_bytes + output_transient) * 1.12 + 12 * 1024 * 1024 ) return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0} diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py index 72591d74cc0..c418cefcae8 100644 --- a/src/main/python/tests/scuro/test_hp_tuner.py +++ b/src/main/python/tests/scuro/test_hp_tuner.py @@ -20,6 +20,7 @@ # ------------------------------------------------------------- import unittest +from types import SimpleNamespace import numpy as np @@ -54,163 +55,196 @@ class TestHPTuner(unittest.TestCase): - # Note: HPTuner is being refactored and not yet ready for testing - pass - - -# data_generator = None -# num_instances = 0 - -# @classmethod -# def setUpClass(cls): -# cls.num_instances = 10 -# cls.mods = [ -# ModalityType.VIDEO, -# ModalityType.AUDIO, -# ModalityType.TEXT, -# ModalityType.IMAGE, -# ] -# cls.indices = np.array(range(cls.num_instances)) -# cls.tasks = [ -# TestTask("UnimodalRepresentationTask1", "TestSVM1", cls.num_instances), -# TestTask("UnimodalRepresentationTask2", "TestSVM2", cls.num_instances), -# ] - -# def test_hp_tuner_for_audio_modality(self): -# audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( -# self.num_instances, 3000 -# ) -# audio = UnimodalModality( -# TestDataLoader( -# self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md -# ) -# ) - -# self.run_hp_for_modality([audio]) - -# def test_multimodal_hp_tuning(self): -# audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( -# self.num_instances, 3000 -# ) -# audio = UnimodalModality( -# TestDataLoader( -# self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md -# ) -# ) - -# text_data, text_md = ModalityRandomDataGenerator().create_text_data( -# self.num_instances -# ) -# text = UnimodalModality( -# TestDataLoader( -# self.indices, None, ModalityType.TEXT, text_data, str, text_md -# ) -# ) - -# # self.run_hp_for_modality( -# # [audio, text], multimodal=True, tune_unimodal_representations=True -# # ) -# self.run_hp_for_modality( -# [audio, text], multimodal=True, tune_unimodal_representations=False -# ) - -# def test_hp_tuner_for_text_modality(self): -# text_data, text_md = ModalityRandomDataGenerator().create_text_data( -# self.num_instances -# ) -# text = UnimodalModality( -# TestDataLoader( -# self.indices, None, ModalityType.TEXT, text_data, str, text_md -# ) -# ) -# self.run_hp_for_modality([text]) - -# def test_hp_tuner_for_image_modality(self): -# image_data, image_md = ModalityRandomDataGenerator().create_visual_modality( -# self.num_instances, 1 -# ) -# image = UnimodalModality( -# TestDataLoader( -# self.indices, None, ModalityType.IMAGE, image_data, np.float32, image_md -# ) -# ) -# self.run_hp_for_modality([image]) - -# def run_hp_for_modality( -# self, modalities, multimodal=False, tune_unimodal_representations=False -# ): -# with patch.object( -# Registry, -# "_representations", -# { -# ModalityType.TEXT: [W2V, BoW], -# ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, Pitch], -# ModalityType.TIMESERIES: [ResNet], -# ModalityType.VIDEO: [ResNet], -# ModalityType.IMAGE: [ResNet, ColorHistogram], -# ModalityType.EMBEDDING: [], -# }, -# ): -# registry = Registry() -# registry._fusion_operators = [LSTM] -# unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False) -# unimodal_optimizer.optimize() - -# hp = HyperparameterTuner( -# modalities, -# self.tasks, -# unimodal_optimizer.operator_performance, -# n_jobs=1, -# ) - -# if multimodal: -# m_o = MultimodalOptimizer( -# modalities, -# unimodal_optimizer.operator_performance, -# self.tasks, -# debug=False, -# min_modalities=2, -# max_modalities=3, -# ) -# fusion_results = m_o.optimize(20) - -# hp.tune_multimodal_representations( -# fusion_results, -# k=1, -# optimize_unimodal=tune_unimodal_representations, -# max_eval_per_rep=10, -# ) - -# else: -# hp.tune_unimodal_representations(max_eval_per_rep=10) - -# assert len(hp.optimization_results.results) == len(self.tasks) -# if multimodal: -# if tune_unimodal_representations: -# assert ( -# len( -# hp.optimization_results.results[self.tasks[0].model.name][0] -# ) -# == 1 -# ) -# else: -# assert ( -# len( -# hp.optimization_results.results[self.tasks[0].model.name][ -# "mm_results" -# ] -# ) -# == 1 -# ) -# else: -# assert ( -# len(hp.optimization_results.results[self.tasks[0].model.name]) == 1 -# ) -# assert ( -# len(hp.optimization_results.results[self.tasks[0].model.name][0]) -# == 2 -# ) - - -# if __name__ == "__main__": -# unittest.main() + data_generator = None + num_instances = 0 + + @classmethod + def setUpClass(cls): + cls.num_instances = 10 + cls.mods = [ + ModalityType.VIDEO, + ModalityType.AUDIO, + ModalityType.TEXT, + ModalityType.IMAGE, + ] + cls.indices = np.array(range(cls.num_instances)) + cls.tasks = [ + TestTask("UnimodalRepresentationTask1", "TestSVM1", cls.num_instances), + TestTask("UnimodalRepresentationTask2", "TestSVM2", cls.num_instances), + ] + + def test_hp_tuner_for_text_modality(self): + text_data, text_md = ModalityRandomDataGenerator().create_text_data( + self.num_instances + ) + text = UnimodalModality( + TestDataLoader( + self.indices, None, ModalityType.TEXT, text_data, str, text_md + ) + ) + self.run_hp_for_modality([text]) + + # TODO: Add once the final multimodal optimizer is implemented + # def test_multimodal_hp_tuning(self): + # audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( + # self.num_instances, 3000 + # ) + # audio = UnimodalModality( + # TestDataLoader( + # self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md + # ) + # ) + + # text_data, text_md = ModalityRandomDataGenerator().create_text_data( + # self.num_instances + # ) + # text = UnimodalModality( + # TestDataLoader( + # self.indices, None, ModalityType.TEXT, text_data, str, text_md + # ) + # ) + + # self.run_hp_for_modality( + # [audio, text], multimodal=True, tune_unimodal_representations=False + # ) + + def test_hp_tuner_for_image_modality(self): + image_data, image_md = ModalityRandomDataGenerator().create_visual_modality( + self.num_instances, 1 + ) + image = UnimodalModality( + TestDataLoader( + self.indices, None, ModalityType.IMAGE, image_data, np.float32, image_md + ) + ) + self.run_hp_for_modality([image]) + + def run_hp_for_modality( + self, modalities, multimodal=False, tune_unimodal_representations=False + ): + with patch.object( + Registry, + "_representations", + { + ModalityType.TEXT: [BoW, W2V], + ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, Pitch], + ModalityType.TIMESERIES: [ResNet], + ModalityType.VIDEO: [ResNet], + ModalityType.IMAGE: [ResNet, ColorHistogram], + ModalityType.EMBEDDING: [], + }, + ): + registry = Registry() + registry._fusion_operators = [LSTM] + unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False) + unimodal_optimizer.optimize() + + hp = HyperparameterTuner( + modalities, + self.tasks, + unimodal_optimizer.operator_performance, + n_jobs=1, + ) + + if multimodal: + m_o = MultimodalOptimizer( + modalities, + unimodal_optimizer.operator_performance, + self.tasks, + debug=False, + min_modalities=2, + max_modalities=3, + ) + fusion_results = m_o.optimize(20) + + hp.tune_multimodal_representations( + fusion_results, + k=1, + optimize_unimodal=tune_unimodal_representations, + max_eval_per_rep=10, + ) + + else: + hp.tune_unimodal_representations(max_eval_per_rep=10) + + assert len(hp.optimization_results.results) == len(self.tasks) + if multimodal: + if tune_unimodal_representations: + assert ( + len( + hp.optimization_results.results[self.tasks[0].model.name][0] + ) + == 1 + ) + else: + assert ( + len( + hp.optimization_results.results[self.tasks[0].model.name][ + "mm_results" + ] + ) + == 1 + ) + else: + assert ( + len(hp.optimization_results.results[self.tasks[0].model.name]) == 1 + ) + modality_id = modalities[0].modality_id + assert ( + len( + hp.optimization_results.results[self.tasks[0].model.name][ + modality_id + ] + ) + == 2 + ) + + def test_evaluate_configs_deduplicates_candidates(self): + class DummyOptimizationResults: + def get_k_best_results(self, modality, task, performance_metric_name): + return [], [] + + task = SimpleNamespace(model=SimpleNamespace(name="dummy_task")) + hp = HyperparameterTuner( + modalities=[], + tasks=[task], + optimization_results=DummyOptimizationResults(), + n_jobs=1, + ) + + eval_calls = {"count": 0} + + def fake_evaluate( + dag, params, node_order, modality_ids, task, modalities_override=None + ): + eval_calls["count"] += 1 + return params, float(params["x"]) + + hp.evaluate_dag_config = fake_evaluate + + candidate_configs = [ + {"x": 1}, + {"x": 1}, + {"x": 2}, + {"x": 2}, + {"x": 1}, + ] + seen_configs = {} + + results = hp._evaluate_configs( + dag=None, + task=None, + node_order=[], + modality_ids=[], + modalities_override=None, + candidate_configs=candidate_configs, + seen_configs=seen_configs, + ) + + self.assertEqual(eval_calls["count"], 2) + self.assertEqual(len(seen_configs), 2) + self.assertEqual([r[0]["x"] for r in results], [1, 2]) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py b/src/main/python/tests/scuro/test_unimodal_optimizer.py index 4fe98800e6a..3c5ce2a67f7 100644 --- a/src/main/python/tests/scuro/test_unimodal_optimizer.py +++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py @@ -24,23 +24,10 @@ import numpy as np from systemds.scuro.representations.clip import CLIPText, CLIPVisual -from systemds.scuro.representations.bert import ALBERT, ELECTRA, RoBERTa, DistillBERT from systemds.scuro.representations.color_histogram import ColorHistogram -from systemds.scuro.representations.glove import GloVe -from systemds.scuro.representations.timeseries_representations import ( - Mean, - ACF, -) from systemds.scuro.drsearch.operator_registry import Registry from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer -from systemds.scuro.drsearch.representation_dag import RepresentationNode -from systemds.scuro.representations.representation import RepresentationStats -from systemds.scuro.representations.spectrogram import Spectrogram -from systemds.scuro.representations.covarep_audio_features import ( - ZeroCrossing, -) -from systemds.scuro.representations.vgg import VGG19 from systemds.scuro.representations.word2vec import W2V from systemds.scuro.representations.bow import BoW from systemds.scuro.representations.bert import Bert @@ -51,7 +38,17 @@ TestDataLoader, TestTask, ) +import copy +from systemds.scuro.drsearch.representation_dag import ( + CSEAwareDAGBuilder, + RepresentationDag, + pushdown_aggregation, +) +from systemds.scuro.representations.aggregated_representation import ( + AggregatedRepresentation, +) +from systemds.scuro.representations.bert import Bert from systemds.scuro.modality.type import ModalityType from unittest.mock import patch @@ -73,19 +70,6 @@ def setUpClass(cls): TestTask("UnimodalRepresentationTask2", "Test2", cls.num_instances), ] - # Note: Audio optimizer still needs some work - # def test_unimodal_optimizer_for_audio_modality(self): - # audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( - # self.num_instances, 3000 - # ) - # audio = UnimodalModality( - # TestDataLoader( - # self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md - # ) - # ) - - # self.optimize_unimodal_representation_for_modality([audio]) - def test_unimodal_optimizer_for_text_modality(self): text_data, text_md = ModalityRandomDataGenerator().create_text_data( self.num_instances, 10 @@ -127,18 +111,6 @@ def test_unimodal_optimizer_for_multiple_modalities(self): ) self.optimize_unimodal_representation_for_modality([text, image]) - # Note: Timeseries optimizer still needs some work - # def test_unimodal_optimizer_for_ts_modality(self): - # ts_data, ts_md = ModalityRandomDataGenerator().create_timeseries_data( - # self.num_instances, 1000 - # ) - # ts = UnimodalModality( - # TestDataLoader( - # self.indices, None, ModalityType.TIMESERIES, ts_data, np.float32, ts_md - # ) - # ) - # self.optimize_unimodal_representation_for_modality([ts]) - def test_unimodal_optimizer_for_video_modality(self): video_data, video_md = ModalityRandomDataGenerator().create_visual_modality( self.num_instances, 10, 10 @@ -150,6 +122,67 @@ def test_unimodal_optimizer_for_video_modality(self): ) self.optimize_unimodal_representation_for_modality([video]) + def test_aggregation_pushdown_preserves_dag_id_and_bert_node_parameters(self): + builder = CSEAwareDAGBuilder() + modality_id = "test_modality_agg_pushdown" + leaf_id = builder.create_leaf_node(modality_id) + + bert = Bert() + bert_id = builder.create_operation_node( + Bert, [leaf_id], bert.get_current_parameters() + ) + + agg = AggregatedRepresentation(target_dimensions=1) + agg_id = builder.create_operation_node( + AggregatedRepresentation, + [bert_id], + agg.get_current_parameters(), + ) + + expected_dag_id = 1001 + dag = RepresentationDag( + nodes=copy.deepcopy(builder.global_nodes), + root_node_id=agg_id, + dag_id=expected_dag_id, + ) + + by_id = {n.node_id: n for n in dag.nodes} + self.assertEqual(len(dag.nodes), 3) + self.assertEqual(dag.dag_id, expected_dag_id) + self.assertEqual(dag.root_node_id, agg_id) + + self.assertEqual(by_id[leaf_id].inputs, []) + self.assertEqual(by_id[bert_id].inputs, [leaf_id]) + self.assertEqual(by_id[agg_id].inputs, [bert_id]) + self.assertIs(by_id[bert_id].operation, Bert) + self.assertIs(by_id[agg_id].operation, AggregatedRepresentation) + + bert_params_before = copy.deepcopy(by_id[bert_id].parameters) + agg_params_snapshot = copy.deepcopy(by_id[agg_id].parameters) + self.assertNotIn("_pushdown_aggregation", bert_params_before) + + pushdown_aggregation([dag]) + + self.assertEqual(dag.dag_id, expected_dag_id) + self.assertEqual(dag.root_node_id, bert_id) + self.assertEqual(len(dag.nodes), 2) + self.assertIsNone(dag.get_node_by_id(agg_id)) + + bert_after = dag.get_node_by_id(bert_id) + self.assertIsNotNone(bert_after) + self.assertEqual(bert_after.inputs, [leaf_id]) + self.assertIn("_pushdown_aggregation", bert_after.parameters) + self.assertEqual( + bert_after.parameters["_pushdown_aggregation"], + agg_params_snapshot, + ) + remaining = { + k: v + for k, v in bert_after.parameters.items() + if k != "_pushdown_aggregation" + } + self.assertEqual(remaining, bert_params_before) + def optimize_unimodal_representation_for_modality(self, modalities): with patch.object( Registry, @@ -161,8 +194,6 @@ def optimize_unimodal_representation_for_modality(self, modalities): Bert, CLIPText, ], - ModalityType.AUDIO: [Spectrogram, ZeroCrossing], - ModalityType.TIMESERIES: [Mean, ACF], ModalityType.VIDEO: [ResNet], ModalityType.IMAGE: [ColorHistogram, CLIPVisual], ModalityType.EMBEDDING: [], @@ -171,7 +202,12 @@ def optimize_unimodal_representation_for_modality(self, modalities): registry = Registry() unimodal_optimizer = UnimodalOptimizer( - modalities, self.tasks, False, k=1, max_num_workers=1 + modalities, + self.tasks, + False, + k=1, + max_num_workers=1, + enable_checkpointing=False, ) unimodal_optimizer.optimize() for modality in modalities: @@ -185,4 +221,3 @@ def optimize_unimodal_representation_for_modality(self, modalities): modalities[0], self.tasks[0], "accuracy" ) assert len(result) == 1 - assert len(cached) == 1