diff --git a/.airstack/modules/osmo.sh b/.airstack/modules/osmo.sh index 053decbee..a10a47809 100755 --- a/.airstack/modules/osmo.sh +++ b/.airstack/modules/osmo.sh @@ -686,6 +686,199 @@ function cmd_osmo_foxglove { --connect-timeout "$OSMO_PF_TIMEOUT" } +# osmo:mission — submit airstack-mission.yaml with a mission spec selected. +# +# Usage: airstack osmo:mission [--pool POOL] [--key PATH] +# [--branch BRANCH] [--no-keep-alive] +# +# is a repo-relative path (e.g. osmo/missions/example_takeoff_land.yaml). +# The pod clones the branch and runs the mission spec from that clone, so the +# mission file must be committed and pushed. --no-keep-alive makes the task +# exit when the mission ends (frees the GPU, triggers the workflow's +# `outputs:` upload) instead of sleeping for `airstack osmo:fetch`. +function cmd_osmo_mission { + _osmo_check_cli || return 1 + + local mission="" + local pool="${OSMO_POOL:-}" + local pubkey_file="" + local branch="" + local branch_explicit=false + local keep_alive="true" + local extra_args=() + + while [ $# -gt 0 ]; do + case "$1" in + --pool) pool="$2"; shift 2 ;; + --key) pubkey_file="$2"; shift 2 ;; + --branch) branch="$2"; branch_explicit=true; shift 2 ;; + --no-keep-alive) keep_alive="false"; shift ;; + -*) extra_args+=("$1"); shift ;; + *) + if [ -z "$mission" ]; then mission="$1"; else extra_args+=("$1"); fi + shift ;; + esac + done + + if [ -z "$mission" ]; then + log_error "Usage: airstack osmo:mission [--pool POOL] [--branch BRANCH] [--no-keep-alive]" + log_error "Available missions:" + ls "${PROJECT_ROOT}/osmo/missions/"*.yaml 2>/dev/null \ + | sed "s|${PROJECT_ROOT}/| |" >&2 + return 1 + fi + # Normalize to a repo-relative path — that's what the pod resolves + # against its clone of the branch. + mission="${mission#"${PROJECT_ROOT}"/}" + if [ ! -f "${PROJECT_ROOT}/${mission}" ]; then + log_error "Mission file not found locally: ${PROJECT_ROOT}/${mission}" + return 1 + fi + + if [ -z "$pubkey_file" ]; then + if ! pubkey_file="$(_osmo_pick_pubkey)"; then + log_error "No SSH public key found in ~/.ssh. Generate one with: ssh-keygen -t ed25519" + return 1 + fi + fi + + local workflow_yaml="${PROJECT_ROOT}/osmo/workflows/airstack-mission.yaml" + if [ ! -f "$workflow_yaml" ]; then + log_error "Workflow file not found: ${workflow_yaml}" + return 1 + fi + + # The pod runs the mission file from its clone of origin/, so an + # unpushed mission spec is the most common "why is it running the wrong + # thing" failure — same auto-pin + pushed check as osmo:up. + if [ "$branch_explicit" = false ] && [ -z "$branch" ]; then + branch="$(_osmo_local_branch)" + if [ -n "$branch" ]; then + log_info "Auto-detected local branch '${branch}'; pod will clone from origin/${branch} (override with --branch main)." + fi + fi + if [ -n "$branch" ]; then + _osmo_check_branch_pushed "$branch" + fi + + local cmd=(osmo workflow submit "$workflow_yaml") + if [ -n "$pool" ]; then + cmd+=(--pool "$pool") + else + log_warn "No --pool provided and OSMO_POOL is unset; using your osmo profile's default pool." + fi + # Single --set-env: the flag is variadic and a second occurrence silently + # drops the first (see cmd_osmo_up). + local env_kvs=( + "SSH_PUB_KEY=$(cat "$pubkey_file")" + "OSMO_MISSION_FILE=${mission}" + "OSMO_MISSION_KEEP_ALIVE=${keep_alive}" + ) + if [ -n "$branch" ]; then + env_kvs+=("AIRSTACK_BRANCH=${branch}") + fi + cmd+=(--set-env "${env_kvs[@]}") + if [ ${#extra_args[@]} -gt 0 ]; then + cmd+=("${extra_args[@]}") + fi + + log_info "Submitting mission '${mission}' (keep_alive=${keep_alive}): ${cmd[*]}" + local output + if ! output="$("${cmd[@]}" 2>&1)"; then + echo "$output" >&2 + log_error "osmo workflow submit failed." + return 1 + fi + echo "$output" + + local wf_id + wf_id="$(echo "$output" | awk -F'- ' '/^Workflow ID/ {print $2; exit}' | tr -d ' \r\n')" + if [ -z "$wf_id" ]; then + log_warn "Could not parse workflow id from submit output. Set it manually:" + log_warn " echo > ${OSMO_STATE_FILE}" + return 0 + fi + _osmo_save_wf_id "$wf_id" + + log_info "Next steps:" + log_info " airstack osmo:logs # follow mission progress" + log_info " airstack osmo:fetch # download bags + results (keep-alive mode)" + log_info " airstack osmo:down # cancel when done (results die with the pod!)" +} + +# osmo:fetch — download mission results (mcap bags, logs, summaries) from the +# pod to the laptop over the authenticated ssh port-forward. +# +# Usage: airstack osmo:fetch [dest-dir] +# +# Incremental and resumable (rsync): safe to run mid-mission to pull finished +# iterations while the next one flies, and again later to top up. Falls back +# to scp -r (non-incremental) if rsync isn't installed. +# +# Note: the osmo CLI also ships `osmo workflow rsync download`, which could +# replace the port-forward + ssh below; we use the ssh path because the +# sshd + port-forward channel is already validated infrastructure for this +# workflow (osmo:ide) and works uniformly across osmo CLI versions. +function cmd_osmo_fetch { + _osmo_check_cli || return 1 + local wf; wf="$(_osmo_wf_id)" || return 1 + + local dest="${1:-./osmo-results}" + local remote_path="/root/AirStack/osmo/results/" # trailing slash: rsync + # follows the symlink to + # /osmo/output/... on pods + local local_port="${OSMO_SSH_PORT%%:*}" + local ssh_opts=(-p "$local_port" -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR) + + # Reuse an existing ssh port-forward (e.g. from osmo:ide) or spawn one + # for the duration of the fetch — same pattern as osmo:ide. + local pf_pid="" + if ! nc -z localhost "$local_port" 2>/dev/null; then + log_info "osmo workflow port-forward ${wf} workspace --port ${OSMO_SSH_PORT} (for the duration of the fetch)" + osmo workflow port-forward "$wf" workspace --port "$OSMO_SSH_PORT" --connect-timeout 600 \ + > "${OSMO_STATE_DIR}/fetch-pf.log" 2>&1 & + pf_pid=$! + trap '[ -n "'"$pf_pid"'" ] && kill "'"$pf_pid"'" 2>/dev/null; trap - EXIT INT TERM' EXIT INT TERM + local waited=0 + until nc -z localhost "$local_port" 2>/dev/null; do + sleep 1; waited=$((waited+1)) + if [ "$waited" -ge 30 ]; then + log_error "Timed out waiting for port-forward on :${local_port}. Log: ${OSMO_STATE_DIR}/fetch-pf.log" + return 1 + fi + if ! kill -0 "$pf_pid" 2>/dev/null; then + log_error "port-forward exited early. Tail:" + tail -10 "${OSMO_STATE_DIR}/fetch-pf.log" >&2 + return 1 + fi + done + fi + + mkdir -p "$dest" + log_info "Fetching ${remote_path} → ${dest}" + local rc + if command -v rsync >/dev/null 2>&1; then + rsync -az --partial --info=progress2 -e "ssh ${ssh_opts[*]}" \ + "root@localhost:${remote_path}" "$dest/" + rc=$? + else + log_warn "rsync not found; falling back to scp -r (non-incremental)." + scp "${ssh_opts[@]}" -r "root@localhost:${remote_path}." "$dest/" + rc=$? + fi + + if [ -n "$pf_pid" ]; then + kill "$pf_pid" 2>/dev/null + trap - EXIT INT TERM + fi + + if [ "$rc" -ne 0 ]; then + log_error "Fetch failed (exit ${rc}). Is the workflow still running, and has the mission produced results yet?" + return 1 + fi + log_info "Done. Open any .mcap under ${dest} directly in Foxglove (Open local file)." +} + # osmo:down — cancel the active workflow. Reminds you to push first. function cmd_osmo_down { _osmo_check_cli || return 1 @@ -693,6 +886,7 @@ function cmd_osmo_down { log_warn "About to cancel workflow '${wf}'." log_warn "Anything not pushed to git in /root/AirStack inside the pod will be LOST." + log_warn "Mission results (bags/logs) on the pod are lost too — run 'airstack osmo:fetch' first." log_warn "Hit Ctrl-C in the next 5 seconds to abort." sleep 5 osmo workflow cancel "$wf" @@ -703,6 +897,8 @@ function cmd_osmo_down { function register_osmo_commands { COMMANDS["osmo:setup"]="cmd_osmo_setup" COMMANDS["osmo:up"]="cmd_osmo_up" + COMMANDS["osmo:mission"]="cmd_osmo_mission" + COMMANDS["osmo:fetch"]="cmd_osmo_fetch" COMMANDS["osmo:logs"]="cmd_osmo_logs" COMMANDS["osmo:ide"]="cmd_osmo_ide" COMMANDS["osmo:webrtc"]="cmd_osmo_webrtc" @@ -711,6 +907,8 @@ function register_osmo_commands { COMMAND_HELP["osmo:setup"]="One-time per-user OSMO credential setup (airlab-docker-registry, airlab-docker-login, airlab-nucleus)" COMMAND_HELP["osmo:up"]="Submit osmo/workflows/airstack-dev.yaml with your SSH pubkey injected (--pool POOL, --key PATH, --branch BRANCH)" + COMMAND_HELP["osmo:mission"]="Submit a batch mission (osmo/missions/*.yaml): repeated up→fly→record→down cycles (--pool POOL, --branch BRANCH, --no-keep-alive)" + COMMAND_HELP["osmo:fetch"]="Download mission results (mcap bags, logs, summaries) from the pod over ssh — incremental, safe to run mid-mission (osmo:fetch [dest-dir])" COMMAND_HELP["osmo:logs"]="Follow the workspace task logs (osmo workflow logs -t workspace -n 500; OSMO_LOGS_TASK / OSMO_LOGS_TAIL override)" COMMAND_HELP["osmo:ide"]="Port-forward sshd (2200:22) and open VS Code/Cursor on Host airstack-osmo" COMMAND_HELP["osmo:webrtc"]="Port-forward Isaac Sim WebRTC ranges (TCP foreground + UDP background)" diff --git a/.env b/.env index 82cc01ccb..412801cb6 100644 --- a/.env +++ b/.env @@ -12,7 +12,7 @@ PROJECT_NAME="airstack" # If you've run ./airstack.sh setup, then this will auto-generate from the git commit hash every time a change is made # to a Dockerfile or docker-compose.yaml file. Otherwise this can also be set explicitly to make a release version. # auto-generated from git commit hash -VERSION="0.19.0-alpha.3" +VERSION="8b927e46" # Choose "dev" or "prebuilt". "dev" is for mounted code that must be built live. "prebuilt" is for built ros_ws baked into the image DOCKER_IMAGE_BUILD_MODE="dev" # Where to push and pull images from. Can replace with your docker hub username if using docker hub. diff --git a/.gitignore b/.gitignore index 4868b5c74..624f0d4e0 100644 --- a/.gitignore +++ b/.gitignore @@ -94,6 +94,11 @@ simulation/ms-airsim/assets/scenes/* # Test results tests/results/ +# OSMO mission results: local runs of osmo/workspace/mission_runner.py write +# to osmo/results/; `airstack osmo:fetch` downloads to ./osmo-results/. +osmo/results/ +osmo-results/ + # Local-only — embedded sibling repo, not part of this branch common/rayfronts/ diff --git a/docs/tutorials/airstack_on_osmo.md b/docs/tutorials/airstack_on_osmo.md index e9cfa8974..9f093e8c0 100644 --- a/docs/tutorials/airstack_on_osmo.md +++ b/docs/tutorials/airstack_on_osmo.md @@ -547,6 +547,45 @@ osmo workflow cancel $WF +## Batch missions (unattended runs) + +Everything above is the *interactive* workflow. The same pod can instead run +**missions**: declarative YAML files (in +[`osmo/missions/`](https://github.com/castacks/AirStack/blob/main/osmo/missions/)) +that script repeated cycles of bring-up → fly → record → tear-down with no +human attached. Each iteration restarts the containers, records mcap bag +files (Foxglove's native format — open the `.mcap` directly, no conversion), +and snapshots container logs and per-step results. + +```bash +# Submit a mission (auto-pins your current branch, like osmo:up): +./airstack.sh osmo:mission osmo/missions/example_takeoff_land.yaml --pool airstack + +# Watch it fly: +./airstack.sh osmo:logs + +# Pull bags + logs + summaries to your laptop — incremental, safe to run +# mid-mission and again later to top up: +./airstack.sh osmo:fetch ./results/ + +# When you have everything (results die with the pod!): +./airstack.sh osmo:down +``` + +A mission step can be any robot task action (`takeoff`, `land`, `navigate`, +`semantic_search`, `exploration`, `coverage`, …), a timed wait, a topic pub, +a service call, or an arbitrary `ros2`/shell command. Spec schema and step +reference: +[`osmo/missions/README.md`](https://github.com/castacks/AirStack/blob/main/osmo/missions/README.md). + +By default the pod **stays alive after the mission ends** so you can +`osmo:fetch` whenever you're ready (mind the workflow's 24h `exec_timeout`). +For fire-and-forget batches, submit with `--no-keep-alive`: the pod exits +cleanly when the mission ends, freeing the GPU — and uploading the results +directory to object storage automatically if the workflow's `outputs:` +block is configured (lab-admin setup; see +[`osmo/README.md`](https://github.com/castacks/AirStack/blob/main/osmo/README.md)). + ## Troubleshooting | Symptom | Likely cause | Fix | @@ -572,7 +611,8 @@ osmo workflow cancel $WF | Uncommitted edits in the IDE | Pod-local working tree | **No** | | `colcon build` outputs (`build/`, `install/`, `log/`) | `/root/AirStack/**/ros_ws/...` | **No** (gitignored Linux x86_64 binaries; rebuild trivially) | | Inner-dockerd image cache | Pod-local Docker layer cache | **No** | -| Bag files, sim recordings, debug screenshots | `/root/AirStack/bags/`, etc. | **No** — pull selectively via `osmo workflow rsync download "$(cat ~/.airstack/osmo-state)" :` *before* tearing down | +| Mission results (mcap bags, logs, summaries) | `/root/AirStack/osmo/results/` | **No** — run `./airstack.sh osmo:fetch` *before* tearing down | +| Other bag files, sim recordings, debug screenshots | `/root/AirStack/bags/`, etc. | **No** — pull selectively via `osmo workflow rsync download "$(cat ~/.airstack/osmo-state)" :` *before* tearing down | The rule of thumb: **commit + push every time you'd save a file in a git-tracked sense.** The Source Control panel is the persistence boundary. diff --git a/osmo/README.md b/osmo/README.md index 91b41dbd5..410ed21c2 100644 --- a/osmo/README.md +++ b/osmo/README.md @@ -7,11 +7,17 @@ through [NVIDIA OSMO](https://github.com/NVIDIA/OSMO): osmo/ ├── README.md # This file (admin / operator reference) ├── workflows/ -│ └── airstack-dev.yaml # The OSMO workflow students submit +│ ├── airstack-dev.yaml # Interactive dev workflow (IDE over Remote-SSH) +│ └── airstack-mission.yaml # Batch mission workflow (unattended flights) +├── missions/ +│ ├── README.md # Mission spec schema reference +│ └── example_takeoff_land.yaml # Reference mission: takeoff → hover → land ×3 └── workspace/ ├── Dockerfile # The airstack-osmo-workspace image ├── sshd_config # Pubkey-only sshd config baked into the image - └── entrypoint.sh # Pod startup: sshd, dockerd, clone, airstack up + ├── entrypoint.sh # Pod startup: sshd, dockerd, clone, then + │ # dev mode (airstack up) or mission mode + └── mission_runner.py # Batch executor (run from the clone, not the image) ``` The student-facing walkthrough lives in @@ -21,9 +27,9 @@ README is the **lab admin / operator** reference: pool requirements, workspace image build & push, validation stages, plus a credential summary for context. -> **Scope:** developer workflow only. CI/CD on OSMO is **not** part of this -> integration — the existing `system-tests.yml` + OpenStack orchestrator path -> is unchanged. +> **Scope:** developer workflow + batch missions. CI/CD on OSMO is **not** +> part of this integration — the existing `system-tests.yml` + OpenStack +> orchestrator path is unchanged. ## Architecture in one minute @@ -47,6 +53,49 @@ app.foxglove.dev ── ws ────► port-forward 8766 ────► airstack.sh up brings these 3 up ``` +## Mission mode (batch runs) + +`airstack-mission.yaml` reuses the same workspace image and DinD pod, but +instead of one interactive `airstack up`, the entrypoint hands off to +[`workspace/mission_runner.py`](workspace/mission_runner.py), which executes +a declarative mission spec from [`missions/`](missions/) — repeated cycles of: + +``` +airstack down → airstack up → wait for PX4 ready → record mcap bags +→ run steps (takeoff / land / navigate / semantic search / any ros2 command) +→ collect bags + container logs → airstack down +``` + +Submit, monitor, and download: + +```bash +airstack osmo:mission osmo/missions/example_takeoff_land.yaml --pool +airstack osmo:logs # follow mission progress +airstack osmo:fetch ./results/ # rsync bags/logs/summaries to the laptop +airstack osmo:down # cancel (fetch first — results die with the pod) +``` + +Key behaviors: + +- **The mission spec and runner come from the clone**, not the image — what + you push on your branch is what runs. The workspace image only needs a + rebuild when `Dockerfile`, `sshd_config`, or `entrypoint.sh` change. +- **Bags are mcap** (`ros2 bag record -s mcap`) — open the `.mcap` files + directly in Foxglove, no conversion or local ROS install. +- **Results location:** `/osmo/output/airstack-mission-results///` + with a symlink at `/root/AirStack/osmo/results` (the path `osmo:fetch` + pulls). Artifacts are collected even for failed iterations. +- **`OSMO_MISSION_KEEP_ALIVE`** (default `true`): the pod sleeps after the + mission so you can fetch over ssh. Set `false` (or submit with + `osmo:mission --no-keep-alive`) for fire-and-forget: the task exits + cleanly when the mission ends, freeing the GPU — and if the workflow's + `outputs:` block is configured with a destination bucket, OSMO uploads + `/osmo/output` automatically on that exit. A **canceled** workflow does + not upload outputs, so in keep-alive mode `osmo:fetch` is the retrieval + path. + +Mission spec schema and step types: [`missions/README.md`](missions/README.md). + ## Pool requirements The OSMO pool the workflow runs on must satisfy: diff --git a/osmo/missions/README.md b/osmo/missions/README.md new file mode 100644 index 000000000..d0d95823d --- /dev/null +++ b/osmo/missions/README.md @@ -0,0 +1,146 @@ +# AirStack mission specs + +A **mission** is a declarative YAML file executed by +[`osmo/workspace/mission_runner.py`](../workspace/mission_runner.py). Each +mission runs one or more full iterations of: + +``` +airstack down → airstack up → wait for PX4 ready → start mcap recording +→ run steps → stop recording → collect bags + container logs → airstack down +``` + +Missions live in this directory so they're versioned with the code they +exercise — the OSMO pod clones your branch, so whatever you push is what runs. + +Submit to OSMO with: + +```bash +airstack osmo:mission osmo/missions/.yaml --pool +``` + +Or run locally on any machine that can `airstack up` (no OSMO involved): + +```bash +python3 osmo/workspace/mission_runner.py osmo/missions/.yaml \ + --airstack-root "$(pwd)" +``` + +`--dry-run` validates the spec and prints the merged config without touching +Docker. + +## Results + +``` +osmo/results/// +├── summary.json # per-iteration status + durations +└── iter_001/ + ├── bags/robot_1/*.mcap # open directly in Foxglove (no conversion) + ├── logs/.log # docker logs snapshot per container + ├── ready.json # per-robot seconds-to-PX4-ready + ├── steps.json # per-step command, output tail, pass/fail + └── iteration.json # iteration summary +``` + +On an OSMO pod the actual storage is `/osmo/output/airstack-mission-results` +(with `osmo/results` symlinked to it), so a workflow `outputs:` block uploads +everything automatically when the task exits. Download from your laptop at +any time while the pod is alive with `airstack osmo:fetch [dest]`. + +Artifacts are collected **even when an iteration fails** — a failed flight's +bag is usually the most interesting one. + +## Schema + +Top-level keys (everything except `steps` is optional): + +| Key | Default | Meaning | +|---|---|---| +| `name` | filename stem | Results directory name | +| `env` | `{}` | Env vars exported before each `airstack up` (`NUM_ROBOTS`, `COMPOSE_PROFILES`, `ISAAC_SIM_SCRIPT_NAME`, …) | +| `iterations` | `1` | Number of full up→fly→down cycles | +| `ready.timeout_s` | `600` | Max seconds to wait for PX4 readiness per iteration | +| `ready.poll_interval_s` | `5` | Seconds between readiness polls | +| `record.enabled` | `true` | Record an mcap per robot per iteration | +| `record.topics` | tf + odom set | Topics to record; `{robot}` → `robot_N` | +| `record.all` | `false` | Record **all** topics (`ros2 bag record -a`) — large | +| `on_step_failure` | `abort_iteration` | `continue` \| `abort_iteration` \| `abort_mission` | +| `up_timeout_s` | `3600` | `airstack up` timeout (first up on a fresh pod pulls images) | +| `down_timeout_s` | `300` | `airstack down` timeout | +| `robot_setup_bash` | robot ws `setup.bash` | Workspace sourced before `ros2` commands | +| `steps` | — | Ordered list of steps (below) | + +### Steps + +Every step type accepts the placeholders `{robot}` → `robot_N` and `{n}` → `N` +in its strings, and runs once per robot in `robots:` (`all` by default, or a +list like `[1, 3]`). + +**`action`** — send a goal to a robot task action server and wait for the +result. The step passes when the action result reports `success: true`. + +```yaml +- action: + task: takeoff # → /robot_N/tasks/ + goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} + timeout_s: 120 # default 120 + robots: all + # type: task_msgs/action/TakeoffTask # derived from task name if omitted +``` + +Available tasks (action type is derived as `task_msgs/action/Task`): +`takeoff`, `land`, `fixed_trajectory`, `navigate`, `exploration`, `coverage`, +`semantic_search`, `chat`. Goal fields are defined in +[`common/ros_packages/msgs/task_msgs/action/`](../../common/ros_packages/msgs/task_msgs/action/). +Multi-robot action goals are sent **in parallel** across robots. + +**`wait`** — sleep for N seconds (e.g. hover, let a planner run): + +```yaml +- wait: 30 +``` + +**`run`** — arbitrary command; the escape hatch that makes any ROS 2 command +work without runner changes. The step fails on non-zero exit unless +`expect_success: false`. + +```yaml +- run: + container: robot_1 # robot_N → exec in the robot container on robot + # N's DDS domain (ros2 is sourced for you); + # pod → run on the pod itself (cwd = AirStack root); + # any other value → literal container name, domain 0 + cmd: ros2 topic echo --once /{robot}/odometry + timeout_s: 60 + expect_success: true +``` + +**`topic_pub`** — `ros2 topic pub --once` per robot: + +```yaml +- topic_pub: + topic: /{robot}/some_input + type: std_msgs/msg/Bool + msg: {data: true} +``` + +**`service_call`** — `ros2 service call` per robot: + +```yaml +- service_call: + service: /{robot}/some_service + type: std_srvs/srv/Trigger + request: {} +``` + +## Notes + +- For `NUM_ROBOTS > 1` on Isaac Sim, set + `ISAAC_SIM_SCRIPT_NAME: example_multi_px4_pegasus_launch_script.py` in + `env:` — the default script spawns a single drone. +- Missions run unattended: keep `ISAAC_SIM_HEADLESS: "true"`, + `ISAAC_SIM_USE_STANDALONE: "true"` and `PLAY_SIM_ON_START: "true"` unless + you're watching via the WebRTC livestream profile. +- Recording camera/LiDAR topics (or `record.all: true`) is the bag-size + driver — budget pod `storage:` accordingly. +- `/tf` and `/tf_static` are in the default topic set because without them a + Foxglove 3D panel can't pose anything during replay. diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml new file mode 100644 index 000000000..1eaa96554 --- /dev/null +++ b/osmo/missions/example_takeoff_land.yaml @@ -0,0 +1,110 @@ +# Example mission: takeoff → hover 30s → land, 3 times, Isaac Sim headless. +# +# Run on OSMO: +# airstack osmo:mission osmo/missions/example_takeoff_land.yaml --pool +# +# Or locally against a machine that can run `airstack up` (no OSMO needed): +# python3 osmo/workspace/mission_runner.py osmo/missions/example_takeoff_land.yaml \ +# --airstack-root "$(pwd)" +# +# Schema reference: osmo/missions/README.md + +name: example_takeoff_land + +# How to bring the stack up. `services` are passed straight through: +# ./airstack.sh up isaac-sim robot-desktop gcs +# (docker compose auto-enables a named service's profile, so no +# COMPOSE_PROFILES needed). On OSMO swap isaac-sim → isaac-sim-livestream to +# watch the run over WebRTC. Alternatively use `profiles:` to export +# COMPOSE_PROFILES and run a plain `airstack up`. +stack: + services: [isaac-sim-livestream, robot-desktop, gcs] + # profiles: [desktop, isaac-sim] + +# Exported before every `airstack up`. Anything the compose stack reads from +# the environment can go here (.env values, ISAAC_SIM_SCRIPT_NAME, etc.). +env: + NUM_ROBOTS: 1 + ISAAC_SIM_HEADLESS: "true" + ISAAC_SIM_USE_STANDALONE: "true" + # The default script only spawns one drone; for NUM_ROBOTS > 1 switch to + # example_multi_px4_pegasus_launch_script.py. + ISAAC_SIM_SCRIPT_NAME: example_one_px4_pegasus_launch_script.py + # Missions are unattended — the sim must start playing without a GUI click. + PLAY_SIM_ON_START: "true" + +# Where `action` steps are sent by default: +# gcs — through the GCS action_relay, the same String-JSON → typed-goal +# path Foxglove and the GCS panels use (exercises the full +# GCS→robot chain, including the relay's airborne preconditions) +# robot — `ros2 action send_goal` directly on the robot's DDS domain +# Override per step with `via:`. +command_route: gcs + +# Full up → fly → collect → down cycles. +iterations: 3 + +# Gate before any steps run: per robot, MAVROS connected, then +# local_position/odom publishing (PX4 EKF converged = ready to arm). +ready: + timeout_s: 600 + +# One mcap per iteration with ALL robots' data, recorded on the GCS +# (domain 0) where each robot's domain_bridge forwards its state topics — +# replaying the .mcap in Foxglove reproduces the live GCS view. +# {robot} expands to robot_1..robot_N and the list is unioned. +record: + enabled: true + scope: gcs # or `robot`: one mcap per robot on its own + # domain, for topics not bridged to the GCS + topics: + - /tf + - /tf_static + - /{robot}/odometry_conversion/odometry + - /{robot}/interface/mavros/global_position/global + - /{robot}/trajectory_controller/trajectory_vis + - /{robot}/global_plan + - /gcs/robot_markers + - /gcs/map_origin/location + - /gcs/map_origin/ground_msl + - /gcs/{robot}/location + # Or record everything visible on the recording domain (large): + # all: true + +# continue | abort_iteration (default) | abort_mission +on_step_failure: abort_iteration + +# Steps run top to bottom. Every step that addresses a robot fans out over a +# robot selection — `robots:` (default `all` = robots 1..NUM_ROBOTS, or a +# list like [1, 3]) — and {robot} → robot_N / {n} → N expand per robot. +# Per-step knobs (all optional, shown here with their defaults): +# action: robots=all via= timeout_s=120 +# via: gcs (through the GCS action_relay) | robot (direct send_goal) +# run: robots=all container=robot_{n} (per-robot) or robot_1 | gcs | pod | +# ; fans out when cmd/container references {robot}/{n} +# wait: (runs once) +steps: + - action: + task: takeoff # → //tasks/takeoff (TakeoffTask) + goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} + #to specify robot(s) + #robots: [1] (1st robot) + #robots: [1, 3] (1st and 3rd robot) + #robots: all (all robots) + + - wait: 30 # hover + + - run: # arbitrary command, that runs per robot + cmd: ros2 topic echo --once /{robot}/odometry_conversion/odometry + timeout_s: 20 + #to specify robot(s): + #robots: [1] (1st robot) + #robots: [1, 3] (1st and 3rd robot) + + #To specify the container: + #container: gcs (GCS container) + #container: + + - action: + task: land # → //tasks/land (LandTask) + goal: {velocity_m_s: 0.5} diff --git a/osmo/workflows/airstack-mission.yaml b/osmo/workflows/airstack-mission.yaml new file mode 100644 index 000000000..7b7498452 --- /dev/null +++ b/osmo/workflows/airstack-mission.yaml @@ -0,0 +1,123 @@ +# AirStack batch mission workflow on OSMO. +# +# Same workspace image and pod layout as airstack-dev.yaml, but instead of a +# single `airstack up` for an interactive session, this runs a batch mission: +# osmo/workspace/mission_runner.py executes the mission spec named by +# OSMO_MISSION_FILE — repeated cycles of +# +# airstack down → airstack up → wait for PX4 ready → record mcap bags +# → run steps (takeoff / land / navigate / any ros2 command) → collect +# bags + logs → airstack down +# +# NO IMAGE REBUILD NEEDED. The mission engine is NOT baked into the image: +# the command below reuses the baked entrypoint for pod setup only +# (OSMO_AIRSTACK_UP=false → sshd, inner dockerd, branch clone, creds, registry +# login — no `airstack up`), then hands off to mission_launcher.sh FROM THE +# CLONE. So `git push` your branch and resubmit; the launcher + runner + spec +# all come from the branch. (The launcher pip-installs PyYAML at runtime if +# the image predates it.) +# +# Mission specs live in osmo/missions/ (cloned with the branch, so what you +# push is what runs). Schema: osmo/missions/README.md. +# +# To submit (or use the wrapper: airstack osmo:mission ): +# +# osmo workflow submit osmo/workflows/airstack-mission.yaml \ +# --pool \ +# --set-env "SSH_PUB_KEY=$(cat ~/.ssh/id_ed25519.pub)" \ +# "OSMO_MISSION_FILE=osmo/missions/example_takeoff_land.yaml" \ +# "AIRSTACK_BRANCH=" +# +# Results land in /osmo/output/airstack-mission-results (symlinked from +# /root/AirStack/osmo/results). With the default OSMO_MISSION_KEEP_ALIVE=true +# the pod stays alive after the mission so you can pull artifacts from your +# laptop: +# +# airstack osmo:fetch ./results/ +# +# Set OSMO_MISSION_KEEP_ALIVE=false for fire-and-forget runs: the task exits +# cleanly when the mission ends, which frees the GPU and triggers the +# `outputs:` upload below (if a destination bucket is configured). + +workflow: + name: airstack-mission + groups: + - name: airstack + tasks: + - name: workspace + lead: true + image: airlab-docker.andrew.cmu.edu/airstack/airstack-osmo-workspace:latest + # Required so the inner dockerd can run (same DinD setup as airstack-dev). + privileged: true + # Bootstrap: run the baked entrypoint for pod setup ONLY + # (OSMO_AIRSTACK_UP=false), backgrounded, then hand off to the launcher + # that ships in the freshly-cloned branch. This is what makes the + # mission engine rebuild-free — see the header comment. + command: ["bash", "-c"] + args: + - | + set -uo pipefail + log() { echo "[mission-bootstrap] $*"; } + log "starting pod setup via baked entrypoint (OSMO_AIRSTACK_UP=false)" + # Tee the baked entrypoint's output so the launcher can wait for its + # terminal "sleeping forever" line (= setup + its own `airstack up` + # finished) before starting the mission — see mission_launcher.sh. + # The pipe keeps its logs in `osmo workflow logs` too. + OSMO_AIRSTACK_UP=false /usr/local/bin/entrypoint.sh 2>&1 \ + | tee /tmp/baked-entrypoint.log & + LAUNCHER=/root/AirStack/osmo/workspace/mission_launcher.sh + log "waiting for branch clone to provide $LAUNCHER" + for i in $(seq 1 600); do [ -f "$LAUNCHER" ] && break; sleep 2; done + if [ ! -f "$LAUNCHER" ]; then + log "ERROR: $LAUNCHER not found after clone wait; sleeping for SSH debug" + exec sleep infinity + fi + log "handing off to mission_launcher.sh" + exec bash "$LAUNCHER" + environment: + # Mission selection — repo-relative path into the clone. Override at + # submit time to run a different mission. + OSMO_MISSION_FILE: "osmo/missions/example_takeoff_land.yaml" + # true → pod sleeps after the mission; fetch results over ssh, then + # `airstack osmo:down` (results die with the pod!). + # false → task exits when the mission ends: GPU freed, /osmo/output + # uploaded to `outputs:` destinations. + OSMO_MISSION_KEEP_ALIVE: "true" + AIRSTACK_BRANCH: "krrishj18/osmo-mission-runner" # branch entrypoint.sh clones + AIRSTACK_REPO_URL: "https://github.com/castacks/AirStack.git" + # SSH_PUB_KEY is supplied at submit time (needed for osmo:fetch): + # --set-env "SSH_PUB_KEY=$(cat ~/.ssh/id_ed25519.pub)" + # Uncomment and point at a bucket the OSMO deployment can write to, to + # have /osmo/output uploaded automatically when the task exits (only + # meaningful with OSMO_MISSION_KEEP_ALIVE=false — a canceled workflow + # does not upload outputs): + # + # outputs: + # - url: s3:///airstack-missions/{{workflow_id}}/ + credentials: + # Same per-user credentials as airstack-dev.yaml — see + # docs/tutorials/airstack_on_osmo.md "Step 0" / `airstack osmo:setup`. + airlab-nucleus: + OMNI_USER: omni_user + OMNI_PASS: omni_pass + OMNI_SERVER: omni_server + airlab-docker-login: + AIRLAB_REGISTRY_USER: username + AIRLAB_REGISTRY_PASS: password + + resources: + default: + cpu: 16 + gpu: 1 + memory: 64Gi + # Same sizing rationale as airstack-dev.yaml (inner image set alone + # exceeds 100Gi extracted), plus headroom for per-iteration mcap + # recordings — bump if a mission records camera/LiDAR over many + # iterations. + storage: 500Gi + + timeout: + # Missions run unattended and multi-iteration; 24h leaves room for long + # batches plus result download time in keep-alive mode. The pod is gone + # when this expires — fetch results first. + exec_timeout: 24h diff --git a/osmo/workspace/Dockerfile b/osmo/workspace/Dockerfile index e80f3be59..0e10d9e49 100644 --- a/osmo/workspace/Dockerfile +++ b/osmo/workspace/Dockerfile @@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ procps \ python3 \ python3-pip \ + python3-yaml \ rsync \ sudo \ tmux \ diff --git a/osmo/workspace/entrypoint.sh b/osmo/workspace/entrypoint.sh index bef9d9ba8..dd53a94ae 100755 --- a/osmo/workspace/entrypoint.sh +++ b/osmo/workspace/entrypoint.sh @@ -259,12 +259,17 @@ else export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim}" fi +# OSMO_AIRSTACK_UP=false skips `airstack up` and goes straight to sleep — the +# pod is fully set up (sshd, dockerd, clone, creds) for the caller to drive. +# The airstack-mission.yaml workflow uses this hook: it runs this entrypoint +# for setup only, then hands off to osmo/workspace/mission_launcher.sh (from +# the clone, so the mission engine ships with the branch — no image rebuild). cd "$AIRSTACK_ROOT" if [ "${OSMO_AIRSTACK_UP:-true}" = "true" ]; then log "airstack up (COMPOSE_PROFILES=$COMPOSE_PROFILES, NUM_ROBOTS=$NUM_ROBOTS, livestream=$ISAAC_SIM_LIVESTREAM)" ./airstack.sh up || log "WARN: airstack up exited non-zero — pod stays alive for debugging via SSH" else - log "OSMO_AIRSTACK_UP=false — skipping airstack up; SSH in and run ./airstack.sh up manually" + log "OSMO_AIRSTACK_UP=false — skipping airstack up; caller drives the stack" fi # ─── 7. Sleep ────────────────────────────────────────────────────────────── diff --git a/osmo/workspace/mission_launcher.sh b/osmo/workspace/mission_launcher.sh new file mode 100755 index 000000000..e7f757123 --- /dev/null +++ b/osmo/workspace/mission_launcher.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash +# mission_launcher.sh — no-rebuild mission entrypoint for AirStack-on-OSMO. +# +# The airstack-mission.yaml workflow runs the *baked* image entrypoint with +# OSMO_AIRSTACK_UP=false (pod setup only: sshd, inner dockerd, branch clone, +# Nucleus creds, registry login), then hands off to THIS script — which lives +# in the clone, so the mission engine always comes from your branch and no +# workspace-image rebuild is needed to iterate. (PyYAML, which the baked image +# may predate, is installed at runtime below.) +# +# Responsibilities: +# 1. Wait for the setup the baked entrypoint performs in the background +# (inner dockerd ready, clone present, registry login done). +# 2. Ensure PyYAML is importable. +# 3. Run mission_runner.py against $OSMO_MISSION_FILE. +# 4. Keep-alive: stay foreground (sleep) so the pod survives for +# `airstack osmo:fetch`, or exit so OSMO uploads /osmo/output + frees GPU. + +set -uo pipefail + +AIRSTACK_ROOT="${AIRSTACK_ROOT:-/root/AirStack}" + +log() { echo "[mission-launcher] $*"; } +fail() { echo "[mission-launcher] ERROR: $*" >&2; } + +# wait_for — poll until the command +# succeeds or the timeout elapses. Returns non-zero on timeout. +wait_for() { + local desc="$1" timeout="$2"; shift 2 + local i=0 + until "$@" >/dev/null 2>&1; do + i=$((i + 1)) + if [ "$i" -ge "$timeout" ]; then + fail "timed out after ${timeout}s waiting for: ${desc}" + return 1 + fi + [ $((i % 15)) -eq 0 ] && log "still waiting for ${desc} (${i}s)" + sleep 1 + done + log "ready: ${desc}" +} + +# ── 1. setup readiness (driven by the backgrounded baked entrypoint) ─────── +# If dockerd or the clone never appear, the pod is broken — stay alive (when +# keep-alive) so it can be inspected over SSH rather than vanishing. +if ! wait_for "inner dockerd" 180 docker info; then + [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ] && exec sleep infinity + exit 1 +fi +if ! wait_for "branch clone" 600 test -f "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py"; then + [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ] && exec sleep infinity + exit 1 +fi + +# The deployed :latest image's baked entrypoint ignores OSMO_AIRSTACK_UP and +# runs its OWN `airstack up`. Running the mission's bring-up concurrently +# collides (duplicate network, container-name conflicts, "network not found" +# mid-teardown) and fails the first iteration. So wait for the baked +# entrypoint to FINISH — the bootstrap tees its output to $BAKED_LOG, and its +# terminal "sleeping forever" line is printed exactly once, after its +# `airstack up` completes (success or fail). This also subsumes the clone + +# registry-login steps it performs, and warms the inner image cache the +# mission reuses. On a rebuilt image where the hook works, the entrypoint +# skips `up` and prints that line almost immediately, so this is a fast no-op. +# +# We deliberately do NOT detect completion via `pgrep -f "sleep infinity"`: +# the AirStack containers themselves run `sleep infinity`, so that matches the +# instant the baked bring-up starts a container — firing the handoff in the +# middle of its `airstack up`, which is exactly the race we're avoiding. +BAKED_LOG="${BAKED_ENTRYPOINT_LOG:-/tmp/baked-entrypoint.log}" +log "waiting for the baked entrypoint to finish its own bring-up (avoids a concurrent 'airstack up')" +if ! wait_for "baked entrypoint complete" 2400 grep -q "sleeping forever" "$BAKED_LOG"; then + log "WARN: baked entrypoint didn't signal completion in 40m — proceeding; first iteration may race its bring-up" +fi +sleep 5 # let baked's compose settle before the mission's first ensure_down/up + +# ── 2. PyYAML (mission_runner imports yaml) ──────────────────────────────── +if ! python3 -c "import yaml" >/dev/null 2>&1; then + log "installing PyYAML (image predates python3-yaml)" + pip3 install --break-system-packages --quiet pyyaml \ + || { apt-get update -qq && apt-get install -y -qq python3-yaml; } \ + || fail "could not install PyYAML — mission_runner will fail to import yaml" +fi + +# ── 3. run the mission ───────────────────────────────────────────────────── +MISSION="${OSMO_MISSION_FILE:-}" +if [ -z "$MISSION" ]; then + fail "OSMO_MISSION_FILE not set — nothing to run" +else + MISSION_PATH="$AIRSTACK_ROOT/$MISSION" + [ -f "$MISSION_PATH" ] || MISSION_PATH="$MISSION" # allow an absolute path + if [ -f "$MISSION_PATH" ]; then + log "running mission: $MISSION_PATH" + python3 "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py" "$MISSION_PATH" \ + --airstack-root "$AIRSTACK_ROOT" + log "mission_runner exited $?" + else + fail "mission file not found: $MISSION (looked under the clone and as an absolute path)" + fi +fi + +# ── 4. lifetime ──────────────────────────────────────────────────────────── +if [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ]; then + log "OSMO_MISSION_KEEP_ALIVE=true — pod stays alive; fetch with 'airstack osmo:fetch'" + exec sleep infinity +fi +log "OSMO_MISSION_KEEP_ALIVE=false — exiting so OSMO uploads /osmo/output and frees the GPU" +exit 0 diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py new file mode 100644 index 000000000..ec662fc28 --- /dev/null +++ b/osmo/workspace/mission_runner.py @@ -0,0 +1,863 @@ +#!/usr/bin/env python3 +"""mission_runner.py — batch mission executor for AirStack-on-OSMO pods. + +Reads a declarative mission spec (YAML, see osmo/missions/README.md) and runs +N full iterations of: + + airstack down → airstack up → wait for PX4 ready → start mcap recording + → execute steps (ros2 action goals / topic pubs / service calls / raw + commands / waits) → stop recording → collect bags + container logs → + airstack down + +Artifacts land under one results root per mission run: + + /// + ├── summary.json # per-iteration pass/fail + durations + └── iter_001/ + ├── bags/robot_1/*.mcap # Foxglove-ready (open the .mcap directly) + ├── logs/.log # docker logs snapshot, per container + ├── ready.json # per-robot PX4 readiness timings + └── steps.json # per-step command, output tail, status + +The results root prefers /osmo/output (OSMO uploads that directory to the +workflow's `outputs:` destinations when the task exits) and falls back to +/osmo/results. Either way a symlink is left at +/osmo/results so `airstack osmo:fetch` always finds it. + +Designed to run on the OSMO workspace pod (invoked by entrypoint.sh when +OSMO_MISSION_FILE is set), but has no OSMO dependency: it only needs docker, +python3 + PyYAML, and a checkout with airstack.sh — so it can be tested on +any dev machine that can run `airstack up`. + +Command patterns (ros2 exec into the robot container with a per-robot +ROS_DOMAIN_ID, two-gate PX4 readiness, action-result parsing) mirror +tests/conftest.py and tests/system/test_takeoff_hover_land.py. +""" + +import argparse +import json +import os +import shlex +import subprocess +import sys +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path + +import yaml + +ROS_DISTRO_SETUP = "/opt/ros/jazzy/setup.bash" +DEFAULT_ROBOT_SETUP_BASH = "/root/AirStack/robot/ros_ws/install/setup.bash" +GCS_SETUP_BASH = "/root/AirStack/gcs/ros_ws/install/setup.bash" +ROBOT_CONTAINER_PATTERN = "robot.*desktop" + +# Default topics for `record.scope: gcs` — one recorder in the GCS container +# on domain 0, where each robot's domain_bridge (autonomy_bringup +# onboard_all/config/domain_bridge.yaml) forwards its state topics. All +# robots land in ONE mcap, and replaying it in Foxglove reproduces the live +# GCS view (gcs_visualizer markers are already in the global 'map' frame). +# {robot} expands to robot_1..robot_N and the result is unioned. +DEFAULT_GCS_RECORD_TOPICS = [ + "/tf", + "/tf_static", + "/{robot}/odometry_conversion/odometry", + "/{robot}/interface/mavros/global_position/global", + "/{robot}/trajectory_controller/trajectory_vis", + "/{robot}/global_plan", + "/gcs/robot_markers", + "/gcs/map_origin/location", + "/gcs/map_origin/ground_msl", + "/gcs/{robot}/location", +] + +# Default topics for `record.scope: robot` — one recorder (and one mcap) per +# robot on its own DDS domain. Use this scope for topics that are NOT +# bridged to the GCS (raw sensors, high-rate local topics). +DEFAULT_ROBOT_RECORD_TOPICS = [ + "/tf", + "/tf_static", + "/{robot}/odometry_conversion/odometry", + "/{robot}/interface/mavros/global_position/global", + "/{robot}/global_plan", +] + +# In-container staging dir for bag recordings (docker cp'd out before the +# stack goes down). +BAG_STAGING_DIR = "/tmp/osmo_bags" + +# Tasks the GCS action_relay bridges (gcs/ros_ws/src/action_relay). Goals for +# these can be routed `via: gcs` — published as String JSON on +# //tasks//goal (GCS domain 0), exactly the path Foxglove uses. +GCS_RELAY_TASKS = {"takeoff", "land", "navigate", "fixed_trajectory", + "semantic_search", "exploration"} + +MISSION_DEFAULTS = { + "iterations": 1, + "on_step_failure": "abort_iteration", # continue | abort_iteration | abort_mission + "ready": {"timeout_s": 600, "poll_interval_s": 5}, + # scope "gcs": one recorder on GCS domain 0 → one mcap with every robot's + # bridged topics (default). scope "robot": one recorder + one mcap per + # robot on its own domain (for unbridged/high-rate topics). + "record": {"enabled": True, "scope": "gcs"}, + # How the stack is brought up. Either (or both): + # services: [isaac-sim, robot-desktop, gcs] → ./airstack.sh up + # (compose auto-enables a named service's profile) + # profiles: [desktop, isaac-sim] → exported as COMPOSE_PROFILES + # With neither, plain `airstack up` with COMPOSE_PROFILES=desktop,isaac-sim. + "stack": {"services": [], "profiles": []}, + # Default route for `action` steps: "gcs" sends goals through the GCS + # action_relay (same path as Foxglove / the GCS panels — exercises the + # full GCS→robot chain incl. the relay's airborne preconditions); + # "robot" sends ros2 action goals directly on the robot's DDS domain. + # Override per step with `via:`. + "command_route": "gcs", + "up_timeout_s": 3600, # first `up` on a fresh pod pulls the full image set + "down_timeout_s": 300, + "robot_setup_bash": DEFAULT_ROBOT_SETUP_BASH, +} + + +def log(msg): + print(f"[mission {datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True) + + +def utc_stamp(): + return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S") + + +# ── subprocess / docker plumbing ─────────────────────────────────────────── + +def sh(cmd_list, timeout, env=None, cwd=None): + """Run a command, capturing output. Never raises on non-zero exit; a + TimeoutExpired is converted into a synthetic failed result so step + failures stay data, not exceptions.""" + try: + return subprocess.run(cmd_list, capture_output=True, text=True, + timeout=timeout, env=env, cwd=cwd) + except subprocess.TimeoutExpired as e: + return subprocess.CompletedProcess( + cmd_list, returncode=124, + stdout=(e.stdout or b"").decode() if isinstance(e.stdout, bytes) else (e.stdout or ""), + stderr=f"timed out after {timeout}s", + ) + + +def docker_exec(container, cmd, timeout): + return sh(["docker", "exec", container, "bash", "-c", cmd], timeout=timeout) + + +def ros2_env_prefix(setup_bash, domain_id): + # Workspace setup is conditional: the GCS container has a different (or + # no) workspace at the robot path, and plain std_msgs pub/echo only needs + # the distro setup anyway. + return (f"source {ROS_DISTRO_SETUP} && " + f"{{ [ -f {setup_bash} ] && source {setup_bash}; }}; " + f"export ROS_DOMAIN_ID={domain_id}") + + +def ros2_exec(container, ros2_cmd, domain_id, setup_bash, timeout): + return docker_exec(container, f"{ros2_env_prefix(setup_bash, domain_id)} && {ros2_cmd}", + timeout=timeout) + + +def list_containers(name_pattern=None, all_states=False): + cmd = ["docker", "ps", "--format", "{{.Names}}"] + if all_states: + cmd.insert(2, "-a") + if name_pattern: + cmd += ["--filter", f"name={name_pattern}"] + result = sh(cmd, timeout=15) + return [n for n in result.stdout.strip().splitlines() if n] + + +def robot_containers(): + """Running robot containers sorted by replica index (replica n ↔ robot_n).""" + def index(name): + tail = name.rsplit("-", 1)[-1] + return int(tail) if tail.isdigit() else 0 + return sorted(list_containers(ROBOT_CONTAINER_PATTERN), key=index) + + +def gcs_container(): + names = list_containers("gcs") + return names[0] if names else None + + +def stack_containers(): + """Running containers belonging to the AirStack compose stack (robot, gcs, + and any sim variant — isaac-sim / isaac-sim-livestream / ms-airsim).""" + pats = ("airstack", "isaac-sim", "ms-airsim") + return [n for n in list_containers() if any(p in n for p in pats)] + + +# ── mission spec ─────────────────────────────────────────────────────────── + +def load_mission(path): + with open(path, encoding="utf-8") as f: + spec = yaml.safe_load(f) + if not isinstance(spec, dict): + raise ValueError(f"{path}: mission spec must be a YAML mapping") + if not spec.get("steps"): + raise ValueError(f"{path}: mission spec has no 'steps'") + merged = dict(MISSION_DEFAULTS) + for key, default in MISSION_DEFAULTS.items(): + if isinstance(default, dict): + merged[key] = {**default, **(spec.get(key) or {})} + merged.update({k: v for k, v in spec.items() if not isinstance(MISSION_DEFAULTS.get(k), dict)}) + merged.setdefault("name", Path(path).stem) + merged.setdefault("env", {}) + if merged["on_step_failure"] not in ("continue", "abort_iteration", "abort_mission"): + raise ValueError(f"on_step_failure must be continue|abort_iteration|abort_mission, " + f"got {merged['on_step_failure']!r}") + if merged["command_route"] not in ("gcs", "robot"): + raise ValueError(f"command_route must be gcs|robot, got {merged['command_route']!r}") + if merged["record"].get("scope", "gcs") not in ("gcs", "robot"): + raise ValueError(f"record.scope must be gcs|robot, " + f"got {merged['record'].get('scope')!r}") + for step in merged["steps"]: + action = step.get("action") if isinstance(step, dict) else None + if action: + via = action.get("via", merged["command_route"]) + if via not in ("gcs", "robot"): + raise ValueError(f"action via must be gcs|robot, got {via!r}") + if via == "gcs" and action.get("task") not in GCS_RELAY_TASKS: + raise ValueError( + f"task '{action.get('task')}' is not bridged by the GCS action_relay " + f"({', '.join(sorted(GCS_RELAY_TASKS))}); add `via: robot` to send it " + f"directly on the robot's domain") + return merged + + +def task_action_type(task): + """tasks/semantic_search → task_msgs/action/SemanticSearchTask""" + camel = "".join(part.capitalize() for part in task.split("_")) + return f"task_msgs/action/{camel}Task" + + +def expand(text, n): + """Substitute per-robot placeholders: {n} → robot index, {robot} → robot_.""" + return text.replace("{robot}", f"robot_{n}").replace("{n}", str(n)) + + +def has_placeholder(*strings): + """True if any string contains a per-robot placeholder ({robot} or {n}).""" + return any(s and ("{robot}" in s or "{n}" in s) for s in strings) + + +def step_robots(step_spec, num_robots): + robots = step_spec.get("robots", "all") + if robots == "all": + return list(range(1, num_robots + 1)) + return [int(r) for r in robots] + + +def uses_gcs_route(mission): + """True if any action step routes through the GCS action_relay.""" + for step in mission["steps"]: + action = step.get("action") if isinstance(step, dict) else None + if action and action.get("via", mission["command_route"]) == "gcs": + return True + return False + + +# ── stack lifecycle ──────────────────────────────────────────────────────── + +class Stack: + def __init__(self, airstack_root, mission): + self.root = airstack_root + self.mission = mission + self.env = os.environ.copy() + self.env.update({k: str(v) for k, v in mission["env"].items()}) + self.env.setdefault("AUTOLAUNCH", "true") + self.env.setdefault("NUM_ROBOTS", "1") + # Bring-up selection (see MISSION_DEFAULTS["stack"]): an explicit + # service list is passed to `airstack up `; explicit + # profiles become COMPOSE_PROFILES. With neither, default profiles. + self.services = [str(s) for s in mission["stack"].get("services") or []] + profiles = [str(p) for p in mission["stack"].get("profiles") or []] + if profiles: + self.env["COMPOSE_PROFILES"] = ",".join(profiles) + elif not self.services: + self.env.setdefault("COMPOSE_PROFILES", "desktop,isaac-sim") + self.num_robots = int(self.env["NUM_ROBOTS"]) + self.setup_bash = mission["robot_setup_bash"] + + def _airstack(self, verb, timeout, extra_args=()): + log(f"airstack {verb} {' '.join(extra_args)} " + f"(NUM_ROBOTS={self.env['NUM_ROBOTS']}, " + f"COMPOSE_PROFILES={self.env.get('COMPOSE_PROFILES', '')})") + return sh([str(Path(self.root) / "airstack.sh"), verb, *extra_args], + timeout=timeout, env=self.env, cwd=self.root) + + def up(self): + result = self._airstack("up", self.mission["up_timeout_s"], + extra_args=self.services) + if result.returncode != 0: + raise RuntimeError(f"airstack up failed (exit {result.returncode}):\n" + + "\n".join(result.stdout.splitlines()[-20:]) + + "\n" + "\n".join(result.stderr.splitlines()[-20:])) + deadline = time.time() + 120 + while time.time() < deadline: + containers = robot_containers() + if len(containers) >= self.num_robots: + return containers + time.sleep(3) + raise RuntimeError(f"expected {self.num_robots} robot containers, " + f"found {robot_containers()} after airstack up") + + def down(self): + result = self._airstack("down", self.mission["down_timeout_s"]) + if result.returncode != 0: + log(f"WARN: airstack down exited {result.returncode}") + + def ensure_down(self): + """`airstack status`; if any stack containers exist, `airstack down` + and BLOCK until they're actually gone before returning. + + This is the pre-`up` guard: a fresh `airstack up` must not race a + previous bring-up's containers/network (the baked entrypoint leaves a + stack up on a stale image; the prior iteration leaves one too). + Starting `up` while those still exist causes duplicate-network and + container-name conflicts.""" + self._airstack("status", 60) + existing = stack_containers() + if not existing: + return + log(f"existing stack containers found {existing} — bringing them down first") + self.down() + deadline = time.time() + self.mission["down_timeout_s"] + while time.time() < deadline: + remaining = stack_containers() + if not remaining: + log("stack fully down; safe to bring up") + return + log(f"waiting for teardown: {remaining}") + time.sleep(2) + log(f"WARN: containers still present after down: {stack_containers()} " + f"— proceeding, `up` may conflict") + + def wait_ready(self, container): + """Two sequential gates per robot (same as the system tests): + 1. mavros/state reports connected=True (MAVROS ↔ PX4 heartbeat); + 2. local_position/odom publishes (PX4 EKF converged — the actual + precondition for arming; `connected` alone fires ~25s too early). + Returns {robot_n: seconds_to_ready}; raises on timeout.""" + cfg = self.mission["ready"] + started = time.time() + deadline = started + cfg["timeout_s"] + connected, ready_at = set(), {} + pending = list(range(1, self.num_robots + 1)) + + while pending and time.time() < deadline: + for n in list(pending): + if n not in connected: + r = ros2_exec(container, + f"timeout 5 ros2 topic echo --once --csv " + f"--field connected /robot_{n}/interface/mavros/state", + domain_id=n, setup_bash=self.setup_bash, timeout=15) + if any(line.strip() == "True" for line in r.stdout.splitlines()): + connected.add(n) + else: + continue + r = ros2_exec(container, + f"timeout 5 ros2 topic echo --once " + f"/robot_{n}/interface/mavros/local_position/odom", + domain_id=n, setup_bash=self.setup_bash, timeout=15) + if r.returncode == 0 and "pose:" in r.stdout: + ready_at[n] = round(time.time() - started, 2) + pending.remove(n) + if pending: + log(f"waiting for PX4: connected={sorted(connected)} pending={pending} " + f"elapsed={time.time() - started:.0f}s") + time.sleep(cfg["poll_interval_s"]) + + if pending: + raise RuntimeError(f"robots {pending} not ready within {cfg['timeout_s']}s " + f"(connected so far: {sorted(connected)})") + log(f"PX4 ready: {ready_at}") + + # Gate 3 (only when actions route via the GCS): the per-robot + # action_relay nodes must be up on the GCS before goals are sent — + # the relay's goal subscription is volatile, so a goal published + # before the relay exists is silently lost. + if uses_gcs_route(self.mission): + while time.time() < deadline: + gcs = gcs_container() + if gcs: + r = ros2_exec(gcs, "timeout 10 ros2 node list", + domain_id=0, setup_bash=GCS_SETUP_BASH, timeout=20) + missing = [n for n in range(1, self.num_robots + 1) + if f"action_relay_robot_{n}" not in r.stdout] + if not missing: + ready_at["gcs_relay"] = round(time.time() - started, 2) + log(f"GCS action_relay ready ({ready_at['gcs_relay']}s)") + break + log(f"waiting for GCS action_relay: missing robots {missing}") + else: + log("waiting for gcs container") + time.sleep(cfg["poll_interval_s"]) + if "gcs_relay" not in ready_at: + raise RuntimeError( + f"GCS action_relay not ready within {cfg['timeout_s']}s — " + f"is the gcs service in stack.services/profiles and " + f"AUTOLAUNCH=true? (or set command_route: robot)") + return ready_at + + +# ── bag recording ────────────────────────────────────────────────────────── + +class Recorder: + """`ros2 bag record -s mcap` per the mission's record.scope. + + scope "gcs" (default): a single recorder in the GCS container on domain + 0, where every robot's domain_bridge forwards its state topics — one + mcap holds all robots, and Foxglove replay matches the live GCS view. + + scope "robot": one recorder per robot inside robot container 1, each on + that robot's DDS domain — for topics that aren't bridged to the GCS. + + Each recorder is started detached with its PID dropped to a file, and + stopped with SIGTERM so rosbag2 finalizes the mcap cleanly. (Not SIGINT: + jobs backgrounded from a non-interactive shell have SIGINT set to + SIG_IGN, so it would never be delivered; rosbag2 handles SIGTERM the + same way.) Bags are docker cp'd to the host before the stack goes down.""" + + def __init__(self, robot_container, mission, num_robots, setup_bash): + self.robot_container = robot_container + self.cfg = mission["record"] + self.scope = self.cfg.get("scope", "gcs") + self.num_robots = num_robots + self.setup_bash = setup_bash + # (container, domain_id, tag) per active recorder. + self.active = [] + + def _topic_selection(self, robots): + """Build the `ros2 bag record` topic args; `robots` is the list of + robot indices whose {robot}/{n} placeholders to expand (unioned, + order-preserving, deduplicated).""" + if self.cfg.get("all"): + return "-a" + default = (DEFAULT_GCS_RECORD_TOPICS if self.scope == "gcs" + else DEFAULT_ROBOT_RECORD_TOPICS) + topics = [] + for t in self.cfg.get("topics", default): + if "{robot}" in t or "{n}" in t: + topics.extend(expand(t, n) for n in robots) + else: + topics.append(t) + seen = set() + unique = [t for t in topics if not (t in seen or seen.add(t))] + return " ".join(shlex.quote(t) for t in unique) + + def _start_one(self, container, domain_id, tag, selection, setup_bash): + out_dir = f"{BAG_STAGING_DIR}/{tag}" + log_file = f"{BAG_STAGING_DIR}/record_{tag}.log" + pid_file = f"{BAG_STAGING_DIR}/record_{tag}.pid" + # Create the staging dir in the FOREGROUND, THEN background only the + # recorder. Folding `mkdir` into the same `... &` chain backgrounds the + # mkdir too, so the foreground pidfile write races ahead of the dir + # existing ("record_.pid: No such file or directory"). nohup + + # pidfile lets the recorder outlive this docker exec; topics that + # don't exist yet are fine (rosbag2 waits for them). + inner = ( + f"mkdir -p {BAG_STAGING_DIR} && rm -rf {out_dir}\n" + f"{ros2_env_prefix(setup_bash, domain_id)}\n" + f"nohup ros2 bag record -s mcap -o {out_dir} {selection} " + f"> {log_file} 2>&1 &\n" + f"echo $! > {pid_file}\n" + # Confirm the recorder is actually alive — a bad topic name or a + # missing mcap plugin would make it exit immediately, and a silent + # recording failure is the worst outcome for a mission. + f"sleep 2\n" + f"if kill -0 \"$(cat {pid_file})\" 2>/dev/null; then echo RECORDER_ALIVE; " + f"else echo RECORDER_DEAD; tail -n 5 {log_file} 2>/dev/null; fi" + ) + r = docker_exec(container, inner, timeout=30) + if "RECORDER_ALIVE" in r.stdout: + self.active.append((container, tag)) + n_topics = "all topics" if selection == "-a" else f"{len(selection.split())} topics" + log(f"recording [{tag}] in {container} (domain {domain_id}) " + f"→ {out_dir} ({n_topics})") + else: + log(f"WARN: recorder [{tag}] failed to start / exited immediately: " + f"{tail(r.stdout + r.stderr, 6)}") + + def start(self): + if not self.cfg.get("enabled", True): + log("recording disabled by mission spec") + return + robots = list(range(1, self.num_robots + 1)) + if self.scope == "gcs": + gcs = gcs_container() + if not gcs: + log("WARN: record.scope is 'gcs' but no gcs container is running — " + "recording skipped (bring up the gcs service or use scope: robot)") + return + self._start_one(gcs, 0, "gcs", self._topic_selection(robots), + GCS_SETUP_BASH) + else: + for n in robots: + self._start_one(self.robot_container, n, f"robot_{n}", + self._topic_selection([n]), self.setup_bash) + + def stop(self): + for container, tag in self.active: + docker_exec(container, f""" + pid=$(cat {BAG_STAGING_DIR}/record_{tag}.pid 2>/dev/null) || exit 0 + kill -TERM "$pid" 2>/dev/null || exit 0 + for i in $(seq 1 20); do + kill -0 "$pid" 2>/dev/null || exit 0 + sleep 1 + done + kill -9 "$pid" 2>/dev/null || true + """, timeout=40) + if self.active: + log(f"recorders stopped: {[tag for _, tag in self.active]}") + + def collect(self, dest_dir): + if not self.active: + return + dest_dir.mkdir(parents=True, exist_ok=True) + for container in {c for c, _ in self.active}: + r = sh(["docker", "cp", f"{container}:{BAG_STAGING_DIR}/.", str(dest_dir)], + timeout=1800) + if r.returncode != 0: + log(f"WARN: docker cp of bags from {container} failed: " + f"{r.stderr.strip()[:200]}") + self.active = [] + mcaps = list(dest_dir.rglob("*.mcap")) + log(f"collected {len(mcaps)} mcap file(s) → {dest_dir}") + + +# ── step execution ───────────────────────────────────────────────────────── + +def action_ok(stdout): + """ros2 action send_goal --feedback prints the result as YAML; + AirStack task results carry `success: true` on completion.""" + return "success: true" in stdout + + +def tail(text, lines=15): + return "\n".join((text or "").strip().splitlines()[-lines:]) + + +def _run_one(stack, robot_container, target, cmd, timeout, expect_success): + """Run one resolved `run` command. `target` is already robot-expanded: + pod → on the pod itself (cwd = AirStack root) + gcs → in the gcs container on domain 0 + robot_ → in robot container 1 on robot N's DDS domain + → literal container name (domain 0) + """ + if target == "pod": + r = sh(["bash", "-c", cmd], timeout=timeout, cwd=stack.root) + elif target == "gcs": + g = gcs_container() + if g: + r = ros2_exec(g, cmd, domain_id=0, setup_bash=GCS_SETUP_BASH, + timeout=timeout + 15) + else: + r = subprocess.CompletedProcess(cmd, returncode=1, stdout="", + stderr="no gcs container running") + elif target.startswith("robot_") and target[6:].isdigit(): + n = int(target[6:]) + r = ros2_exec(robot_container, cmd, domain_id=n, + setup_bash=stack.setup_bash, timeout=timeout + 15) + else: + r = docker_exec(target, cmd, timeout=timeout + 15) + ok = (r.returncode == 0) if expect_success else True + return {"target": target, "exit": r.returncode, "ok": ok, + "output_tail": tail(r.stdout + r.stderr)} + + +def run_step(stack, container, step_spec, step_index): + """Execute one step; returns a result dict with ok: bool.""" + record = {"index": step_index, "spec": step_spec, + "started_at": datetime.now(timezone.utc).isoformat()} + t0 = time.time() + + if "wait" in step_spec: + seconds = float(step_spec["wait"]) + log(f"step {step_index}: wait {seconds}s") + time.sleep(seconds) + record.update(type="wait", ok=True) + + elif "action" in step_spec: + spec = step_spec["action"] + task = spec["task"] + via = spec.get("via", stack.mission["command_route"]) + goal = spec.get("goal", {}) + # Normalize the goal to JSON: dicts directly; strings may be ros2-style + # YAML ("{target_altitude_m: 10}") — YAML-parse then re-dump. JSON is + # what the GCS relay requires, and it's a YAML subset so the direct + # send_goal path accepts it too. + goal_obj = yaml.safe_load(goal) if isinstance(goal, str) else goal + goal_json = json.dumps(goal_obj or {}) + timeout = float(spec.get("timeout_s", 120)) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: action {task} {goal_json} via {via} → robots {robots}") + + if via == "gcs": + gcs = gcs_container() + if not gcs: + record.update(type="action", task=task, via=via, ok=False, + error="no gcs container running — `via: gcs` needs the " + "gcs service up (or use `via: robot`)") + record["duration_s"] = round(time.time() - t0, 2) + log(f"step {step_index}: FAILED (no gcs container)") + return record + + def send(n): + # Same path as Foxglove: publish String JSON on + # //tasks//goal (GCS domain 0); the per-robot + # action_relay forwards it as a typed action goal on domain N + # and reports {"success": ..., "message": ...} on + # .../relay_result. Subscribe to the result *before* + # publishing the goal so a fast result can't be missed. + base = f"/robot_{n}/tasks/{task}" + result_file = f"/tmp/relay_result_{task}_{n}.out" + msg_yaml = json.dumps({"data": expand(goal_json, n)}) + script = ( + f"rm -f {result_file}\n" + f"( timeout {int(timeout)} ros2 topic echo --once --field data " + f"{base}/relay_result > {result_file} 2>&1 ) &\n" + f"sub=$!\n" + f"sleep 3\n" + f"ros2 topic pub --once {base}/goal std_msgs/msg/String " + f"{shlex.quote(msg_yaml)} > /dev/null\n" + f"wait $sub\n" + f"cat {result_file}" + ) + r = ros2_exec(gcs, script, domain_id=0, setup_bash=GCS_SETUP_BASH, + timeout=int(timeout + 30)) + ok = '"success": true' in r.stdout + return n, {"exit": r.returncode, "ok": ok, + "output_tail": tail(r.stdout + r.stderr)} + else: + action_type = spec.get("type", task_action_type(task)) + + def send(n): + cmd = (f"ros2 action send_goal --feedback /robot_{n}/tasks/{task} " + f"{action_type} {shlex.quote(expand(goal_json, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=int(timeout + 15)) + return n, {"exit": r.returncode, "ok": action_ok(r.stdout), + "output_tail": tail(r.stdout + r.stderr)} + + with ThreadPoolExecutor(max_workers=len(robots)) as pool: + results = dict(pool.map(send, robots)) + record.update(type="action", task=task, via=via, per_robot=results, + ok=all(v["ok"] for v in results.values())) + + elif "run" in step_spec: + spec = step_spec["run"] + cmd = spec["cmd"] + timeout = float(spec.get("timeout_s", 60)) + expect = spec.get("expect_success", True) + # `container` may reference {n}/{robot} to fan out over robots. If it's + # omitted, default to robot_{n} when the command is per-robot (has a + # placeholder) and robot_1 otherwise. A step fans out over `robots` + # (default all) iff its command or container references {n}/{robot}. + target = spec.get("container") or ("robot_{n}" if has_placeholder(cmd) else "robot_1") + if has_placeholder(cmd, target): + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: run [{target}] {cmd} → robots {robots}") + results = {} + for n in robots: + results[n] = _run_one(stack, container, expand(target, n), + expand(cmd, n), timeout, expect) + record.update(type="run", per_robot=results, + ok=all(v["ok"] for v in results.values())) + else: + log(f"step {step_index}: run [{target}] {cmd}") + res = _run_one(stack, container, target, cmd, timeout, expect) + record.update(type="run", **res) + + elif "topic_pub" in step_spec: + spec = step_spec["topic_pub"] + msg = spec.get("msg", {}) + msg_str = msg if isinstance(msg, str) else json.dumps(msg) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: topic_pub {spec['topic']} → robots {robots}") + results = {} + for n in robots: + cmd = (f"ros2 topic pub --once {expand(spec['topic'], n)} " + f"{spec['type']} {shlex.quote(expand(msg_str, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=float(spec.get("timeout_s", 30)) + 15) + results[n] = {"exit": r.returncode, "ok": r.returncode == 0, + "output_tail": tail(r.stdout + r.stderr)} + record.update(type="topic_pub", per_robot=results, + ok=all(v["ok"] for v in results.values())) + + elif "service_call" in step_spec: + spec = step_spec["service_call"] + req = spec.get("request", {}) + req_str = req if isinstance(req, str) else json.dumps(req) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: service_call {spec['service']} → robots {robots}") + results = {} + for n in robots: + cmd = (f"ros2 service call {expand(spec['service'], n)} " + f"{spec['type']} {shlex.quote(expand(req_str, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=float(spec.get("timeout_s", 30)) + 15) + results[n] = {"exit": r.returncode, "ok": r.returncode == 0, + "output_tail": tail(r.stdout + r.stderr)} + record.update(type="service_call", per_robot=results, + ok=all(v["ok"] for v in results.values())) + + else: + record.update(type="unknown", ok=False, + error=f"unrecognized step keys: {sorted(step_spec)}") + + record["duration_s"] = round(time.time() - t0, 2) + log(f"step {step_index}: {'OK' if record['ok'] else 'FAILED'} " + f"({record['duration_s']}s)") + return record + + +# ── artifacts ────────────────────────────────────────────────────────────── + +def snapshot_container_logs(dest_dir): + dest_dir.mkdir(parents=True, exist_ok=True) + for name in list_containers(name_pattern="airstack", all_states=True): + r = sh(["docker", "logs", name], timeout=120) + (dest_dir / f"{name}.log").write_text( + (r.stdout or "") + (("\n--- stderr ---\n" + r.stderr) if r.stderr else ""), + encoding="utf-8") + + +def write_json(path, data): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2, default=str) + "\n", encoding="utf-8") + + +def resolve_results_root(airstack_root): + """Prefer /osmo/output (auto-uploaded by OSMO `outputs:` when the task + exits); keep /osmo/results working in both cases so + `airstack osmo:fetch` has one stable path to rsync.""" + fallback = Path(airstack_root) / "osmo" / "results" + override = os.environ.get("OSMO_RESULTS_ROOT") + if override: + root = Path(override) + root.mkdir(parents=True, exist_ok=True) + return root + osmo_output = Path(os.environ.get("OSMO_OUTPUT_DIR", "/osmo/output")) + if osmo_output.is_dir() and os.access(osmo_output, os.W_OK): + root = osmo_output / "airstack-mission-results" + root.mkdir(parents=True, exist_ok=True) + if not fallback.exists(): + fallback.parent.mkdir(parents=True, exist_ok=True) + fallback.symlink_to(root) + return root + fallback.mkdir(parents=True, exist_ok=True) + return fallback + + +# ── main loop ────────────────────────────────────────────────────────────── + +def run_iteration(stack, mission, iter_dir): + """One full up → ready → record → steps → collect → down cycle. + Returns the iteration summary dict; never raises (failures are data).""" + summary = {"status": "passed", "steps_ok": 0, "steps_failed": 0} + recorder = None + container = None + t0 = time.time() + try: + # `airstack status`; if a stack is already up (baked entrypoint on a + # stale image, or the previous iteration), down it and wait for the + # containers to fully disappear before bringing up — otherwise the + # `up` races leftover containers/network. + stack.ensure_down() + containers = stack.up() + container = containers[0] + summary["up_duration_s"] = round(time.time() - t0, 2) + + ready_at = stack.wait_ready(container) + write_json(iter_dir / "ready.json", ready_at) + + recorder = Recorder(container, mission, stack.num_robots, stack.setup_bash) + recorder.start() + + steps = [] + for i, step_spec in enumerate(mission["steps"], start=1): + record = run_step(stack, container, step_spec, i) + steps.append(record) + if record["ok"]: + summary["steps_ok"] += 1 + continue + summary["steps_failed"] += 1 + summary["status"] = "failed" + policy = mission["on_step_failure"] + if policy == "continue": + continue + if policy == "abort_mission": + summary["abort_mission"] = True + break + write_json(iter_dir / "steps.json", steps) + + except Exception as e: + summary["status"] = "error" + summary["error"] = str(e) + log(f"ERROR: iteration aborted: {e}") + + finally: + # Artifact collection happens even on failure — a failed flight's + # bag is usually the most interesting one. + if recorder is not None: + recorder.stop() + recorder.collect(iter_dir / "bags") + if container is not None or list_containers("airstack", all_states=True): + snapshot_container_logs(iter_dir / "logs") + stack.down() + summary["duration_s"] = round(time.time() - t0, 2) + write_json(iter_dir / "iteration.json", summary) + return summary + + +def main(): + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument("mission_file", help="Path to mission spec YAML") + parser.add_argument("--airstack-root", + default=os.environ.get("AIRSTACK_ROOT", "/root/AirStack")) + parser.add_argument("--dry-run", action="store_true", + help="Validate the spec and print the plan without touching docker") + args = parser.parse_args() + + mission = load_mission(args.mission_file) + stack = Stack(args.airstack_root, mission) + + log(f"mission '{mission['name']}': {mission['iterations']} iteration(s), " + f"{len(mission['steps'])} step(s), {stack.num_robots} robot(s)") + if args.dry_run: + print(yaml.safe_dump(mission, sort_keys=False)) + return 0 + + results_root = resolve_results_root(args.airstack_root) + run_dir = results_root / mission["name"] / utc_stamp() + run_dir.mkdir(parents=True) + log(f"results → {run_dir}") + + iterations = [] + for i in range(1, mission["iterations"] + 1): + log(f"━━━ iteration {i}/{mission['iterations']} ━━━") + iter_dir = run_dir / f"iter_{i:03d}" + summary = run_iteration(stack, mission, iter_dir) + summary["iteration"] = i + iterations.append(summary) + write_json(run_dir / "summary.json", + {"mission": mission["name"], "mission_file": args.mission_file, + "iterations": iterations}) + if summary.get("abort_mission"): + log("on_step_failure=abort_mission — stopping remaining iterations") + break + + passed = sum(1 for s in iterations if s["status"] == "passed") + log(f"mission complete: {passed}/{len(iterations)} iteration(s) passed; " + f"results in {run_dir}") + return 0 if passed == len(iterations) else 1 + + +if __name__ == "__main__": + sys.exit(main())