Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
469 changes: 382 additions & 87 deletions src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py

Large diffs are not rendered by default.

97 changes: 55 additions & 42 deletions src/main/python/systemds/scuro/drsearch/node_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ def __init__(
checkpoint_manager: Optional[CheckpointManager] = None,
max_num_workers: int = -1,
result_path: Optional[str] = None,
enable_checkpointing: bool = True,
):
self.enable_checkpointing = enable_checkpointing
available_total_cpu = (
float(psutil.virtual_memory().available)
- float(psutil.virtual_memory().available) * 0.30
Expand Down Expand Up @@ -316,14 +318,17 @@ def run(self) -> None:
def submit_node(node_id: str):
node = self.scheduler.mapping[node_id]
gpu_id = node.gpu_id
parent_result = None
parent_node_id = self.scheduler.get_valid_parent(node_id)
if parent_node_id is not None:
parent_result = self.result_cache.get(parent_node_id)
parent_node_ids = self.scheduler.get_valid_parents(node_id)
parent_results = None
if parent_node_ids:
parent_results = [
self.result_cache.get(parent_node_id)
for parent_node_id in parent_node_ids
]
if self._is_task_node(node):
task_result = ResultEntry(
dag=self._get_dag_from_node_ids(node_id),
representation_time=parent_result.transform_time,
representation_time=parent_results[0].transform_time,
)
task_results[node_id] = task_result
task_idx = int(node.parameters.get("_task_idx", 0))
Expand All @@ -333,16 +338,16 @@ def submit_node(node_id: str):
self.tasks[task_idx],
(
self.modalities[0].data
if parent_result is None
else parent_result.data
if parent_results is None
else parent_results[0].data
),
gpu_id,
)
else:
future = executor.submit(
_execute_node_worker,
node,
self.modalities if parent_result is None else [parent_result],
self.modalities if parent_results is None else parent_results,
None,
None,
gpu_id,
Expand Down Expand Up @@ -393,25 +398,27 @@ def submit_new_ready_nodes():
task_results[node_id].test_score = result["scores"][
2
].average_scores
self.checkpoint_manager.increment(node_id)
self.checkpoint_manager.checkpoint_if_due(task_results)
self._checkpoint_memory_usage(
node_id,
peak_bytes,
gpu_peak_bytes,
"task",
memory_usage_data,
None,
)
if self.enable_checkpointing:
self.checkpoint_manager.increment(node_id)
self.checkpoint_manager.checkpoint_if_due(task_results)
self._checkpoint_memory_usage(
node_id,
peak_bytes,
gpu_peak_bytes,
"task",
memory_usage_data,
None,
)

parent_node_id = self.scheduler.get_valid_parent(node_id)
if parent_node_id is not None:
self.result_cache.dec_ref(parent_node_id)
if (
parent_node_id in self.result_cache.ref_count
and self.result_cache.ref_count[parent_node_id] == 0
):
self.result_cache.clear(parent_node_id)
parent_node_ids = self.scheduler.get_valid_parents(node_id)
if len(parent_node_ids) > 0:
for parent_node_id in parent_node_ids:
self.result_cache.dec_ref(parent_node_id)
if (
parent_node_id in self.result_cache.ref_count
and self.result_cache.ref_count[parent_node_id] == 0
):
self.result_cache.clear(parent_node_id)
self.scheduler.complete_node(node_id)

else:
Expand All @@ -430,14 +437,15 @@ def submit_new_ready_nodes():
self.scheduler.update_node_stats_and_reestimate_descendants(
node_id, actual_stats
)
self._checkpoint_memory_usage(
node_id,
peak_bytes,
gpu_peak_bytes,
result["operation_name"],
memory_usage_data,
transformed_modality.data,
)
if self.enable_checkpointing:
self._checkpoint_memory_usage(
node_id,
peak_bytes,
gpu_peak_bytes,
result["operation_name"],
memory_usage_data,
transformed_modality.data,
)
before_bytes = self.result_cache.get_memory_total_memory_usage()
self._manage_result_cache(node_id, transformed_modality)
after_bytes = self.result_cache.get_memory_total_memory_usage()
Expand Down Expand Up @@ -539,6 +547,9 @@ def _print_node_stats(self, node_id: str, result: Any, operation_name: str):
assert (
len(result) == node_stats.num_instances
), f"Node {node_id} {operation_name} should have {node_stats.num_instances} instances, actual: {len(result)}"
# assert (
# shape == node_stats.output_shape
# ), f"Node {node_id} {operation_name} should have shape of {node_stats.output_shape}, actual shape: {shape}"
return shape

def _infer_actual_output_stats(
Expand Down Expand Up @@ -575,20 +586,22 @@ def _infer_actual_output_stats(
return None

def _manage_result_cache(self, node_id: str, result: Any):
parent_node_id = self.scheduler.get_valid_parent(node_id)
if parent_node_id is not None:
self.result_cache.dec_ref(parent_node_id)
parent_node_ids = self.scheduler.get_valid_parents(node_id)
if len(parent_node_ids) > 0:
for parent_node_id in parent_node_ids:
self.result_cache.dec_ref(parent_node_id)

if self.scheduler.get_children(node_id):
for _ in self.scheduler.get_children(node_id):
self.result_cache.inc_ref(node_id)
self.result_cache.add_result(node_id, result)

if (
parent_node_id in self.result_cache.ref_count
and self.result_cache.ref_count[parent_node_id] == 0
):
self.result_cache.clear(parent_node_id)
for parent_node_id in parent_node_ids:
if (
parent_node_id in self.result_cache.ref_count
and self.result_cache.ref_count[parent_node_id] == 0
):
self.result_cache.clear(parent_node_id)

def _get_nodes_by_ids(self, nodes_ids: List[str]) -> List[RepresentationNode]:
return [self.scheduler.mapping[node_id] for node_id in nodes_ids]
Expand Down
23 changes: 17 additions & 6 deletions src/main/python/systemds/scuro/drsearch/node_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,12 @@ def is_finished(self) -> bool:

return False

def get_valid_parent(self, node_id: str) -> bool:
def get_valid_parents(self, node_id: str) -> bool:
parents = []
for parent_id in self.parents[node_id]:
if parent_id not in self.leaves:
return parent_id
return None
parents.append(parent_id)
return parents

def get_children(self, node_id: str) -> List[str]:
return list(self.children[node_id])
Expand Down Expand Up @@ -207,8 +208,9 @@ def update_node_stats_and_reestimate_descendants(
params=self.mapping[desc_id].parameters
)
peak_memory = operation.estimate_peak_memory_bytes(input_stats)
input_stats_for_overhead = self._stats_for_overhead(input_stats)
peak_memory["cpu_peak_bytes"] += (
64 * 1024 + 512 * input_stats.num_instances
64 * 1024 + 512 * input_stats_for_overhead.num_instances
)
output_stats = operation.get_output_stats(input_stats)

Expand Down Expand Up @@ -395,14 +397,17 @@ def _estimate_node_resources(self):
else:
parent_ids = list(self.parents.get(node, set()))
parent_stats = [node_stats[parent_id] for parent_id in parent_ids]
input_stats = parent_stats[0] if parent_stats else None
input_stats = (
parent_stats[0] if len(parent_stats) == 1 else parent_stats
)
if node not in self.roots:
operation = self.mapping[node].operation(
params=self.mapping[node].parameters
)
peak_memory = operation.estimate_peak_memory_bytes(input_stats)
input_stats_for_overhead = self._stats_for_overhead(input_stats)
peak_memory["cpu_peak_bytes"] += (
64 * 1024 + 512 * input_stats.num_instances
64 * 1024 + 512 * input_stats_for_overhead.num_instances
) # Placeholder for transformed modality creation overhead
peak_memory["cpu_peak_bytes"] *= 1
output_stats = operation.get_output_stats(input_stats)
Expand All @@ -429,6 +434,12 @@ def _estimate_node_resources(self):
self.node_stats = node_stats
return node_resources

@staticmethod
def _stats_for_overhead(input_stats: Any) -> Any:
    """Pick a representative stats object for overhead estimation.

    A node with several parents receives its input stats as a list; the
    first entry is used as the representative. Anything else (a single
    stats object, an empty list, or None) is passed through unchanged.
    """
    # Only a non-empty list collapses to its first element; an empty
    # list is returned as-is, matching the pass-through behavior.
    if isinstance(input_stats, list):
        return input_stats[0] if input_stats else input_stats
    return input_stats

@staticmethod
def _stats_to_bytes(stats: Optional[Any], dtype_size: int = 4) -> int:
if stats is None:
Expand Down
21 changes: 16 additions & 5 deletions src/main/python/systemds/scuro/drsearch/representation_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class RepresentationNode:
@dataclass
class RepresentationDag:

def __init__(self, nodes: List[Any], root_node_id):
def __init__(self, nodes: List[Any], root_node_id, dag_id: int = None):
self.dag_id = dag_id
self.root_node_id = root_node_id
self.nodes = self.filter_connected_nodes(nodes)

Expand Down Expand Up @@ -399,6 +400,7 @@ class RepresentationDAGBuilder:
def __init__(self):
self.nodes = []
self.node_counter = 0
self.dag_counter = 0

def create_leaf_node(
self, modality_id: str, representation_index: int = -1, operation=None
Expand Down Expand Up @@ -431,10 +433,13 @@ def create_operation_node(
self.nodes.append(node)
return node_id

def build(self, root_node_id: str) -> RepresentationDag:
def build(self, root_node_id: str, dag_id: int = None) -> RepresentationDag:
dag = RepresentationDag(
nodes=copy.deepcopy(self.nodes), root_node_id=root_node_id
nodes=copy.deepcopy(self.nodes),
root_node_id=root_node_id,
dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
)
self.dag_counter += 1
if not dag.validate():
raise ValueError("Invalid DAG construction")
return dag
Expand Down Expand Up @@ -524,6 +529,7 @@ def __init__(self):
self.signature_to_node: Dict[Hashable, str] = {}
self.node_to_signature: Dict[str, Hashable] = {}
self.node_counter = 0
self.dag_counter = 0

def _compute_node_signature(
self, operation: Any, inputs: List[str], parameters: Dict[str, Any] = None
Expand Down Expand Up @@ -602,8 +608,13 @@ def create_operation_node(
operation=operation, inputs=inputs, parameters=parameters, is_leaf=False
)

def build(self, root_node_id: str) -> RepresentationDag:
dag = RepresentationDag(nodes=self.global_nodes, root_node_id=root_node_id)
def build(self, root_node_id: str, dag_id: int = None) -> RepresentationDag:
dag = RepresentationDag(
nodes=self.global_nodes,
root_node_id=root_node_id,
dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
)
self.dag_counter += 1
if not dag.validate():
raise ValueError("Invalid DAG construction")
return dag
Expand Down
30 changes: 23 additions & 7 deletions src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def __init__(
checkpoint_every: Optional[int] = 1,
resume: bool = False,
max_num_workers: int = -1,
enable_checkpointing: bool = True,
):
self.enable_checkpointing = enable_checkpointing
self.modalities = modalities
self.tasks = tasks
self.modality_ids = [modality.modality_id for modality in modalities]
Expand Down Expand Up @@ -295,7 +297,9 @@ def _expand_dags_with_task_roots(
)

task_root_dag = RepresentationDag(
nodes=[*dag.nodes, task_node], root_node_id=task_node_id
nodes=[*dag.nodes, task_node],
root_node_id=task_node_id,
dag_id=dag.dag_id,
)
expanded_dags.append(task_root_dag)
return expanded_dags
Expand Down Expand Up @@ -329,25 +333,28 @@ def _process_modality(self, modality, skip_remaining: int = 0, scheduler=None):
)

dags = self.add_aggregation_operator(self.builders[modality.modality_id], dags)
dags = pushdown_aggregation(dags)
dags_with_pushdown = pushdown_aggregation(dags)

if skip_remaining > 0:
dags = dags[skip_remaining:]

expanded_dags = self._expand_dags_with_task_roots(dags)
expanded_dags_with_task_roots = self._expand_dags_with_task_roots(
dags_with_pushdown
)

node_executor = NodeExecutor(
expanded_dags,
expanded_dags_with_task_roots,
[modality],
self.tasks,
self._checkpoint_manager,
self.max_num_workers,
self.result_path,
enable_checkpointing=self.enable_checkpointing,
)
task_results = node_executor.run()

for task_result in task_results:
local_results.add_task_result(task_result)
local_results.add_task_result(task_result, dags)

if self.save_all_results:
timestr = time.strftime("%Y%m%d-%H%M%S")
Expand Down Expand Up @@ -515,7 +522,7 @@ def add_aggregation_operator(self, builder, dags):
[dag.root_node_id],
agg_op.get_current_parameters(),
)
aggregated_dags.append(builder.build(agg_node_id))
aggregated_dags.append(builder.build(agg_node_id, dag.dag_id))
else:
aggregated_dags.append(dag)
new_dags = aggregated_dags
Expand Down Expand Up @@ -596,10 +603,12 @@ def __init__(
self.results[modality] = {task_name: [] for task_name in self.task_names}
self.cache[modality] = {task_name: [] for task_name in self.task_names}

def add_task_result(self, task_result: ResultEntry):
def add_task_result(self, task_result: ResultEntry, dags: List[RepresentationDag]):
dag_id = task_result.dag.dag_id
task_name = self.task_names[
task_result.dag.nodes[-1].parameters.get("_task_idx", 0)
]
task_result.dag = get_dag_by_id(dags, dag_id)
self.results[task_result.dag.nodes[0].modality_id][task_name].append(
task_result
)
Expand Down Expand Up @@ -716,3 +725,10 @@ def get_k_best_results(
] = cache

return results, cache


def get_dag_by_id(dags: List[RepresentationDag], dag_id: int) -> RepresentationDag:
    """Return the first DAG in *dags* whose ``dag_id`` matches.

    NOTE(review): despite the return annotation, ``None`` is returned when
    no DAG matches — callers must handle the missing case.
    """
    # Lazy linear scan: stop at the first matching id, default to None.
    return next((candidate for candidate in dags if candidate.dag_id == dag_id), None)
7 changes: 6 additions & 1 deletion src/main/python/systemds/scuro/modality/transformed.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
# -------------------------------------------------------------
from typing import Union, List
import inspect
import numpy as np
from systemds.scuro.modality.type import ModalityType
from systemds.scuro.modality.joined import JoinedModality
Expand Down Expand Up @@ -166,7 +167,11 @@ def dimensionality_reduction(self, dimensionality_reduction_operator):

def apply_representation(self, representation, aggregation=None):
start = time.time()
new_modality = representation.transform(self, aggregation=aggregation)
transform_sig = inspect.signature(representation.transform)
if "aggregation" in transform_sig.parameters:
new_modality = representation.transform(self, aggregation=aggregation)
else:
new_modality = representation.transform(self)
new_modality.update_metadata()
new_modality.transform_time += time.time() - start
new_modality.self_contained = representation.self_contained
Expand Down
Loading
Loading