diff --git a/CHANGELOG.md b/CHANGELOG.md index b6ab93b68d..a80eccf2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ #### :nail_care: Polish - Allow builds while watchers are running. https://github.com/rescript-lang/rescript/pull/8349 +- Rewatch: replace wave-based compile scheduler with a work-stealing DAG dispatcher ordered by critical-path priority, avoiding the per-wave stall on the slowest file. https://github.com/rescript-lang/rescript/pull/8374 #### :house: Internal diff --git a/rewatch/src/build/compile.rs b/rewatch/src/build/compile.rs index c646309ef7..2a434ac7ee 100644 --- a/rewatch/src/build/compile.rs +++ b/rewatch/src/build/compile.rs @@ -15,10 +15,13 @@ use anyhow::{Result, anyhow}; use console::style; use log::{debug, info, trace, warn}; use rayon::prelude::*; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::path::Path; use std::path::PathBuf; use std::process::Command; use std::sync::OnceLock; +use std::sync::mpsc; use std::time::SystemTime; /// Execute js-post-build command for a compiled JavaScript file. @@ -72,13 +75,185 @@ fn execute_post_build_command(cmd: &str, js_file_path: &Path, working_dir: &Path } } +/// A unit of work in the ready queue. Ordered by `priority` so that the +/// `BinaryHeap` pops the module with the longest remaining critical path first. +#[derive(Debug)] +struct WorkUnit { + priority: i64, + module_name: String, +} + +impl PartialEq for WorkUnit { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority + } +} +impl Eq for WorkUnit {} +impl PartialOrd for WorkUnit { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for WorkUnit { + fn cmp(&self, other: &Self) -> Ordering { + self.priority.cmp(&other.priority) + } +} + +/// Result of a single module compilation, sent from a worker back to the +/// dispatcher on the main thread. Mirrors the tuple shape the wave-based +/// scheduler used to return. +struct CompletionMsg { + module_name: String, + result: Result>, + interface_result: Option>>, + is_clean: bool, + is_compiled: bool, +} + +/// Compute the critical-path priority of every module in the universe: +/// `priority(m) = 1 + max(priority(d) for d in in-universe dependents of m)`. +/// Runs a reverse-topological sweep from leaves to roots. Modules stuck in a +/// cycle get priority 0, and the actual cycle is diagnosed later by +/// `dependency_cycle::find` when the dispatcher detects a stall. +fn compute_critical_path_priorities( + universe: &AHashSet, + build_state: &BuildState, +) -> AHashMap { + let mut priorities: AHashMap = AHashMap::with_capacity(universe.len()); + let mut remaining_dependents: AHashMap = AHashMap::with_capacity(universe.len()); + let mut queue: Vec = Vec::new(); + + for name in universe { + let module = build_state.get_module(name).unwrap(); + let count = module.dependents.iter().filter(|d| universe.contains(*d)).count(); + remaining_dependents.insert(name.clone(), count); + if count == 0 { + queue.push(name.clone()); + } + } + + while let Some(name) = queue.pop() { + let module = build_state.get_module(&name).unwrap(); + let max_dep_priority = module + .dependents + .iter() + .filter(|d| universe.contains(*d)) + .filter_map(|d| priorities.get(d).copied()) + .max() + .unwrap_or(0); + priorities.insert(name.clone(), max_dep_priority + 1); + + for dep in &module.deps { + if !universe.contains(dep) { + continue; + } + if let Some(count) = remaining_dependents.get_mut(dep) { + *count -= 1; + if *count == 0 { + queue.push(dep.clone()); + } + } + } + } + + for name in universe { + priorities.entry(name.clone()).or_insert(0); + } + priorities +} + +/// Run the short-circuit check or actual `bsc` invocation for a single module. +/// Invoked from worker threads inside the dispatcher scope; only reads +/// `BuildState`, never mutates it. +fn compile_one( + build_state: &BuildState, + module_name: &str, + is_dirty: bool, + warn_error_override: Option, +) -> CompletionMsg { + let module = build_state.get_module(module_name).unwrap(); + let package = build_state + .get_package(&module.package_name) + .expect("Package not found"); + + if !is_dirty { + return CompletionMsg { + module_name: module_name.to_string(), + result: Ok(None), + interface_result: Some(Ok(None)), + is_clean: true, + is_compiled: false, + }; + } + + match &module.source_type { + SourceType::MlMap(_) => { + // The mlmap is compiled during AST generation; the entry here just + // marks it compiled so its namespace members can proceed. + CompletionMsg { + module_name: package.namespace.to_suffix().unwrap(), + result: Ok(None), + interface_result: Some(Ok(None)), + is_clean: false, + is_compiled: false, + } + } + SourceType::SourceFile(source_file) => { + let cmi_path = helpers::get_compiler_asset( + package, + &package.namespace, + &source_file.implementation.path, + "cmi", + ); + let cmi_digest = helpers::compute_file_hash(Path::new(&cmi_path)); + + let interface_result = source_file.interface.as_ref().map(|iface| { + compile_file( + package, + &helpers::get_ast_path(&iface.path), + module, + true, + build_state, + warn_error_override.clone(), + ) + }); + let result = compile_file( + package, + &helpers::get_ast_path(&source_file.implementation.path), + module, + false, + build_state, + warn_error_override, + ); + let cmi_digest_after = helpers::compute_file_hash(Path::new(&cmi_path)); + + // If the cmi is byte-for-byte unchanged, downstream modules can + // short-circuit — we check both interface and implementation + // because e.g. `include MyModule` exposes implementation changes + // through the cmi even when the .resi is untouched. + let is_clean_cmi = matches!( + (cmi_digest, cmi_digest_after), + (Some(a), Some(b)) if a == b + ); + + CompletionMsg { + module_name: module_name.to_string(), + result, + interface_result, + is_clean: is_clean_cmi, + is_compiled: true, + } + } + } +} + pub fn compile( build_state: &mut BuildCommandState, show_progress: bool, inc: impl Fn() + std::marker::Sync, set_length: impl Fn(u64), ) -> anyhow::Result<(String, String, usize)> { - let mut compiled_modules = AHashSet::::new(); let dirty_modules = build_state .modules .iter() @@ -91,356 +266,313 @@ pub fn compile( }) .collect::>(); - // dirty_modules.iter().for_each(|m| println!("dirty module: {}", m)); - // println!("{} dirty modules", dirty_modules.len()); - let mut sorted_dirty_modules = dirty_modules.iter().collect::>(); - sorted_dirty_modules.sort(); - // dirty_modules.iter().for_each(|m| println!("dirty module: {}", m)); - // sorted_dirty_modules - // .iter() - // .for_each(|m| println!("dirty module: {}", m)); - - // for sure clean modules -- after checking the hash of the cmi - let mut clean_modules = AHashSet::::new(); - - // TODO: calculate the real dirty modules from the original dirty modules in each iteration - // taken into account the modules that we know are clean, so they don't propagate through the - // deps graph - // create a hashset of all clean modules from the file-hashes - let mut loop_count = 0; - let mut files_total_count = compiled_modules.len(); - let mut files_current_loop_count; - let mut compile_errors = "".to_string(); - let mut compile_warnings = "".to_string(); - let mut num_compiled_modules = 0; - let mut sorted_modules = build_state.module_names.iter().collect::>(); - sorted_modules.sort(); - - // this is the whole "compile universe" all modules that might be dirty - // we get this by expanding the dependents from the dirty modules - + // Expand the compile universe: every dirty module plus everything that + // transitively depends on it. let mut compile_universe = dirty_modules.clone(); - let mut current_step_modules = compile_universe.clone(); + let mut frontier = compile_universe.clone(); loop { let mut dependents: AHashSet = AHashSet::new(); - for dirty_module in current_step_modules.iter() { - dependents.extend(build_state.get_module(dirty_module).unwrap().dependents.clone()); + for module_name in frontier.iter() { + dependents.extend(build_state.get_module(module_name).unwrap().dependents.clone()); } - - current_step_modules = dependents + frontier = dependents .difference(&compile_universe) - .map(|s| s.to_string()) + .cloned() .collect::>(); - - compile_universe.extend(current_step_modules.to_owned()); - if current_step_modules.is_empty() { + if frontier.is_empty() { break; } + compile_universe.extend(frontier.iter().cloned()); } let compile_universe_count = compile_universe.len(); set_length(compile_universe_count as u64); - // start off with all modules that have no deps in this compile universe - let mut in_progress_modules = compile_universe + let priorities = compute_critical_path_priorities(&compile_universe, &build_state.build_state); + + // Count of not-yet-completed in-universe dependencies for each module. + // Only touched on the main thread. + let mut pending_deps: AHashMap = compile_universe .iter() - .filter(|module_name| { - let module = build_state.get_module(module_name).unwrap(); - module.deps.intersection(&compile_universe).count() == 0 + .map(|name| { + let module = build_state.get_module(name).unwrap(); + let count = module + .deps + .iter() + .filter(|d| compile_universe.contains(*d)) + .count(); + (name.clone(), count) }) - .map(|module_name| module_name.to_string()) - .collect::>(); - - loop { - files_current_loop_count = 0; - loop_count += 1; - - trace!( - "Compiled: {} out of {}. Compile loop: {}", - files_total_count, - compile_universe.len(), - loop_count, - ); - - let current_in_progres_modules = in_progress_modules.clone(); - - let results = current_in_progres_modules - .par_iter() - .filter_map(|module_name| { - let module = build_state.get_module(module_name).unwrap(); - let package = build_state - .get_package(&module.package_name) - .expect("Package not found"); - // all dependencies that we care about are compiled - if module - .deps - .intersection(&compile_universe) - .all(|dep| compiled_modules.contains(dep)) - { - if !module.compile_dirty { - // we are sure we don't have to compile this, so we can mark it as compiled and clean - return Some((module_name.to_string(), Ok(None), Some(Ok(None)), true, false)); - } - match module.source_type.to_owned() { - SourceType::MlMap(_) => { - // the mlmap needs to be compiled before the files are compiled - // in the same namespace, otherwise we get a compile error - // this is why mlmap is compiled in the AST generation stage - // compile_mlmap(&module.package, module_name, &project_root); - Some(( - package.namespace.to_suffix().unwrap(), - Ok(None), - Some(Ok(None)), - false, - false, - )) - } - SourceType::SourceFile(source_file) => { - let cmi_path = helpers::get_compiler_asset( - package, - &package.namespace, - &source_file.implementation.path, - "cmi", - ); - - let cmi_digest = helpers::compute_file_hash(Path::new(&cmi_path)); - - let package = build_state - .get_package(&module.package_name) - .expect("Package not found"); - - let interface_result = match source_file.interface.to_owned() { - Some(Interface { path, .. }) => { - let result = compile_file( - package, - &helpers::get_ast_path(&path), - module, - true, - build_state, - build_state.get_warn_error_override(), - ); - Some(result) - } - _ => None, - }; - let result = compile_file( - package, - &helpers::get_ast_path(&source_file.implementation.path), - module, - false, - build_state, - build_state.get_warn_error_override(), - ); - let cmi_digest_after = helpers::compute_file_hash(Path::new(&cmi_path)); - - // we want to compare both the hash of interface and the implementation - // compile assets to verify that nothing changed. We also need to checke the interface - // because we can include MyModule, so the modules that depend on this module might - // change when this modules interface does not change, but the implementation does - let is_clean_cmi = match (cmi_digest, cmi_digest_after) { - (Some(cmi_digest), Some(cmi_digest_after)) => { - cmi_digest.eq(&cmi_digest_after) - } + .collect(); - _ => false, - }; + let mut ready_heap: BinaryHeap = compile_universe + .iter() + .filter(|name| pending_deps[*name] == 0) + .map(|name| WorkUnit { + priority: *priorities.get(name).unwrap_or(&0), + module_name: name.clone(), + }) + .collect(); - Some(( - module_name.to_string(), - result, - interface_result, - is_clean_cmi, - true, - )) - } - } - } else { - None - } - .inspect(|_res| { + // Dirtiness propagation tracked locally: when a module's cmi changes, its + // dependents are forced dirty. This mirrors the old `compile_dirty` flag + // mutation but keeps build_state borrow-free while workers are running. + let mut dirty_set: AHashSet = dirty_modules; + + let warn_error_override = build_state.get_warn_error_override(); + let build_state_ref: &BuildState = &build_state.build_state; + + let (tx, rx) = mpsc::channel::(); + // Bound concurrency to rayon's pool size so the priority heap actually + // orders work — dumping everything into rayon's deque would defeat #2. + let capacity = rayon::current_num_threads().max(1); + + let mut completed: AHashSet = AHashSet::new(); + let mut results_buffer: Vec = Vec::with_capacity(compile_universe_count); + let mut has_errors = false; + let mut stalled = false; + + rayon::in_place_scope(|scope| { + let mut in_flight: usize = 0; + loop { + while in_flight < capacity && !has_errors { + let Some(work) = ready_heap.pop() else { break }; + let module_name = work.module_name.clone(); + let is_dirty = dirty_set.contains(&module_name); + let warn_override = warn_error_override.clone(); + let tx = tx.clone(); + let inc_ref = &inc; + in_flight += 1; + scope.spawn(move |_| { + let msg = compile_one(build_state_ref, &module_name, is_dirty, warn_override); if show_progress { - inc(); + inc_ref(); } - }) - }) - .collect::>(); - - for result in results.iter() { - let (module_name, result, interface_result, is_clean, is_compiled) = result; - in_progress_modules.remove(module_name); + // Receiver lives for the full scope, so send cannot fail + // unless the dispatcher has already hung up on purpose. + let _ = tx.send(msg); + }); + } - if *is_compiled { - num_compiled_modules += 1; + if in_flight == 0 { + if !ready_heap.is_empty() { + // Errors suppressed new spawns; nothing left to drain. + break; + } + if completed.len() < compile_universe_count && !has_errors { + stalled = true; + } + break; } - files_current_loop_count += 1; - compiled_modules.insert(module_name.to_string()); + let Ok(msg) = rx.recv() else { break }; + in_flight -= 1; - if *is_clean { - // actually add it to a list of clean modules - clean_modules.insert(module_name.to_string()); + if msg.result.is_err() || msg.interface_result.as_ref().is_some_and(|r| r.is_err()) { + has_errors = true; } - let module_dependents = build_state.get_module(module_name).unwrap().dependents.clone(); + let is_clean = msg.is_clean; + let finished_name = msg.module_name.clone(); + completed.insert(finished_name.clone()); + results_buffer.push(msg); - // if not clean -- compile modules that depend on this module - for dep in module_dependents.iter() { - // mark the reverse dep as dirty when the source is not clean - if !*is_clean { - let dep_module = build_state.modules.get_mut(dep).unwrap(); - // mark the reverse dep as dirty when the source is not clean - dep_module.compile_dirty = true; + // Look up dependents from the node the scheduler scheduled under — + // for mlmap, compile_one returns the namespace suffix, which is the + // key modules use to refer to the namespace entry. + let dependents = build_state.get_module(&finished_name).unwrap().dependents.clone(); + + for dep in &dependents { + if !compile_universe.contains(dep) { + continue; + } + if !is_clean { + dirty_set.insert(dep.clone()); } - if !compiled_modules.contains(dep) { - in_progress_modules.insert(dep.to_string()); + let count = pending_deps.get_mut(dep).unwrap(); + *count -= 1; + if *count == 0 && !completed.contains(dep) { + ready_heap.push(WorkUnit { + priority: priorities[dep], + module_name: dep.clone(), + }); } } + } + }); + // Close our sender handle so any lingering clones in already-spawned + // workers don't keep the channel open past the scope. + drop(tx); + + trace!( + "Compiled {} out of {} in the universe", + completed.len(), + compile_universe_count, + ); - let package_name = { - let module = build_state - .build_state - .modules - .get(module_name) - .ok_or(anyhow!("Module not found"))?; - module.package_name.clone() - }; + let mut compile_errors = String::new(); + let mut compile_warnings = String::new(); + let mut num_compiled_modules = 0; + + // Persist propagated dirtiness back onto build_state. Modules that were + // marked dirty (because a predecessor's cmi changed) but never scheduled + // — e.g. the first compile error aborted further dispatch — must keep + // compile_dirty = true so the next incremental build recompiles them. + // Successful recompiles in the result loop below override this back to + // false for the modules that actually ran. + for name in &dirty_set { + if let Some(module) = build_state.build_state.modules.get_mut(name) { + module.compile_dirty = true; + } + } + + // Sort by module name so the accumulated error/warning strings and the + // per-package compile.log writes are deterministic across runs, even + // though modules complete in arbitrary order. + results_buffer.sort_by(|a, b| a.module_name.cmp(&b.module_name)); + + for msg in results_buffer { + let CompletionMsg { + module_name, + result, + interface_result, + is_compiled, + .. + } = msg; + + if is_compiled { + num_compiled_modules += 1; + } - let package = build_state + let package_name = { + let module = build_state + .build_state + .modules + .get(&module_name) + .ok_or_else(|| anyhow!("Module not found"))?; + module.package_name.clone() + }; + let package = build_state + .build_state + .packages + .get(&package_name) + .ok_or_else(|| anyhow!("Package name not found"))?; + + let (compile_warning, compile_error, interface_warning, interface_error) = { + let module = build_state .build_state - .packages - .get(&package_name) - .ok_or(anyhow!("Package name not found"))?; - - // Process results and update module state - let (compile_warning, compile_error, interface_warning, interface_error) = { - let module = build_state - .build_state - .modules - .get_mut(module_name) - .ok_or(anyhow!("Module not found"))?; - - let (compile_warning, compile_error) = match module.source_type { - SourceType::MlMap(ref mut mlmap) => { - module.compile_dirty = false; - mlmap.parse_dirty = false; + .modules + .get_mut(&module_name) + .ok_or_else(|| anyhow!("Module not found"))?; + + let (compile_warning, compile_error) = match module.source_type { + SourceType::MlMap(ref mut mlmap) => { + module.compile_dirty = false; + mlmap.parse_dirty = false; + (None, None) + } + SourceType::SourceFile(ref mut source_file) => match &result { + Ok(Some(err)) => { + let warning_text = err.to_string(); + source_file.implementation.compile_state = CompileState::Warning; + source_file.implementation.compile_warnings = Some(warning_text.clone()); + (Some(warning_text), None) + } + Ok(None) => { + source_file.implementation.compile_state = CompileState::Success; + source_file.implementation.compile_warnings = None; (None, None) } - SourceType::SourceFile(ref mut source_file) => match result { - Ok(Some(err)) => { - let warning_text = err.to_string(); - source_file.implementation.compile_state = CompileState::Warning; - source_file.implementation.compile_warnings = Some(warning_text.clone()); - (Some(warning_text), None) - } - Ok(None) => { - source_file.implementation.compile_state = CompileState::Success; - source_file.implementation.compile_warnings = None; - (None, None) - } - Err(err) => { - source_file.implementation.compile_state = CompileState::Error; - source_file.implementation.compile_warnings = None; - (None, Some(err.to_string())) - } - }, - }; - - let (interface_warning, interface_error) = - if let SourceType::SourceFile(ref mut source_file) = module.source_type { - match interface_result { - Some(Ok(Some(err))) => { - let warning_text = err.to_string(); - source_file.interface.as_mut().unwrap().compile_state = CompileState::Warning; - source_file.interface.as_mut().unwrap().compile_warnings = - Some(warning_text.clone()); - (Some(warning_text), None) - } - Some(Ok(None)) => { - if let Some(interface) = source_file.interface.as_mut() { - interface.compile_state = CompileState::Success; - interface.compile_warnings = None; - } - (None, None) - } - Some(Err(err)) => { - source_file.interface.as_mut().unwrap().compile_state = CompileState::Error; - source_file.interface.as_mut().unwrap().compile_warnings = None; - (None, Some(err.to_string())) - } - _ => (None, None), + Err(err) => { + source_file.implementation.compile_state = CompileState::Error; + source_file.implementation.compile_warnings = None; + (None, Some(err.to_string())) + } + }, + }; + + let (interface_warning, interface_error) = if let SourceType::SourceFile(ref mut source_file) = + module.source_type + { + match &interface_result { + Some(Ok(Some(err))) => { + let warning_text = err.to_string(); + source_file.interface.as_mut().unwrap().compile_state = CompileState::Warning; + source_file.interface.as_mut().unwrap().compile_warnings = Some(warning_text.clone()); + (Some(warning_text), None) + } + Some(Ok(None)) => { + if let Some(interface) = source_file.interface.as_mut() { + interface.compile_state = CompileState::Success; + interface.compile_warnings = None; } - } else { (None, None) - }; - - // Update compilation timestamps for successful compilation - if result.is_ok() && interface_result.as_ref().is_none_or(|r| r.is_ok()) { - module.compile_dirty = false; - module.last_compiled_cmi = Some(SystemTime::now()); - module.last_compiled_cmt = Some(SystemTime::now()); + } + Some(Err(err)) => { + source_file.interface.as_mut().unwrap().compile_state = CompileState::Error; + source_file.interface.as_mut().unwrap().compile_warnings = None; + (None, Some(err.to_string())) + } + _ => (None, None), } - - (compile_warning, compile_error, interface_warning, interface_error) + } else { + (None, None) }; - // Handle logging outside the mutable borrow - if let Some(warning) = compile_warning { - logs::append(package, &warning); - compile_warnings.push_str(&warning); - } - if let Some(error) = compile_error { - logs::append(package, &error); - compile_errors.push_str(&error); - } - if let Some(warning) = interface_warning { - logs::append(package, &warning); - compile_warnings.push_str(&warning); - } - if let Some(error) = interface_error { - logs::append(package, &error); - compile_errors.push_str(&error); + if result.is_ok() && interface_result.as_ref().is_none_or(|r| r.is_ok()) { + module.compile_dirty = false; + module.last_compiled_cmi = Some(SystemTime::now()); + module.last_compiled_cmt = Some(SystemTime::now()); } - } - files_total_count += files_current_loop_count; + (compile_warning, compile_error, interface_warning, interface_error) + }; - if files_total_count == compile_universe_count { - break; + if let Some(warning) = compile_warning { + logs::append(package, &warning); + compile_warnings.push_str(&warning); } - if in_progress_modules.is_empty() || in_progress_modules.eq(¤t_in_progres_modules) { - // find the dependency cycle - let cycle = dependency_cycle::find( - &compile_universe - .iter() - .map(|s| (s, build_state.get_module(s).unwrap())) - .collect::>(), - ); + if let Some(error) = compile_error { + logs::append(package, &error); + compile_errors.push_str(&error); + } + if let Some(warning) = interface_warning { + logs::append(package, &warning); + compile_warnings.push_str(&warning); + } + if let Some(error) = interface_error { + logs::append(package, &error); + compile_errors.push_str(&error); + } + } - let guidance = "Possible solutions:\n- Extract shared code into a new module both depend on.\n"; - let message = format!( - "\n{}\n{}\n{}", - style("Can't continue... Found a circular dependency in your code:").red(), - dependency_cycle::format(&cycle, build_state), - guidance - ); + if stalled { + let cycle = dependency_cycle::find( + &compile_universe + .iter() + .map(|s| (s, build_state.get_module(s).unwrap())) + .collect::>(), + ); - // Append the error to the logs of all packages involved in the cycle, - // so editor tooling can surface it from .compiler.log - let mut touched_packages = AHashSet::::new(); - for module_name in cycle.iter() { - if let Some(module) = build_state.get_module(module_name) - && touched_packages.insert(module.package_name.clone()) - && let Some(package) = build_state.get_package(&module.package_name) - { - logs::append(package, &message); - } - } + let guidance = "Possible solutions:\n- Extract shared code into a new module both depend on.\n"; + let message = format!( + "\n{}\n{}\n{}", + style("Can't continue... Found a circular dependency in your code:").red(), + dependency_cycle::format(&cycle, build_state), + guidance + ); - compile_errors.push_str(&message) + let mut touched_packages = AHashSet::::new(); + for module_name in cycle.iter() { + if let Some(module) = build_state.get_module(module_name) + && touched_packages.insert(module.package_name.clone()) + && let Some(package) = build_state.get_package(&module.package_name) + { + logs::append(package, &message); + } } - if !compile_errors.is_empty() { - break; - }; + + compile_errors.push_str(&message); } // Collect warnings from modules that were not recompiled in this build