From 4bd534aa637a8866dd0ea69710af230c1044b4ad Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Wed, 10 Jun 2026 15:52:26 -0400 Subject: [PATCH 1/7] added mission_runner.py to automate running missions in osmo --- .airstack/modules/osmo.sh | 198 +++++++ .env | 2 +- .gitignore | 3 + docs/tutorials/airstack_on_osmo.md | 42 +- osmo/README.md | 59 +- osmo/missions/README.md | 146 +++++ osmo/missions/example_takeoff_land.yaml | 68 +++ osmo/workflows/airstack-mission.yaml | 92 ++++ osmo/workspace/Dockerfile | 1 + osmo/workspace/entrypoint.sh | 78 ++- osmo/workspace/mission_runner.py | 684 ++++++++++++++++++++++++ 11 files changed, 1344 insertions(+), 29 deletions(-) create mode 100644 osmo/missions/README.md create mode 100644 osmo/missions/example_takeoff_land.yaml create mode 100644 osmo/workflows/airstack-mission.yaml create mode 100644 osmo/workspace/mission_runner.py diff --git a/.airstack/modules/osmo.sh b/.airstack/modules/osmo.sh index 053decbee..a10a47809 100755 --- a/.airstack/modules/osmo.sh +++ b/.airstack/modules/osmo.sh @@ -686,6 +686,199 @@ function cmd_osmo_foxglove { --connect-timeout "$OSMO_PF_TIMEOUT" } +# osmo:mission — submit airstack-mission.yaml with a mission spec selected. +# +# Usage: airstack osmo:mission [--pool POOL] [--key PATH] +# [--branch BRANCH] [--no-keep-alive] +# +# is a repo-relative path (e.g. osmo/missions/example_takeoff_land.yaml). +# The pod clones the branch and runs the mission spec from that clone, so the +# mission file must be committed and pushed. --no-keep-alive makes the task +# exit when the mission ends (frees the GPU, triggers the workflow's +# `outputs:` upload) instead of sleeping for `airstack osmo:fetch`. +function cmd_osmo_mission { + _osmo_check_cli || return 1 + + local mission="" + local pool="${OSMO_POOL:-}" + local pubkey_file="" + local branch="" + local branch_explicit=false + local keep_alive="true" + local extra_args=() + + while [ $# -gt 0 ]; do + case "$1" in + --pool) pool="$2"; shift 2 ;; + --key) pubkey_file="$2"; shift 2 ;; + --branch) branch="$2"; branch_explicit=true; shift 2 ;; + --no-keep-alive) keep_alive="false"; shift ;; + -*) extra_args+=("$1"); shift ;; + *) + if [ -z "$mission" ]; then mission="$1"; else extra_args+=("$1"); fi + shift ;; + esac + done + + if [ -z "$mission" ]; then + log_error "Usage: airstack osmo:mission [--pool POOL] [--branch BRANCH] [--no-keep-alive]" + log_error "Available missions:" + ls "${PROJECT_ROOT}/osmo/missions/"*.yaml 2>/dev/null \ + | sed "s|${PROJECT_ROOT}/| |" >&2 + return 1 + fi + # Normalize to a repo-relative path — that's what the pod resolves + # against its clone of the branch. + mission="${mission#"${PROJECT_ROOT}"/}" + if [ ! -f "${PROJECT_ROOT}/${mission}" ]; then + log_error "Mission file not found locally: ${PROJECT_ROOT}/${mission}" + return 1 + fi + + if [ -z "$pubkey_file" ]; then + if ! pubkey_file="$(_osmo_pick_pubkey)"; then + log_error "No SSH public key found in ~/.ssh. Generate one with: ssh-keygen -t ed25519" + return 1 + fi + fi + + local workflow_yaml="${PROJECT_ROOT}/osmo/workflows/airstack-mission.yaml" + if [ ! -f "$workflow_yaml" ]; then + log_error "Workflow file not found: ${workflow_yaml}" + return 1 + fi + + # The pod runs the mission file from its clone of origin/, so an + # unpushed mission spec is the most common "why is it running the wrong + # thing" failure — same auto-pin + pushed check as osmo:up. + if [ "$branch_explicit" = false ] && [ -z "$branch" ]; then + branch="$(_osmo_local_branch)" + if [ -n "$branch" ]; then + log_info "Auto-detected local branch '${branch}'; pod will clone from origin/${branch} (override with --branch main)." + fi + fi + if [ -n "$branch" ]; then + _osmo_check_branch_pushed "$branch" + fi + + local cmd=(osmo workflow submit "$workflow_yaml") + if [ -n "$pool" ]; then + cmd+=(--pool "$pool") + else + log_warn "No --pool provided and OSMO_POOL is unset; using your osmo profile's default pool." + fi + # Single --set-env: the flag is variadic and a second occurrence silently + # drops the first (see cmd_osmo_up). + local env_kvs=( + "SSH_PUB_KEY=$(cat "$pubkey_file")" + "OSMO_MISSION_FILE=${mission}" + "OSMO_MISSION_KEEP_ALIVE=${keep_alive}" + ) + if [ -n "$branch" ]; then + env_kvs+=("AIRSTACK_BRANCH=${branch}") + fi + cmd+=(--set-env "${env_kvs[@]}") + if [ ${#extra_args[@]} -gt 0 ]; then + cmd+=("${extra_args[@]}") + fi + + log_info "Submitting mission '${mission}' (keep_alive=${keep_alive}): ${cmd[*]}" + local output + if ! output="$("${cmd[@]}" 2>&1)"; then + echo "$output" >&2 + log_error "osmo workflow submit failed." + return 1 + fi + echo "$output" + + local wf_id + wf_id="$(echo "$output" | awk -F'- ' '/^Workflow ID/ {print $2; exit}' | tr -d ' \r\n')" + if [ -z "$wf_id" ]; then + log_warn "Could not parse workflow id from submit output. Set it manually:" + log_warn " echo > ${OSMO_STATE_FILE}" + return 0 + fi + _osmo_save_wf_id "$wf_id" + + log_info "Next steps:" + log_info " airstack osmo:logs # follow mission progress" + log_info " airstack osmo:fetch # download bags + results (keep-alive mode)" + log_info " airstack osmo:down # cancel when done (results die with the pod!)" +} + +# osmo:fetch — download mission results (mcap bags, logs, summaries) from the +# pod to the laptop over the authenticated ssh port-forward. +# +# Usage: airstack osmo:fetch [dest-dir] +# +# Incremental and resumable (rsync): safe to run mid-mission to pull finished +# iterations while the next one flies, and again later to top up. Falls back +# to scp -r (non-incremental) if rsync isn't installed. +# +# Note: the osmo CLI also ships `osmo workflow rsync download`, which could +# replace the port-forward + ssh below; we use the ssh path because the +# sshd + port-forward channel is already validated infrastructure for this +# workflow (osmo:ide) and works uniformly across osmo CLI versions. +function cmd_osmo_fetch { + _osmo_check_cli || return 1 + local wf; wf="$(_osmo_wf_id)" || return 1 + + local dest="${1:-./osmo-results}" + local remote_path="/root/AirStack/osmo/results/" # trailing slash: rsync + # follows the symlink to + # /osmo/output/... on pods + local local_port="${OSMO_SSH_PORT%%:*}" + local ssh_opts=(-p "$local_port" -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR) + + # Reuse an existing ssh port-forward (e.g. from osmo:ide) or spawn one + # for the duration of the fetch — same pattern as osmo:ide. + local pf_pid="" + if ! nc -z localhost "$local_port" 2>/dev/null; then + log_info "osmo workflow port-forward ${wf} workspace --port ${OSMO_SSH_PORT} (for the duration of the fetch)" + osmo workflow port-forward "$wf" workspace --port "$OSMO_SSH_PORT" --connect-timeout 600 \ + > "${OSMO_STATE_DIR}/fetch-pf.log" 2>&1 & + pf_pid=$! + trap '[ -n "'"$pf_pid"'" ] && kill "'"$pf_pid"'" 2>/dev/null; trap - EXIT INT TERM' EXIT INT TERM + local waited=0 + until nc -z localhost "$local_port" 2>/dev/null; do + sleep 1; waited=$((waited+1)) + if [ "$waited" -ge 30 ]; then + log_error "Timed out waiting for port-forward on :${local_port}. Log: ${OSMO_STATE_DIR}/fetch-pf.log" + return 1 + fi + if ! kill -0 "$pf_pid" 2>/dev/null; then + log_error "port-forward exited early. Tail:" + tail -10 "${OSMO_STATE_DIR}/fetch-pf.log" >&2 + return 1 + fi + done + fi + + mkdir -p "$dest" + log_info "Fetching ${remote_path} → ${dest}" + local rc + if command -v rsync >/dev/null 2>&1; then + rsync -az --partial --info=progress2 -e "ssh ${ssh_opts[*]}" \ + "root@localhost:${remote_path}" "$dest/" + rc=$? + else + log_warn "rsync not found; falling back to scp -r (non-incremental)." + scp "${ssh_opts[@]}" -r "root@localhost:${remote_path}." "$dest/" + rc=$? + fi + + if [ -n "$pf_pid" ]; then + kill "$pf_pid" 2>/dev/null + trap - EXIT INT TERM + fi + + if [ "$rc" -ne 0 ]; then + log_error "Fetch failed (exit ${rc}). Is the workflow still running, and has the mission produced results yet?" + return 1 + fi + log_info "Done. Open any .mcap under ${dest} directly in Foxglove (Open local file)." +} + # osmo:down — cancel the active workflow. Reminds you to push first. function cmd_osmo_down { _osmo_check_cli || return 1 @@ -693,6 +886,7 @@ function cmd_osmo_down { log_warn "About to cancel workflow '${wf}'." log_warn "Anything not pushed to git in /root/AirStack inside the pod will be LOST." + log_warn "Mission results (bags/logs) on the pod are lost too — run 'airstack osmo:fetch' first." log_warn "Hit Ctrl-C in the next 5 seconds to abort." sleep 5 osmo workflow cancel "$wf" @@ -703,6 +897,8 @@ function cmd_osmo_down { function register_osmo_commands { COMMANDS["osmo:setup"]="cmd_osmo_setup" COMMANDS["osmo:up"]="cmd_osmo_up" + COMMANDS["osmo:mission"]="cmd_osmo_mission" + COMMANDS["osmo:fetch"]="cmd_osmo_fetch" COMMANDS["osmo:logs"]="cmd_osmo_logs" COMMANDS["osmo:ide"]="cmd_osmo_ide" COMMANDS["osmo:webrtc"]="cmd_osmo_webrtc" @@ -711,6 +907,8 @@ function register_osmo_commands { COMMAND_HELP["osmo:setup"]="One-time per-user OSMO credential setup (airlab-docker-registry, airlab-docker-login, airlab-nucleus)" COMMAND_HELP["osmo:up"]="Submit osmo/workflows/airstack-dev.yaml with your SSH pubkey injected (--pool POOL, --key PATH, --branch BRANCH)" + COMMAND_HELP["osmo:mission"]="Submit a batch mission (osmo/missions/*.yaml): repeated up→fly→record→down cycles (--pool POOL, --branch BRANCH, --no-keep-alive)" + COMMAND_HELP["osmo:fetch"]="Download mission results (mcap bags, logs, summaries) from the pod over ssh — incremental, safe to run mid-mission (osmo:fetch [dest-dir])" COMMAND_HELP["osmo:logs"]="Follow the workspace task logs (osmo workflow logs -t workspace -n 500; OSMO_LOGS_TASK / OSMO_LOGS_TAIL override)" COMMAND_HELP["osmo:ide"]="Port-forward sshd (2200:22) and open VS Code/Cursor on Host airstack-osmo" COMMAND_HELP["osmo:webrtc"]="Port-forward Isaac Sim WebRTC ranges (TCP foreground + UDP background)" diff --git a/.env b/.env index 82cc01ccb..412801cb6 100644 --- a/.env +++ b/.env @@ -12,7 +12,7 @@ PROJECT_NAME="airstack" # If you've run ./airstack.sh setup, then this will auto-generate from the git commit hash every time a change is made # to a Dockerfile or docker-compose.yaml file. Otherwise this can also be set explicitly to make a release version. # auto-generated from git commit hash -VERSION="0.19.0-alpha.3" +VERSION="8b927e46" # Choose "dev" or "prebuilt". "dev" is for mounted code that must be built live. "prebuilt" is for built ros_ws baked into the image DOCKER_IMAGE_BUILD_MODE="dev" # Where to push and pull images from. Can replace with your docker hub username if using docker hub. diff --git a/.gitignore b/.gitignore index 4868b5c74..d14138ebb 100644 --- a/.gitignore +++ b/.gitignore @@ -94,6 +94,9 @@ simulation/ms-airsim/assets/scenes/* # Test results tests/results/ +# OSMO mission results (local runs of osmo/workspace/mission_runner.py) +osmo/results/ + # Local-only — embedded sibling repo, not part of this branch common/rayfronts/ diff --git a/docs/tutorials/airstack_on_osmo.md b/docs/tutorials/airstack_on_osmo.md index e9cfa8974..9f093e8c0 100644 --- a/docs/tutorials/airstack_on_osmo.md +++ b/docs/tutorials/airstack_on_osmo.md @@ -547,6 +547,45 @@ osmo workflow cancel $WF +## Batch missions (unattended runs) + +Everything above is the *interactive* workflow. The same pod can instead run +**missions**: declarative YAML files (in +[`osmo/missions/`](https://github.com/castacks/AirStack/blob/main/osmo/missions/)) +that script repeated cycles of bring-up → fly → record → tear-down with no +human attached. Each iteration restarts the containers, records mcap bag +files (Foxglove's native format — open the `.mcap` directly, no conversion), +and snapshots container logs and per-step results. + +```bash +# Submit a mission (auto-pins your current branch, like osmo:up): +./airstack.sh osmo:mission osmo/missions/example_takeoff_land.yaml --pool airstack + +# Watch it fly: +./airstack.sh osmo:logs + +# Pull bags + logs + summaries to your laptop — incremental, safe to run +# mid-mission and again later to top up: +./airstack.sh osmo:fetch ./results/ + +# When you have everything (results die with the pod!): +./airstack.sh osmo:down +``` + +A mission step can be any robot task action (`takeoff`, `land`, `navigate`, +`semantic_search`, `exploration`, `coverage`, …), a timed wait, a topic pub, +a service call, or an arbitrary `ros2`/shell command. Spec schema and step +reference: +[`osmo/missions/README.md`](https://github.com/castacks/AirStack/blob/main/osmo/missions/README.md). + +By default the pod **stays alive after the mission ends** so you can +`osmo:fetch` whenever you're ready (mind the workflow's 24h `exec_timeout`). +For fire-and-forget batches, submit with `--no-keep-alive`: the pod exits +cleanly when the mission ends, freeing the GPU — and uploading the results +directory to object storage automatically if the workflow's `outputs:` +block is configured (lab-admin setup; see +[`osmo/README.md`](https://github.com/castacks/AirStack/blob/main/osmo/README.md)). + ## Troubleshooting | Symptom | Likely cause | Fix | @@ -572,7 +611,8 @@ osmo workflow cancel $WF | Uncommitted edits in the IDE | Pod-local working tree | **No** | | `colcon build` outputs (`build/`, `install/`, `log/`) | `/root/AirStack/**/ros_ws/...` | **No** (gitignored Linux x86_64 binaries; rebuild trivially) | | Inner-dockerd image cache | Pod-local Docker layer cache | **No** | -| Bag files, sim recordings, debug screenshots | `/root/AirStack/bags/`, etc. | **No** — pull selectively via `osmo workflow rsync download "$(cat ~/.airstack/osmo-state)" :` *before* tearing down | +| Mission results (mcap bags, logs, summaries) | `/root/AirStack/osmo/results/` | **No** — run `./airstack.sh osmo:fetch` *before* tearing down | +| Other bag files, sim recordings, debug screenshots | `/root/AirStack/bags/`, etc. | **No** — pull selectively via `osmo workflow rsync download "$(cat ~/.airstack/osmo-state)" :` *before* tearing down | The rule of thumb: **commit + push every time you'd save a file in a git-tracked sense.** The Source Control panel is the persistence boundary. diff --git a/osmo/README.md b/osmo/README.md index 91b41dbd5..410ed21c2 100644 --- a/osmo/README.md +++ b/osmo/README.md @@ -7,11 +7,17 @@ through [NVIDIA OSMO](https://github.com/NVIDIA/OSMO): osmo/ ├── README.md # This file (admin / operator reference) ├── workflows/ -│ └── airstack-dev.yaml # The OSMO workflow students submit +│ ├── airstack-dev.yaml # Interactive dev workflow (IDE over Remote-SSH) +│ └── airstack-mission.yaml # Batch mission workflow (unattended flights) +├── missions/ +│ ├── README.md # Mission spec schema reference +│ └── example_takeoff_land.yaml # Reference mission: takeoff → hover → land ×3 └── workspace/ ├── Dockerfile # The airstack-osmo-workspace image ├── sshd_config # Pubkey-only sshd config baked into the image - └── entrypoint.sh # Pod startup: sshd, dockerd, clone, airstack up + ├── entrypoint.sh # Pod startup: sshd, dockerd, clone, then + │ # dev mode (airstack up) or mission mode + └── mission_runner.py # Batch executor (run from the clone, not the image) ``` The student-facing walkthrough lives in @@ -21,9 +27,9 @@ README is the **lab admin / operator** reference: pool requirements, workspace image build & push, validation stages, plus a credential summary for context. -> **Scope:** developer workflow only. CI/CD on OSMO is **not** part of this -> integration — the existing `system-tests.yml` + OpenStack orchestrator path -> is unchanged. +> **Scope:** developer workflow + batch missions. CI/CD on OSMO is **not** +> part of this integration — the existing `system-tests.yml` + OpenStack +> orchestrator path is unchanged. ## Architecture in one minute @@ -47,6 +53,49 @@ app.foxglove.dev ── ws ────► port-forward 8766 ────► airstack.sh up brings these 3 up ``` +## Mission mode (batch runs) + +`airstack-mission.yaml` reuses the same workspace image and DinD pod, but +instead of one interactive `airstack up`, the entrypoint hands off to +[`workspace/mission_runner.py`](workspace/mission_runner.py), which executes +a declarative mission spec from [`missions/`](missions/) — repeated cycles of: + +``` +airstack down → airstack up → wait for PX4 ready → record mcap bags +→ run steps (takeoff / land / navigate / semantic search / any ros2 command) +→ collect bags + container logs → airstack down +``` + +Submit, monitor, and download: + +```bash +airstack osmo:mission osmo/missions/example_takeoff_land.yaml --pool +airstack osmo:logs # follow mission progress +airstack osmo:fetch ./results/ # rsync bags/logs/summaries to the laptop +airstack osmo:down # cancel (fetch first — results die with the pod) +``` + +Key behaviors: + +- **The mission spec and runner come from the clone**, not the image — what + you push on your branch is what runs. The workspace image only needs a + rebuild when `Dockerfile`, `sshd_config`, or `entrypoint.sh` change. +- **Bags are mcap** (`ros2 bag record -s mcap`) — open the `.mcap` files + directly in Foxglove, no conversion or local ROS install. +- **Results location:** `/osmo/output/airstack-mission-results///` + with a symlink at `/root/AirStack/osmo/results` (the path `osmo:fetch` + pulls). Artifacts are collected even for failed iterations. +- **`OSMO_MISSION_KEEP_ALIVE`** (default `true`): the pod sleeps after the + mission so you can fetch over ssh. Set `false` (or submit with + `osmo:mission --no-keep-alive`) for fire-and-forget: the task exits + cleanly when the mission ends, freeing the GPU — and if the workflow's + `outputs:` block is configured with a destination bucket, OSMO uploads + `/osmo/output` automatically on that exit. A **canceled** workflow does + not upload outputs, so in keep-alive mode `osmo:fetch` is the retrieval + path. + +Mission spec schema and step types: [`missions/README.md`](missions/README.md). + ## Pool requirements The OSMO pool the workflow runs on must satisfy: diff --git a/osmo/missions/README.md b/osmo/missions/README.md new file mode 100644 index 000000000..d0d95823d --- /dev/null +++ b/osmo/missions/README.md @@ -0,0 +1,146 @@ +# AirStack mission specs + +A **mission** is a declarative YAML file executed by +[`osmo/workspace/mission_runner.py`](../workspace/mission_runner.py). Each +mission runs one or more full iterations of: + +``` +airstack down → airstack up → wait for PX4 ready → start mcap recording +→ run steps → stop recording → collect bags + container logs → airstack down +``` + +Missions live in this directory so they're versioned with the code they +exercise — the OSMO pod clones your branch, so whatever you push is what runs. + +Submit to OSMO with: + +```bash +airstack osmo:mission osmo/missions/.yaml --pool +``` + +Or run locally on any machine that can `airstack up` (no OSMO involved): + +```bash +python3 osmo/workspace/mission_runner.py osmo/missions/.yaml \ + --airstack-root "$(pwd)" +``` + +`--dry-run` validates the spec and prints the merged config without touching +Docker. + +## Results + +``` +osmo/results/// +├── summary.json # per-iteration status + durations +└── iter_001/ + ├── bags/robot_1/*.mcap # open directly in Foxglove (no conversion) + ├── logs/.log # docker logs snapshot per container + ├── ready.json # per-robot seconds-to-PX4-ready + ├── steps.json # per-step command, output tail, pass/fail + └── iteration.json # iteration summary +``` + +On an OSMO pod the actual storage is `/osmo/output/airstack-mission-results` +(with `osmo/results` symlinked to it), so a workflow `outputs:` block uploads +everything automatically when the task exits. Download from your laptop at +any time while the pod is alive with `airstack osmo:fetch [dest]`. + +Artifacts are collected **even when an iteration fails** — a failed flight's +bag is usually the most interesting one. + +## Schema + +Top-level keys (everything except `steps` is optional): + +| Key | Default | Meaning | +|---|---|---| +| `name` | filename stem | Results directory name | +| `env` | `{}` | Env vars exported before each `airstack up` (`NUM_ROBOTS`, `COMPOSE_PROFILES`, `ISAAC_SIM_SCRIPT_NAME`, …) | +| `iterations` | `1` | Number of full up→fly→down cycles | +| `ready.timeout_s` | `600` | Max seconds to wait for PX4 readiness per iteration | +| `ready.poll_interval_s` | `5` | Seconds between readiness polls | +| `record.enabled` | `true` | Record an mcap per robot per iteration | +| `record.topics` | tf + odom set | Topics to record; `{robot}` → `robot_N` | +| `record.all` | `false` | Record **all** topics (`ros2 bag record -a`) — large | +| `on_step_failure` | `abort_iteration` | `continue` \| `abort_iteration` \| `abort_mission` | +| `up_timeout_s` | `3600` | `airstack up` timeout (first up on a fresh pod pulls images) | +| `down_timeout_s` | `300` | `airstack down` timeout | +| `robot_setup_bash` | robot ws `setup.bash` | Workspace sourced before `ros2` commands | +| `steps` | — | Ordered list of steps (below) | + +### Steps + +Every step type accepts the placeholders `{robot}` → `robot_N` and `{n}` → `N` +in its strings, and runs once per robot in `robots:` (`all` by default, or a +list like `[1, 3]`). + +**`action`** — send a goal to a robot task action server and wait for the +result. The step passes when the action result reports `success: true`. + +```yaml +- action: + task: takeoff # → /robot_N/tasks/ + goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} + timeout_s: 120 # default 120 + robots: all + # type: task_msgs/action/TakeoffTask # derived from task name if omitted +``` + +Available tasks (action type is derived as `task_msgs/action/Task`): +`takeoff`, `land`, `fixed_trajectory`, `navigate`, `exploration`, `coverage`, +`semantic_search`, `chat`. Goal fields are defined in +[`common/ros_packages/msgs/task_msgs/action/`](../../common/ros_packages/msgs/task_msgs/action/). +Multi-robot action goals are sent **in parallel** across robots. + +**`wait`** — sleep for N seconds (e.g. hover, let a planner run): + +```yaml +- wait: 30 +``` + +**`run`** — arbitrary command; the escape hatch that makes any ROS 2 command +work without runner changes. The step fails on non-zero exit unless +`expect_success: false`. + +```yaml +- run: + container: robot_1 # robot_N → exec in the robot container on robot + # N's DDS domain (ros2 is sourced for you); + # pod → run on the pod itself (cwd = AirStack root); + # any other value → literal container name, domain 0 + cmd: ros2 topic echo --once /{robot}/odometry + timeout_s: 60 + expect_success: true +``` + +**`topic_pub`** — `ros2 topic pub --once` per robot: + +```yaml +- topic_pub: + topic: /{robot}/some_input + type: std_msgs/msg/Bool + msg: {data: true} +``` + +**`service_call`** — `ros2 service call` per robot: + +```yaml +- service_call: + service: /{robot}/some_service + type: std_srvs/srv/Trigger + request: {} +``` + +## Notes + +- For `NUM_ROBOTS > 1` on Isaac Sim, set + `ISAAC_SIM_SCRIPT_NAME: example_multi_px4_pegasus_launch_script.py` in + `env:` — the default script spawns a single drone. +- Missions run unattended: keep `ISAAC_SIM_HEADLESS: "true"`, + `ISAAC_SIM_USE_STANDALONE: "true"` and `PLAY_SIM_ON_START: "true"` unless + you're watching via the WebRTC livestream profile. +- Recording camera/LiDAR topics (or `record.all: true`) is the bag-size + driver — budget pod `storage:` accordingly. +- `/tf` and `/tf_static` are in the default topic set because without them a + Foxglove 3D panel can't pose anything during replay. diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml new file mode 100644 index 000000000..537e95b37 --- /dev/null +++ b/osmo/missions/example_takeoff_land.yaml @@ -0,0 +1,68 @@ +# Example mission: takeoff → hover 30s → land, 3 times, Isaac Sim headless. +# +# Run on OSMO: +# airstack osmo:mission osmo/missions/example_takeoff_land.yaml --pool +# +# Or locally against a machine that can run `airstack up` (no OSMO needed): +# python3 osmo/workspace/mission_runner.py osmo/missions/example_takeoff_land.yaml \ +# --airstack-root "$(pwd)" +# +# Schema reference: osmo/missions/README.md + +name: example_takeoff_land + +# Exported before every `airstack up`. Anything the compose stack reads from +# the environment can go here (.env values, ISAAC_SIM_SCRIPT_NAME, etc.). +env: + NUM_ROBOTS: 1 + COMPOSE_PROFILES: desktop,isaac-sim + ISAAC_SIM_HEADLESS: "true" + ISAAC_SIM_USE_STANDALONE: "true" + # The default script only spawns one drone; for NUM_ROBOTS > 1 switch to + # example_multi_px4_pegasus_launch_script.py. + ISAAC_SIM_SCRIPT_NAME: example_one_px4_pegasus_launch_script.py + # Missions are unattended — the sim must start playing without a GUI click. + PLAY_SIM_ON_START: "true" + +# Full up → fly → collect → down cycles. +iterations: 3 + +# Gate before any steps run: per robot, MAVROS connected, then +# local_position/odom publishing (PX4 EKF converged = ready to arm). +ready: + timeout_s: 600 + +# One mcap per robot per iteration; open the .mcap directly in Foxglove. +# {robot} expands to robot_1, robot_2, ... per robot. +record: + enabled: true + topics: + - /tf + - /tf_static + - /{robot}/odometry + - /{robot}/interface/mavros/local_position/odom + - /{robot}/odom_ground_truth + # Or record everything (large — includes camera/LiDAR if running): + # all: true + +# continue | abort_iteration (default) | abort_mission +on_step_failure: abort_iteration + +steps: + - action: + task: takeoff # → /robot_N/tasks/takeoff (TakeoffTask) + goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} + timeout_s: 120 + robots: all # or e.g. [1, 3] + + - wait: 30 # hover + + - run: # arbitrary command escape hatch + container: robot_1 # robot_N = exec on robot N's DDS domain + cmd: ros2 topic echo --once /robot_1/odometry + timeout_s: 20 + + - action: + task: land # → /robot_N/tasks/land (LandTask) + goal: {velocity_m_s: 0.5} + timeout_s: 120 diff --git a/osmo/workflows/airstack-mission.yaml b/osmo/workflows/airstack-mission.yaml new file mode 100644 index 000000000..135972352 --- /dev/null +++ b/osmo/workflows/airstack-mission.yaml @@ -0,0 +1,92 @@ +# AirStack batch mission workflow on OSMO. +# +# Same workspace image and pod layout as airstack-dev.yaml, but instead of a +# single `airstack up` for an interactive session, the entrypoint hands off +# to osmo/workspace/mission_runner.py, which executes the mission spec named +# by OSMO_MISSION_FILE: repeated cycles of +# +# airstack down → airstack up → wait for PX4 ready → record mcap bags +# → run steps (takeoff / land / navigate / any ros2 command) → collect +# bags + logs → airstack down +# +# Mission specs live in osmo/missions/ (cloned with the branch, so what you +# push is what runs). Schema: osmo/missions/README.md. +# +# To submit (or use the wrapper: airstack osmo:mission ): +# +# osmo workflow submit osmo/workflows/airstack-mission.yaml \ +# --pool \ +# --set-env "SSH_PUB_KEY=$(cat ~/.ssh/id_ed25519.pub)" \ +# "OSMO_MISSION_FILE=osmo/missions/example_takeoff_land.yaml" \ +# "AIRSTACK_BRANCH=" +# +# Results land in /osmo/output/airstack-mission-results (symlinked from +# /root/AirStack/osmo/results). With the default OSMO_MISSION_KEEP_ALIVE=true +# the pod stays alive after the mission so you can pull artifacts from your +# laptop: +# +# airstack osmo:fetch ./results/ +# +# Set OSMO_MISSION_KEEP_ALIVE=false for fire-and-forget runs: the task exits +# cleanly when the mission ends, which frees the GPU and triggers the +# `outputs:` upload below (if a destination bucket is configured). + +workflow: + name: airstack-mission + groups: + - name: airstack + tasks: + - name: workspace + lead: true + image: airlab-docker.andrew.cmu.edu/airstack/airstack-osmo-workspace:latest + # Required so the inner dockerd can run (same DinD setup as airstack-dev). + privileged: true + command: ["bash"] + args: ["/usr/local/bin/entrypoint.sh"] + environment: + # Mission selection — repo-relative path into the clone. Override at + # submit time to run a different mission. + OSMO_MISSION_FILE: "osmo/missions/example_takeoff_land.yaml" + # true → pod sleeps after the mission; fetch results over ssh, then + # `airstack osmo:down` (results die with the pod!). + # false → task exits when the mission ends: GPU freed, /osmo/output + # uploaded to `outputs:` destinations. + OSMO_MISSION_KEEP_ALIVE: "true" + AIRSTACK_BRANCH: "main" # branch entrypoint.sh clones + AIRSTACK_REPO_URL: "https://github.com/castacks/AirStack.git" + # SSH_PUB_KEY is supplied at submit time (needed for osmo:fetch): + # --set-env "SSH_PUB_KEY=$(cat ~/.ssh/id_ed25519.pub)" + # Uncomment and point at a bucket the OSMO deployment can write to, to + # have /osmo/output uploaded automatically when the task exits (only + # meaningful with OSMO_MISSION_KEEP_ALIVE=false — a canceled workflow + # does not upload outputs): + # + # outputs: + # - url: s3:///airstack-missions/{{workflow_id}}/ + credentials: + # Same per-user credentials as airstack-dev.yaml — see + # docs/tutorials/airstack_on_osmo.md "Step 0" / `airstack osmo:setup`. + airlab-nucleus: + OMNI_USER: omni_user + OMNI_PASS: omni_pass + OMNI_SERVER: omni_server + airlab-docker-login: + AIRLAB_REGISTRY_USER: username + AIRLAB_REGISTRY_PASS: password + + resources: + default: + cpu: 16 + gpu: 1 + memory: 64Gi + # Same sizing rationale as airstack-dev.yaml (inner image set alone + # exceeds 100Gi extracted), plus headroom for per-iteration mcap + # recordings — bump if a mission records camera/LiDAR over many + # iterations. + storage: 500Gi + + timeout: + # Missions run unattended and multi-iteration; 24h leaves room for long + # batches plus result download time in keep-alive mode. The pod is gone + # when this expires — fetch results first. + exec_timeout: 24h diff --git a/osmo/workspace/Dockerfile b/osmo/workspace/Dockerfile index e80f3be59..0e10d9e49 100644 --- a/osmo/workspace/Dockerfile +++ b/osmo/workspace/Dockerfile @@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ procps \ python3 \ python3-pip \ + python3-yaml \ rsync \ sudo \ tmux \ diff --git a/osmo/workspace/entrypoint.sh b/osmo/workspace/entrypoint.sh index bef9d9ba8..20d89910b 100755 --- a/osmo/workspace/entrypoint.sh +++ b/osmo/workspace/entrypoint.sh @@ -241,36 +241,70 @@ else log "WARN: --payload username= password=" fi -# ─── 6. airstack up ──────────────────────────────────────────────────────── - -# Honor optional overrides passed in via OSMO env. Defaults match a "single -# robot, Isaac Sim with WebRTC livestream" dev session. -export AUTOLAUNCH="${AUTOLAUNCH:-true}" -export NUM_ROBOTS="${NUM_ROBOTS:-1}" -export ISAAC_SIM_LIVESTREAM="${ISAAC_SIM_LIVESTREAM:-true}" - -# COMPOSE_PROFILES selection: the default `desktop,isaac-sim` from .env runs -# the standard isaac-sim service. If the student wants livestream, they (or -# we) swap to the isaac-sim-livestream profile, which is the OSMO-friendly -# variant defined in simulation/isaac-sim/docker/docker-compose.yaml. -if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then - export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim-livestream}" -else - export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim}" -fi +# ─── 6. airstack up / mission mode ───────────────────────────────────────── cd "$AIRSTACK_ROOT" -if [ "${OSMO_AIRSTACK_UP:-true}" = "true" ]; then - log "airstack up (COMPOSE_PROFILES=$COMPOSE_PROFILES, NUM_ROBOTS=$NUM_ROBOTS, livestream=$ISAAC_SIM_LIVESTREAM)" - ./airstack.sh up || log "WARN: airstack up exited non-zero — pod stays alive for debugging via SSH" + +if [ -n "${OSMO_MISSION_FILE:-}" ]; then + # Mission mode (airstack-mission.yaml): mission_runner.py owns the full + # stack lifecycle — repeated `airstack down/up` per iteration, PX4 + # readiness gating, mcap recording, and artifact collection into + # /osmo/output. Stack env (NUM_ROBOTS, COMPOSE_PROFILES, + # ISAAC_SIM_SCRIPT_NAME, ...) comes from the mission spec's `env:` block, + # so none of the dev-session defaults below are exported here. + # + # The runner is taken from the clone, not baked into the image, so the + # mission spec and the code executing it always come from the same branch. + MISSION_PATH="$AIRSTACK_ROOT/$OSMO_MISSION_FILE" + [ -f "$MISSION_PATH" ] || MISSION_PATH="$OSMO_MISSION_FILE" # allow absolute paths + if [ ! -f "$MISSION_PATH" ]; then + log "ERROR: mission file not found: $OSMO_MISSION_FILE — pod stays alive for debugging via SSH" + else + log "mission mode: python3 osmo/workspace/mission_runner.py $MISSION_PATH" + if python3 "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py" "$MISSION_PATH" \ + --airstack-root "$AIRSTACK_ROOT"; then + log "mission runner finished: all iterations passed" + else + log "WARN: mission runner exited non-zero — see summary.json / steps.json in the results dir" + fi + fi + if [ "${OSMO_MISSION_KEEP_ALIVE:-true}" != "true" ]; then + log "OSMO_MISSION_KEEP_ALIVE=false — exiting so OSMO uploads /osmo/output and frees the GPU" + # Exit 0 regardless of mission status: a clean task exit is what + # triggers the `outputs:` upload; mission pass/fail lives in summary.json. + exit 0 + fi + log "OSMO_MISSION_KEEP_ALIVE=true — pod stays alive; download results with 'airstack osmo:fetch'" else - log "OSMO_AIRSTACK_UP=false — skipping airstack up; SSH in and run ./airstack.sh up manually" + # Dev mode (airstack-dev.yaml): single bring-up, then the student drives. + # Honor optional overrides passed in via OSMO env. Defaults match a + # "single robot, Isaac Sim with WebRTC livestream" dev session. + export AUTOLAUNCH="${AUTOLAUNCH:-true}" + export NUM_ROBOTS="${NUM_ROBOTS:-1}" + export ISAAC_SIM_LIVESTREAM="${ISAAC_SIM_LIVESTREAM:-true}" + + # COMPOSE_PROFILES selection: the default `desktop,isaac-sim` from .env runs + # the standard isaac-sim service. If the student wants livestream, they (or + # we) swap to the isaac-sim-livestream profile, which is the OSMO-friendly + # variant defined in simulation/isaac-sim/docker/docker-compose.yaml. + if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then + export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim-livestream}" + else + export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim}" + fi + + if [ "${OSMO_AIRSTACK_UP:-true}" = "true" ]; then + log "airstack up (COMPOSE_PROFILES=$COMPOSE_PROFILES, NUM_ROBOTS=$NUM_ROBOTS, livestream=$ISAAC_SIM_LIVESTREAM)" + ./airstack.sh up || log "WARN: airstack up exited non-zero — pod stays alive for debugging via SSH" + else + log "OSMO_AIRSTACK_UP=false — skipping airstack up; SSH in and run ./airstack.sh up manually" + fi fi # ─── 7. Sleep ────────────────────────────────────────────────────────────── log "entrypoint complete; sleeping forever so port-forwards keep working" -if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then +if [ "${ISAAC_SIM_LIVESTREAM:-false}" = "true" ]; then isaac_sim_log_container="isaac-sim-livestream" else isaac_sim_log_container="airstack-isaac-sim-1" diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py new file mode 100644 index 000000000..8d6f873b9 --- /dev/null +++ b/osmo/workspace/mission_runner.py @@ -0,0 +1,684 @@ +#!/usr/bin/env python3 +"""mission_runner.py — batch mission executor for AirStack-on-OSMO pods. + +Reads a declarative mission spec (YAML, see osmo/missions/README.md) and runs +N full iterations of: + + airstack down → airstack up → wait for PX4 ready → start mcap recording + → execute steps (ros2 action goals / topic pubs / service calls / raw + commands / waits) → stop recording → collect bags + container logs → + airstack down + +Artifacts land under one results root per mission run: + + /// + ├── summary.json # per-iteration pass/fail + durations + └── iter_001/ + ├── bags/robot_1/*.mcap # Foxglove-ready (open the .mcap directly) + ├── logs/.log # docker logs snapshot, per container + ├── ready.json # per-robot PX4 readiness timings + └── steps.json # per-step command, output tail, status + +The results root prefers /osmo/output (OSMO uploads that directory to the +workflow's `outputs:` destinations when the task exits) and falls back to +/osmo/results. Either way a symlink is left at +/osmo/results so `airstack osmo:fetch` always finds it. + +Designed to run on the OSMO workspace pod (invoked by entrypoint.sh when +OSMO_MISSION_FILE is set), but has no OSMO dependency: it only needs docker, +python3 + PyYAML, and a checkout with airstack.sh — so it can be tested on +any dev machine that can run `airstack up`. + +Command patterns (ros2 exec into the robot container with a per-robot +ROS_DOMAIN_ID, two-gate PX4 readiness, action-result parsing) mirror +tests/conftest.py and tests/system/test_takeoff_hover_land.py. +""" + +import argparse +import json +import os +import shlex +import subprocess +import sys +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path + +import yaml + +ROS_DISTRO_SETUP = "/opt/ros/jazzy/setup.bash" +DEFAULT_ROBOT_SETUP_BASH = "/root/AirStack/robot/ros_ws/install/setup.bash" +ROBOT_CONTAINER_PATTERN = "robot.*desktop" + +# Topics recorded when the mission spec doesn't list its own. {robot} +# expands to robot_ per robot. /tf + /tf_static are what let a Foxglove +# 3D panel pose anything at all during replay. +DEFAULT_RECORD_TOPICS = [ + "/tf", + "/tf_static", + "/{robot}/odometry", + "/{robot}/interface/mavros/local_position/odom", + "/{robot}/odom_ground_truth", + "/{robot}/global_plan", +] + +# In-container staging dir for bag recordings (docker cp'd out before the +# stack goes down). Lives in robot container 1 regardless of robot count — +# all replicas share the bridge network, so any container reaches any +# robot's DDS domain by exporting that robot's ROS_DOMAIN_ID. +BAG_STAGING_DIR = "/tmp/osmo_bags" + +# Tasks the GCS action_relay bridges (gcs/ros_ws/src/action_relay). Goals for +# these can be routed `via: gcs` — published as String JSON on +# //tasks//goal (GCS domain 0), exactly the path Foxglove uses. +GCS_RELAY_TASKS = {"takeoff", "land", "navigate", "fixed_trajectory", + "semantic_search", "exploration"} + +MISSION_DEFAULTS = { + "iterations": 1, + "on_step_failure": "abort_iteration", # continue | abort_iteration | abort_mission + "ready": {"timeout_s": 600, "poll_interval_s": 5}, + "record": {"enabled": True}, + # How the stack is brought up. Either (or both): + # services: [isaac-sim, robot-desktop, gcs] → ./airstack.sh up + # (compose auto-enables a named service's profile) + # profiles: [desktop, isaac-sim] → exported as COMPOSE_PROFILES + # With neither, plain `airstack up` with COMPOSE_PROFILES=desktop,isaac-sim. + "stack": {"services": [], "profiles": []}, + # Default route for `action` steps: "gcs" sends goals through the GCS + # action_relay (same path as Foxglove / the GCS panels — exercises the + # full GCS→robot chain incl. the relay's airborne preconditions); + # "robot" sends ros2 action goals directly on the robot's DDS domain. + # Override per step with `via:`. + "command_route": "gcs", + "up_timeout_s": 3600, # first `up` on a fresh pod pulls the full image set + "down_timeout_s": 300, + "robot_setup_bash": DEFAULT_ROBOT_SETUP_BASH, +} + + +def log(msg): + print(f"[mission {datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True) + + +def utc_stamp(): + return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S") + + +# ── subprocess / docker plumbing ─────────────────────────────────────────── + +def sh(cmd_list, timeout, env=None, cwd=None): + """Run a command, capturing output. Never raises on non-zero exit; a + TimeoutExpired is converted into a synthetic failed result so step + failures stay data, not exceptions.""" + try: + return subprocess.run(cmd_list, capture_output=True, text=True, + timeout=timeout, env=env, cwd=cwd) + except subprocess.TimeoutExpired as e: + return subprocess.CompletedProcess( + cmd_list, returncode=124, + stdout=(e.stdout or b"").decode() if isinstance(e.stdout, bytes) else (e.stdout or ""), + stderr=f"timed out after {timeout}s", + ) + + +def docker_exec(container, cmd, timeout): + return sh(["docker", "exec", container, "bash", "-c", cmd], timeout=timeout) + + +def ros2_env_prefix(setup_bash, domain_id): + # Workspace setup is conditional: the GCS container has a different (or + # no) workspace at the robot path, and plain std_msgs pub/echo only needs + # the distro setup anyway. + return (f"source {ROS_DISTRO_SETUP} && " + f"{{ [ -f {setup_bash} ] && source {setup_bash}; }}; " + f"export ROS_DOMAIN_ID={domain_id}") + + +def ros2_exec(container, ros2_cmd, domain_id, setup_bash, timeout): + return docker_exec(container, f"{ros2_env_prefix(setup_bash, domain_id)} && {ros2_cmd}", + timeout=timeout) + + +def list_containers(name_pattern=None, all_states=False): + cmd = ["docker", "ps", "--format", "{{.Names}}"] + if all_states: + cmd.insert(2, "-a") + if name_pattern: + cmd += ["--filter", f"name={name_pattern}"] + result = sh(cmd, timeout=15) + return [n for n in result.stdout.strip().splitlines() if n] + + +def robot_containers(): + """Running robot containers sorted by replica index (replica n ↔ robot_n).""" + def index(name): + tail = name.rsplit("-", 1)[-1] + return int(tail) if tail.isdigit() else 0 + return sorted(list_containers(ROBOT_CONTAINER_PATTERN), key=index) + + +def gcs_container(): + names = list_containers("gcs") + return names[0] if names else None + + +# ── mission spec ─────────────────────────────────────────────────────────── + +def load_mission(path): + with open(path, encoding="utf-8") as f: + spec = yaml.safe_load(f) + if not isinstance(spec, dict): + raise ValueError(f"{path}: mission spec must be a YAML mapping") + if not spec.get("steps"): + raise ValueError(f"{path}: mission spec has no 'steps'") + merged = dict(MISSION_DEFAULTS) + for key, default in MISSION_DEFAULTS.items(): + if isinstance(default, dict): + merged[key] = {**default, **(spec.get(key) or {})} + merged.update({k: v for k, v in spec.items() if not isinstance(MISSION_DEFAULTS.get(k), dict)}) + merged.setdefault("name", Path(path).stem) + merged.setdefault("env", {}) + if merged["on_step_failure"] not in ("continue", "abort_iteration", "abort_mission"): + raise ValueError(f"on_step_failure must be continue|abort_iteration|abort_mission, " + f"got {merged['on_step_failure']!r}") + if merged["command_route"] not in ("gcs", "robot"): + raise ValueError(f"command_route must be gcs|robot, got {merged['command_route']!r}") + for step in merged["steps"]: + action = step.get("action") if isinstance(step, dict) else None + if action: + via = action.get("via", merged["command_route"]) + if via not in ("gcs", "robot"): + raise ValueError(f"action via must be gcs|robot, got {via!r}") + if via == "gcs" and action.get("task") not in GCS_RELAY_TASKS: + raise ValueError( + f"task '{action.get('task')}' is not bridged by the GCS action_relay " + f"({', '.join(sorted(GCS_RELAY_TASKS))}); add `via: robot` to send it " + f"directly on the robot's domain") + return merged + + +def task_action_type(task): + """tasks/semantic_search → task_msgs/action/SemanticSearchTask""" + camel = "".join(part.capitalize() for part in task.split("_")) + return f"task_msgs/action/{camel}Task" + + +def expand(text, n): + """Substitute per-robot placeholders: {n} → robot index, {robot} → robot_.""" + return text.replace("{robot}", f"robot_{n}").replace("{n}", str(n)) + + +def step_robots(step_spec, num_robots): + robots = step_spec.get("robots", "all") + if robots == "all": + return list(range(1, num_robots + 1)) + return [int(r) for r in robots] + + +# ── stack lifecycle ──────────────────────────────────────────────────────── + +class Stack: + def __init__(self, airstack_root, mission): + self.root = airstack_root + self.mission = mission + self.env = os.environ.copy() + self.env.update({k: str(v) for k, v in mission["env"].items()}) + self.env.setdefault("AUTOLAUNCH", "true") + self.env.setdefault("NUM_ROBOTS", "1") + # Bring-up selection (see MISSION_DEFAULTS["stack"]): an explicit + # service list is passed to `airstack up `; explicit + # profiles become COMPOSE_PROFILES. With neither, default profiles. + self.services = [str(s) for s in mission["stack"].get("services") or []] + profiles = [str(p) for p in mission["stack"].get("profiles") or []] + if profiles: + self.env["COMPOSE_PROFILES"] = ",".join(profiles) + elif not self.services: + self.env.setdefault("COMPOSE_PROFILES", "desktop,isaac-sim") + self.num_robots = int(self.env["NUM_ROBOTS"]) + self.setup_bash = mission["robot_setup_bash"] + + def _airstack(self, verb, timeout, extra_args=()): + log(f"airstack {verb} {' '.join(extra_args)} " + f"(NUM_ROBOTS={self.env['NUM_ROBOTS']}, " + f"COMPOSE_PROFILES={self.env.get('COMPOSE_PROFILES', '')})") + return sh([str(Path(self.root) / "airstack.sh"), verb, *extra_args], + timeout=timeout, env=self.env, cwd=self.root) + + def up(self): + result = self._airstack("up", self.mission["up_timeout_s"], + extra_args=self.services) + if result.returncode != 0: + raise RuntimeError(f"airstack up failed (exit {result.returncode}):\n" + + "\n".join(result.stdout.splitlines()[-20:]) + + "\n" + "\n".join(result.stderr.splitlines()[-20:])) + deadline = time.time() + 120 + while time.time() < deadline: + containers = robot_containers() + if len(containers) >= self.num_robots: + return containers + time.sleep(3) + raise RuntimeError(f"expected {self.num_robots} robot containers, " + f"found {robot_containers()} after airstack up") + + def down(self): + result = self._airstack("down", self.mission["down_timeout_s"]) + if result.returncode != 0: + log(f"WARN: airstack down exited {result.returncode}") + + def wait_ready(self, container): + """Two sequential gates per robot (same as the system tests): + 1. mavros/state reports connected=True (MAVROS ↔ PX4 heartbeat); + 2. local_position/odom publishes (PX4 EKF converged — the actual + precondition for arming; `connected` alone fires ~25s too early). + Returns {robot_n: seconds_to_ready}; raises on timeout.""" + cfg = self.mission["ready"] + started = time.time() + deadline = started + cfg["timeout_s"] + connected, ready_at = set(), {} + pending = list(range(1, self.num_robots + 1)) + + while pending and time.time() < deadline: + for n in list(pending): + if n not in connected: + r = ros2_exec(container, + f"timeout 5 ros2 topic echo --once --csv " + f"--field connected /robot_{n}/interface/mavros/state", + domain_id=n, setup_bash=self.setup_bash, timeout=15) + if any(line.strip() == "True" for line in r.stdout.splitlines()): + connected.add(n) + else: + continue + r = ros2_exec(container, + f"timeout 5 ros2 topic echo --once " + f"/robot_{n}/interface/mavros/local_position/odom", + domain_id=n, setup_bash=self.setup_bash, timeout=15) + if r.returncode == 0 and "pose:" in r.stdout: + ready_at[n] = round(time.time() - started, 2) + pending.remove(n) + if pending: + log(f"waiting for PX4: connected={sorted(connected)} pending={pending} " + f"elapsed={time.time() - started:.0f}s") + time.sleep(cfg["poll_interval_s"]) + + if pending: + raise RuntimeError(f"robots {pending} not ready within {cfg['timeout_s']}s " + f"(connected so far: {sorted(connected)})") + log(f"PX4 ready: {ready_at}") + return ready_at + + +# ── bag recording ────────────────────────────────────────────────────────── + +class Recorder: + """One `ros2 bag record -s mcap` per robot, all inside robot container 1. + + Each recorder is started detached with its PID dropped to a file, and + stopped with SIGTERM so rosbag2 finalizes the mcap cleanly. (Not SIGINT: + jobs backgrounded from a non-interactive shell have SIGINT set to + SIG_IGN, so it would never be delivered; rosbag2 handles SIGTERM the + same way.) Bags are docker cp'd to the host before the stack goes down.""" + + def __init__(self, container, mission, num_robots, setup_bash): + self.container = container + self.cfg = mission["record"] + self.num_robots = num_robots + self.setup_bash = setup_bash + self.active = [] + + def start(self): + if not self.cfg.get("enabled", True): + log("recording disabled by mission spec") + return + docker_exec(self.container, + f"rm -rf {BAG_STAGING_DIR} && mkdir -p {BAG_STAGING_DIR}", timeout=15) + for n in range(1, self.num_robots + 1): + if self.cfg.get("all"): + selection = "-a" + else: + topics = [expand(t, n) for t in self.cfg.get("topics", DEFAULT_RECORD_TOPICS)] + selection = " ".join(shlex.quote(t) for t in topics) + out_dir = f"{BAG_STAGING_DIR}/robot_{n}" + inner = ( + f"{ros2_env_prefix(self.setup_bash, n)} && " + # nohup + pidfile: the record process must outlive this + # docker exec; --include-hidden-topics is not needed, and + # unknown listed topics are fine (record waits for them). + f"nohup ros2 bag record -s mcap -o {out_dir} {selection} " + f"> {BAG_STAGING_DIR}/record_{n}.log 2>&1 & " + f"echo $! > {BAG_STAGING_DIR}/record_{n}.pid" + ) + r = docker_exec(self.container, inner, timeout=30) + if r.returncode == 0: + self.active.append(n) + log(f"recording robot_{n} → {out_dir} " + f"({'all topics' if self.cfg.get('all') else f'{len(selection.split())} topics'})") + else: + log(f"WARN: failed to start recorder for robot_{n}: {r.stderr.strip()[:200]}") + + def stop(self): + for n in self.active: + docker_exec(self.container, f""" + pid=$(cat {BAG_STAGING_DIR}/record_{n}.pid 2>/dev/null) || exit 0 + kill -TERM "$pid" 2>/dev/null || exit 0 + for i in $(seq 1 20); do + kill -0 "$pid" 2>/dev/null || exit 0 + sleep 1 + done + kill -9 "$pid" 2>/dev/null || true + """, timeout=40) + if self.active: + log(f"recorders stopped for robots {self.active}") + self.active = [] + + def collect(self, dest_dir): + if not self.cfg.get("enabled", True): + return + dest_dir.mkdir(parents=True, exist_ok=True) + r = sh(["docker", "cp", f"{self.container}:{BAG_STAGING_DIR}/.", str(dest_dir)], + timeout=1800) + if r.returncode != 0: + log(f"WARN: docker cp of bags failed: {r.stderr.strip()[:200]}") + else: + mcaps = list(dest_dir.rglob("*.mcap")) + log(f"collected {len(mcaps)} mcap file(s) → {dest_dir}") + + +# ── step execution ───────────────────────────────────────────────────────── + +def action_ok(stdout): + """ros2 action send_goal --feedback prints the result as YAML; + AirStack task results carry `success: true` on completion.""" + return "success: true" in stdout + + +def tail(text, lines=15): + return "\n".join((text or "").strip().splitlines()[-lines:]) + + +def run_step(stack, container, step_spec, step_index): + """Execute one step; returns a result dict with ok: bool.""" + record = {"index": step_index, "spec": step_spec, + "started_at": datetime.now(timezone.utc).isoformat()} + t0 = time.time() + + if "wait" in step_spec: + seconds = float(step_spec["wait"]) + log(f"step {step_index}: wait {seconds}s") + time.sleep(seconds) + record.update(type="wait", ok=True) + + elif "action" in step_spec: + spec = step_spec["action"] + task = spec["task"] + via = spec.get("via", stack.mission["command_route"]) + goal = spec.get("goal", {}) + # Normalize the goal to JSON: dicts directly; strings may be ros2-style + # YAML ("{target_altitude_m: 10}") — YAML-parse then re-dump. JSON is + # what the GCS relay requires, and it's a YAML subset so the direct + # send_goal path accepts it too. + goal_obj = yaml.safe_load(goal) if isinstance(goal, str) else goal + goal_json = json.dumps(goal_obj or {}) + timeout = float(spec.get("timeout_s", 120)) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: action {task} {goal_json} via {via} → robots {robots}") + + if via == "gcs": + gcs = gcs_container() + if not gcs: + record.update(type="action", task=task, via=via, ok=False, + error="no gcs container running — `via: gcs` needs the " + "gcs service up (or use `via: robot`)") + record["duration_s"] = round(time.time() - t0, 2) + log(f"step {step_index}: FAILED (no gcs container)") + return record + + def send(n): + # Same path as Foxglove: publish String JSON on + # //tasks//goal (GCS domain 0); the per-robot + # action_relay forwards it as a typed action goal on domain N + # and reports {"success": ..., "message": ...} on + # .../relay_result. Subscribe to the result *before* + # publishing the goal so a fast result can't be missed. + base = f"/robot_{n}/tasks/{task}" + result_file = f"/tmp/relay_result_{task}_{n}.out" + msg_yaml = json.dumps({"data": expand(goal_json, n)}) + script = ( + f"rm -f {result_file}\n" + f"( timeout {int(timeout)} ros2 topic echo --once --field data " + f"{base}/relay_result > {result_file} 2>&1 ) &\n" + f"sub=$!\n" + f"sleep 3\n" + f"ros2 topic pub --once {base}/goal std_msgs/msg/String " + f"{shlex.quote(msg_yaml)} > /dev/null\n" + f"wait $sub\n" + f"cat {result_file}" + ) + r = ros2_exec(gcs, script, domain_id=0, setup_bash=GCS_SETUP_BASH, + timeout=int(timeout + 30)) + ok = '"success": true' in r.stdout + return n, {"exit": r.returncode, "ok": ok, + "output_tail": tail(r.stdout + r.stderr)} + else: + action_type = spec.get("type", task_action_type(task)) + + def send(n): + cmd = (f"ros2 action send_goal --feedback /robot_{n}/tasks/{task} " + f"{action_type} {shlex.quote(expand(goal_json, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=int(timeout + 15)) + return n, {"exit": r.returncode, "ok": action_ok(r.stdout), + "output_tail": tail(r.stdout + r.stderr)} + + with ThreadPoolExecutor(max_workers=len(robots)) as pool: + results = dict(pool.map(send, robots)) + record.update(type="action", task=task, via=via, per_robot=results, + ok=all(v["ok"] for v in results.values())) + + elif "run" in step_spec: + spec = step_spec["run"] + cmd = spec["cmd"] + timeout = float(spec.get("timeout_s", 60)) + target = spec.get("container", "robot_1") + log(f"step {step_index}: run [{target}] {cmd}") + if target == "pod": + r = sh(["bash", "-c", cmd], timeout=timeout, cwd=stack.root) + else: + # robot_N targets exec into robot container 1 on robot N's DDS + # domain (the containers share one bridge network); any other + # value is taken as a literal container name on domain 0. + if target.startswith("robot_") and target[6:].isdigit(): + n = int(target[6:]) + r = ros2_exec(container, expand(cmd, n), domain_id=n, + setup_bash=stack.setup_bash, timeout=timeout + 15) + else: + r = docker_exec(target, cmd, timeout=timeout + 15) + ok = r.returncode == 0 if spec.get("expect_success", True) else True + record.update(type="run", exit=r.returncode, ok=ok, + output_tail=tail(r.stdout + r.stderr)) + + elif "topic_pub" in step_spec: + spec = step_spec["topic_pub"] + msg = spec.get("msg", {}) + msg_str = msg if isinstance(msg, str) else json.dumps(msg) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: topic_pub {spec['topic']} → robots {robots}") + results = {} + for n in robots: + cmd = (f"ros2 topic pub --once {expand(spec['topic'], n)} " + f"{spec['type']} {shlex.quote(expand(msg_str, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=float(spec.get("timeout_s", 30)) + 15) + results[n] = {"exit": r.returncode, "ok": r.returncode == 0, + "output_tail": tail(r.stdout + r.stderr)} + record.update(type="topic_pub", per_robot=results, + ok=all(v["ok"] for v in results.values())) + + elif "service_call" in step_spec: + spec = step_spec["service_call"] + req = spec.get("request", {}) + req_str = req if isinstance(req, str) else json.dumps(req) + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: service_call {spec['service']} → robots {robots}") + results = {} + for n in robots: + cmd = (f"ros2 service call {expand(spec['service'], n)} " + f"{spec['type']} {shlex.quote(expand(req_str, n))}") + r = ros2_exec(container, cmd, domain_id=n, setup_bash=stack.setup_bash, + timeout=float(spec.get("timeout_s", 30)) + 15) + results[n] = {"exit": r.returncode, "ok": r.returncode == 0, + "output_tail": tail(r.stdout + r.stderr)} + record.update(type="service_call", per_robot=results, + ok=all(v["ok"] for v in results.values())) + + else: + record.update(type="unknown", ok=False, + error=f"unrecognized step keys: {sorted(step_spec)}") + + record["duration_s"] = round(time.time() - t0, 2) + log(f"step {step_index}: {'OK' if record['ok'] else 'FAILED'} " + f"({record['duration_s']}s)") + return record + + +# ── artifacts ────────────────────────────────────────────────────────────── + +def snapshot_container_logs(dest_dir): + dest_dir.mkdir(parents=True, exist_ok=True) + for name in list_containers(name_pattern="airstack", all_states=True): + r = sh(["docker", "logs", name], timeout=120) + (dest_dir / f"{name}.log").write_text( + (r.stdout or "") + (("\n--- stderr ---\n" + r.stderr) if r.stderr else ""), + encoding="utf-8") + + +def write_json(path, data): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2, default=str) + "\n", encoding="utf-8") + + +def resolve_results_root(airstack_root): + """Prefer /osmo/output (auto-uploaded by OSMO `outputs:` when the task + exits); keep /osmo/results working in both cases so + `airstack osmo:fetch` has one stable path to rsync.""" + fallback = Path(airstack_root) / "osmo" / "results" + override = os.environ.get("OSMO_RESULTS_ROOT") + if override: + root = Path(override) + root.mkdir(parents=True, exist_ok=True) + return root + osmo_output = Path(os.environ.get("OSMO_OUTPUT_DIR", "/osmo/output")) + if osmo_output.is_dir() and os.access(osmo_output, os.W_OK): + root = osmo_output / "airstack-mission-results" + root.mkdir(parents=True, exist_ok=True) + if not fallback.exists(): + fallback.parent.mkdir(parents=True, exist_ok=True) + fallback.symlink_to(root) + return root + fallback.mkdir(parents=True, exist_ok=True) + return fallback + + +# ── main loop ────────────────────────────────────────────────────────────── + +def run_iteration(stack, mission, iter_dir): + """One full up → ready → record → steps → collect → down cycle. + Returns the iteration summary dict; never raises (failures are data).""" + summary = {"status": "passed", "steps_ok": 0, "steps_failed": 0} + recorder = None + container = None + t0 = time.time() + try: + stack.down() + containers = stack.up() + container = containers[0] + summary["up_duration_s"] = round(time.time() - t0, 2) + + ready_at = stack.wait_ready(container) + write_json(iter_dir / "ready.json", ready_at) + + recorder = Recorder(container, mission, stack.num_robots, stack.setup_bash) + recorder.start() + + steps = [] + for i, step_spec in enumerate(mission["steps"], start=1): + record = run_step(stack, container, step_spec, i) + steps.append(record) + if record["ok"]: + summary["steps_ok"] += 1 + continue + summary["steps_failed"] += 1 + summary["status"] = "failed" + policy = mission["on_step_failure"] + if policy == "continue": + continue + if policy == "abort_mission": + summary["abort_mission"] = True + break + write_json(iter_dir / "steps.json", steps) + + except Exception as e: + summary["status"] = "error" + summary["error"] = str(e) + log(f"ERROR: iteration aborted: {e}") + + finally: + # Artifact collection happens even on failure — a failed flight's + # bag is usually the most interesting one. + if recorder is not None: + recorder.stop() + recorder.collect(iter_dir / "bags") + if container is not None or list_containers("airstack", all_states=True): + snapshot_container_logs(iter_dir / "logs") + stack.down() + summary["duration_s"] = round(time.time() - t0, 2) + write_json(iter_dir / "iteration.json", summary) + return summary + + +def main(): + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument("mission_file", help="Path to mission spec YAML") + parser.add_argument("--airstack-root", + default=os.environ.get("AIRSTACK_ROOT", "/root/AirStack")) + parser.add_argument("--dry-run", action="store_true", + help="Validate the spec and print the plan without touching docker") + args = parser.parse_args() + + mission = load_mission(args.mission_file) + stack = Stack(args.airstack_root, mission) + + log(f"mission '{mission['name']}': {mission['iterations']} iteration(s), " + f"{len(mission['steps'])} step(s), {stack.num_robots} robot(s)") + if args.dry_run: + print(yaml.safe_dump(mission, sort_keys=False)) + return 0 + + results_root = resolve_results_root(args.airstack_root) + run_dir = results_root / mission["name"] / utc_stamp() + run_dir.mkdir(parents=True) + log(f"results → {run_dir}") + + iterations = [] + for i in range(1, mission["iterations"] + 1): + log(f"━━━ iteration {i}/{mission['iterations']} ━━━") + iter_dir = run_dir / f"iter_{i:03d}" + summary = run_iteration(stack, mission, iter_dir) + summary["iteration"] = i + iterations.append(summary) + write_json(run_dir / "summary.json", + {"mission": mission["name"], "mission_file": args.mission_file, + "iterations": iterations}) + if summary.get("abort_mission"): + log("on_step_failure=abort_mission — stopping remaining iterations") + break + + passed = sum(1 for s in iterations if s["status"] == "passed") + log(f"mission complete: {passed}/{len(iterations)} iteration(s) passed; " + f"results in {run_dir}") + return 0 if passed == len(iterations) else 1 + + +if __name__ == "__main__": + sys.exit(main()) From 6ad33f0012250a54d83199e5c3f9375a85e53158 Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Wed, 10 Jun 2026 15:54:20 -0400 Subject: [PATCH 2/7] updated example mission --- osmo/missions/example_takeoff_land.yaml | 23 +++++++++++++++++++++-- osmo/workspace/mission_runner.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml index 537e95b37..1365d44ac 100644 --- a/osmo/missions/example_takeoff_land.yaml +++ b/osmo/missions/example_takeoff_land.yaml @@ -11,11 +11,20 @@ name: example_takeoff_land +# How to bring the stack up. `services` are passed straight through: +# ./airstack.sh up isaac-sim robot-desktop gcs +# (docker compose auto-enables a named service's profile, so no +# COMPOSE_PROFILES needed). On OSMO swap isaac-sim → isaac-sim-livestream to +# watch the run over WebRTC. Alternatively use `profiles:` to export +# COMPOSE_PROFILES and run a plain `airstack up`. +stack: + services: [isaac-sim, robot-desktop, gcs] + # profiles: [desktop, isaac-sim] + # Exported before every `airstack up`. Anything the compose stack reads from # the environment can go here (.env values, ISAAC_SIM_SCRIPT_NAME, etc.). env: NUM_ROBOTS: 1 - COMPOSE_PROFILES: desktop,isaac-sim ISAAC_SIM_HEADLESS: "true" ISAAC_SIM_USE_STANDALONE: "true" # The default script only spawns one drone; for NUM_ROBOTS > 1 switch to @@ -24,6 +33,14 @@ env: # Missions are unattended — the sim must start playing without a GUI click. PLAY_SIM_ON_START: "true" +# Where `action` steps are sent by default: +# gcs — through the GCS action_relay, the same String-JSON → typed-goal +# path Foxglove and the GCS panels use (exercises the full +# GCS→robot chain, including the relay's airborne preconditions) +# robot — `ros2 action send_goal` directly on the robot's DDS domain +# Override per step with `via:`. +command_route: gcs + # Full up → fly → collect → down cycles. iterations: 3 @@ -58,7 +75,8 @@ steps: - wait: 30 # hover - run: # arbitrary command escape hatch - container: robot_1 # robot_N = exec on robot N's DDS domain + container: robot_1 # robot_N = exec on robot N's DDS domain; + # also: gcs | pod | cmd: ros2 topic echo --once /robot_1/odometry timeout_s: 20 @@ -66,3 +84,4 @@ steps: task: land # → /robot_N/tasks/land (LandTask) goal: {velocity_m_s: 0.5} timeout_s: 120 + # via: robot # bypass the GCS relay for this step diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py index 8d6f873b9..5e36dd544 100644 --- a/osmo/workspace/mission_runner.py +++ b/osmo/workspace/mission_runner.py @@ -49,6 +49,7 @@ ROS_DISTRO_SETUP = "/opt/ros/jazzy/setup.bash" DEFAULT_ROBOT_SETUP_BASH = "/root/AirStack/robot/ros_ws/install/setup.bash" +GCS_SETUP_BASH = "/root/AirStack/gcs/ros_ws/install/setup.bash" ROBOT_CONTAINER_PATTERN = "robot.*desktop" # Topics recorded when the mission spec doesn't list its own. {robot} @@ -484,6 +485,15 @@ def send(n): log(f"step {step_index}: run [{target}] {cmd}") if target == "pod": r = sh(["bash", "-c", cmd], timeout=timeout, cwd=stack.root) + elif target == "gcs": + g = gcs_container() + if g: + r = ros2_exec(g, cmd, domain_id=0, setup_bash=GCS_SETUP_BASH, + timeout=timeout + 15) + else: + r = subprocess.CompletedProcess( + cmd, returncode=1, stdout="", + stderr="no gcs container running") else: # robot_N targets exec into robot container 1 on robot N's DDS # domain (the containers share one bridge network); any other From 55be3f80f889dfb37ad1649434df01eeb6822482 Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Wed, 10 Jun 2026 16:05:46 -0400 Subject: [PATCH 3/7] changes to recording topics --- osmo/missions/example_takeoff_land.yaml | 21 ++- osmo/workflows/airstack-mission.yaml | 2 +- osmo/workspace/mission_runner.py | 197 +++++++++++++++++------- 3 files changed, 161 insertions(+), 59 deletions(-) diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml index 1365d44ac..5f3275204 100644 --- a/osmo/missions/example_takeoff_land.yaml +++ b/osmo/missions/example_takeoff_land.yaml @@ -49,17 +49,26 @@ iterations: 3 ready: timeout_s: 600 -# One mcap per robot per iteration; open the .mcap directly in Foxglove. -# {robot} expands to robot_1, robot_2, ... per robot. +# One mcap per iteration with ALL robots' data, recorded on the GCS +# (domain 0) where each robot's domain_bridge forwards its state topics — +# replaying the .mcap in Foxglove reproduces the live GCS view. +# {robot} expands to robot_1..robot_N and the list is unioned. record: enabled: true + scope: gcs # or `robot`: one mcap per robot on its own + # domain, for topics not bridged to the GCS topics: - /tf - /tf_static - - /{robot}/odometry - - /{robot}/interface/mavros/local_position/odom - - /{robot}/odom_ground_truth - # Or record everything (large — includes camera/LiDAR if running): + - /{robot}/odometry_conversion/odometry + - /{robot}/interface/mavros/global_position/global + - /{robot}/trajectory_controller/trajectory_vis + - /{robot}/global_plan + - /gcs/robot_markers + - /gcs/map_origin/location + - /gcs/map_origin/ground_msl + - /gcs/{robot}/location + # Or record everything visible on the recording domain (large): # all: true # continue | abort_iteration (default) | abort_mission diff --git a/osmo/workflows/airstack-mission.yaml b/osmo/workflows/airstack-mission.yaml index 135972352..094ed25aa 100644 --- a/osmo/workflows/airstack-mission.yaml +++ b/osmo/workflows/airstack-mission.yaml @@ -52,7 +52,7 @@ workflow: # false → task exits when the mission ends: GPU freed, /osmo/output # uploaded to `outputs:` destinations. OSMO_MISSION_KEEP_ALIVE: "true" - AIRSTACK_BRANCH: "main" # branch entrypoint.sh clones + AIRSTACK_BRANCH: "krrishj18/osmo-mission-runner" # branch entrypoint.sh clones AIRSTACK_REPO_URL: "https://github.com/castacks/AirStack.git" # SSH_PUB_KEY is supplied at submit time (needed for osmo:fetch): # --set-env "SSH_PUB_KEY=$(cat ~/.ssh/id_ed25519.pub)" diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py index 5e36dd544..9d054b070 100644 --- a/osmo/workspace/mission_runner.py +++ b/osmo/workspace/mission_runner.py @@ -52,22 +52,38 @@ GCS_SETUP_BASH = "/root/AirStack/gcs/ros_ws/install/setup.bash" ROBOT_CONTAINER_PATTERN = "robot.*desktop" -# Topics recorded when the mission spec doesn't list its own. {robot} -# expands to robot_ per robot. /tf + /tf_static are what let a Foxglove -# 3D panel pose anything at all during replay. -DEFAULT_RECORD_TOPICS = [ +# Default topics for `record.scope: gcs` — one recorder in the GCS container +# on domain 0, where each robot's domain_bridge (autonomy_bringup +# onboard_all/config/domain_bridge.yaml) forwards its state topics. All +# robots land in ONE mcap, and replaying it in Foxglove reproduces the live +# GCS view (gcs_visualizer markers are already in the global 'map' frame). +# {robot} expands to robot_1..robot_N and the result is unioned. +DEFAULT_GCS_RECORD_TOPICS = [ "/tf", "/tf_static", - "/{robot}/odometry", - "/{robot}/interface/mavros/local_position/odom", - "/{robot}/odom_ground_truth", + "/{robot}/odometry_conversion/odometry", + "/{robot}/interface/mavros/global_position/global", + "/{robot}/trajectory_controller/trajectory_vis", + "/{robot}/global_plan", + "/gcs/robot_markers", + "/gcs/map_origin/location", + "/gcs/map_origin/ground_msl", + "/gcs/{robot}/location", +] + +# Default topics for `record.scope: robot` — one recorder (and one mcap) per +# robot on its own DDS domain. Use this scope for topics that are NOT +# bridged to the GCS (raw sensors, high-rate local topics). +DEFAULT_ROBOT_RECORD_TOPICS = [ + "/tf", + "/tf_static", + "/{robot}/odometry_conversion/odometry", + "/{robot}/interface/mavros/global_position/global", "/{robot}/global_plan", ] # In-container staging dir for bag recordings (docker cp'd out before the -# stack goes down). Lives in robot container 1 regardless of robot count — -# all replicas share the bridge network, so any container reaches any -# robot's DDS domain by exporting that robot's ROS_DOMAIN_ID. +# stack goes down). BAG_STAGING_DIR = "/tmp/osmo_bags" # Tasks the GCS action_relay bridges (gcs/ros_ws/src/action_relay). Goals for @@ -80,7 +96,10 @@ "iterations": 1, "on_step_failure": "abort_iteration", # continue | abort_iteration | abort_mission "ready": {"timeout_s": 600, "poll_interval_s": 5}, - "record": {"enabled": True}, + # scope "gcs": one recorder on GCS domain 0 → one mcap with every robot's + # bridged topics (default). scope "robot": one recorder + one mcap per + # robot on its own domain (for unbridged/high-rate topics). + "record": {"enabled": True, "scope": "gcs"}, # How the stack is brought up. Either (or both): # services: [isaac-sim, robot-desktop, gcs] → ./airstack.sh up # (compose auto-enables a named service's profile) @@ -186,6 +205,9 @@ def load_mission(path): f"got {merged['on_step_failure']!r}") if merged["command_route"] not in ("gcs", "robot"): raise ValueError(f"command_route must be gcs|robot, got {merged['command_route']!r}") + if merged["record"].get("scope", "gcs") not in ("gcs", "robot"): + raise ValueError(f"record.scope must be gcs|robot, " + f"got {merged['record'].get('scope')!r}") for step in merged["steps"]: action = step.get("action") if isinstance(step, dict) else None if action: @@ -218,6 +240,15 @@ def step_robots(step_spec, num_robots): return [int(r) for r in robots] +def uses_gcs_route(mission): + """True if any action step routes through the GCS action_relay.""" + for step in mission["steps"]: + action = step.get("action") if isinstance(step, dict) else None + if action and action.get("via", mission["command_route"]) == "gcs": + return True + return False + + # ── stack lifecycle ──────────────────────────────────────────────────────── class Stack: @@ -307,13 +338,46 @@ def wait_ready(self, container): raise RuntimeError(f"robots {pending} not ready within {cfg['timeout_s']}s " f"(connected so far: {sorted(connected)})") log(f"PX4 ready: {ready_at}") + + # Gate 3 (only when actions route via the GCS): the per-robot + # action_relay nodes must be up on the GCS before goals are sent — + # the relay's goal subscription is volatile, so a goal published + # before the relay exists is silently lost. + if uses_gcs_route(self.mission): + while time.time() < deadline: + gcs = gcs_container() + if gcs: + r = ros2_exec(gcs, "timeout 10 ros2 node list", + domain_id=0, setup_bash=GCS_SETUP_BASH, timeout=20) + missing = [n for n in range(1, self.num_robots + 1) + if f"action_relay_robot_{n}" not in r.stdout] + if not missing: + ready_at["gcs_relay"] = round(time.time() - started, 2) + log(f"GCS action_relay ready ({ready_at['gcs_relay']}s)") + break + log(f"waiting for GCS action_relay: missing robots {missing}") + else: + log("waiting for gcs container") + time.sleep(cfg["poll_interval_s"]) + if "gcs_relay" not in ready_at: + raise RuntimeError( + f"GCS action_relay not ready within {cfg['timeout_s']}s — " + f"is the gcs service in stack.services/profiles and " + f"AUTOLAUNCH=true? (or set command_route: robot)") return ready_at # ── bag recording ────────────────────────────────────────────────────────── class Recorder: - """One `ros2 bag record -s mcap` per robot, all inside robot container 1. + """`ros2 bag record -s mcap` per the mission's record.scope. + + scope "gcs" (default): a single recorder in the GCS container on domain + 0, where every robot's domain_bridge forwards its state topics — one + mcap holds all robots, and Foxglove replay matches the live GCS view. + + scope "robot": one recorder per robot inside robot container 1, each on + that robot's DDS domain — for topics that aren't bridged to the GCS. Each recorder is started detached with its PID dropped to a file, and stopped with SIGTERM so rosbag2 finalizes the mcap cleanly. (Not SIGINT: @@ -321,47 +385,75 @@ class Recorder: SIG_IGN, so it would never be delivered; rosbag2 handles SIGTERM the same way.) Bags are docker cp'd to the host before the stack goes down.""" - def __init__(self, container, mission, num_robots, setup_bash): - self.container = container + def __init__(self, robot_container, mission, num_robots, setup_bash): + self.robot_container = robot_container self.cfg = mission["record"] + self.scope = self.cfg.get("scope", "gcs") self.num_robots = num_robots self.setup_bash = setup_bash + # (container, domain_id, tag) per active recorder. self.active = [] + def _topic_selection(self, robots): + """Build the `ros2 bag record` topic args; `robots` is the list of + robot indices whose {robot}/{n} placeholders to expand (unioned, + order-preserving, deduplicated).""" + if self.cfg.get("all"): + return "-a" + default = (DEFAULT_GCS_RECORD_TOPICS if self.scope == "gcs" + else DEFAULT_ROBOT_RECORD_TOPICS) + topics = [] + for t in self.cfg.get("topics", default): + if "{robot}" in t or "{n}" in t: + topics.extend(expand(t, n) for n in robots) + else: + topics.append(t) + seen = set() + unique = [t for t in topics if not (t in seen or seen.add(t))] + return " ".join(shlex.quote(t) for t in unique) + + def _start_one(self, container, domain_id, tag, selection, setup_bash): + out_dir = f"{BAG_STAGING_DIR}/{tag}" + inner = ( + f"{ros2_env_prefix(setup_bash, domain_id)} && " + f"mkdir -p {BAG_STAGING_DIR} && rm -rf {out_dir} && " + # nohup + pidfile: the record process must outlive this docker + # exec; topics that don't exist yet are fine (record waits). + f"nohup ros2 bag record -s mcap -o {out_dir} {selection} " + f"> {BAG_STAGING_DIR}/record_{tag}.log 2>&1 & " + f"echo $! > {BAG_STAGING_DIR}/record_{tag}.pid" + ) + r = docker_exec(container, inner, timeout=30) + if r.returncode == 0: + self.active.append((container, tag)) + n_topics = "all topics" if selection == "-a" else f"{len(selection.split())} topics" + log(f"recording [{tag}] in {container} (domain {domain_id}) " + f"→ {out_dir} ({n_topics})") + else: + log(f"WARN: failed to start recorder [{tag}]: {r.stderr.strip()[:200]}") + def start(self): if not self.cfg.get("enabled", True): log("recording disabled by mission spec") return - docker_exec(self.container, - f"rm -rf {BAG_STAGING_DIR} && mkdir -p {BAG_STAGING_DIR}", timeout=15) - for n in range(1, self.num_robots + 1): - if self.cfg.get("all"): - selection = "-a" - else: - topics = [expand(t, n) for t in self.cfg.get("topics", DEFAULT_RECORD_TOPICS)] - selection = " ".join(shlex.quote(t) for t in topics) - out_dir = f"{BAG_STAGING_DIR}/robot_{n}" - inner = ( - f"{ros2_env_prefix(self.setup_bash, n)} && " - # nohup + pidfile: the record process must outlive this - # docker exec; --include-hidden-topics is not needed, and - # unknown listed topics are fine (record waits for them). - f"nohup ros2 bag record -s mcap -o {out_dir} {selection} " - f"> {BAG_STAGING_DIR}/record_{n}.log 2>&1 & " - f"echo $! > {BAG_STAGING_DIR}/record_{n}.pid" - ) - r = docker_exec(self.container, inner, timeout=30) - if r.returncode == 0: - self.active.append(n) - log(f"recording robot_{n} → {out_dir} " - f"({'all topics' if self.cfg.get('all') else f'{len(selection.split())} topics'})") - else: - log(f"WARN: failed to start recorder for robot_{n}: {r.stderr.strip()[:200]}") + robots = list(range(1, self.num_robots + 1)) + if self.scope == "gcs": + gcs = gcs_container() + if not gcs: + log("WARN: record.scope is 'gcs' but no gcs container is running — " + "recording skipped (bring up the gcs service or use scope: robot)") + return + self._start_one(gcs, 0, "gcs", self._topic_selection(robots), + GCS_SETUP_BASH) + else: + for n in robots: + self._start_one(self.robot_container, n, f"robot_{n}", + self._topic_selection([n]), self.setup_bash) def stop(self): - for n in self.active: - docker_exec(self.container, f""" - pid=$(cat {BAG_STAGING_DIR}/record_{n}.pid 2>/dev/null) || exit 0 + for container, tag in self.active: + docker_exec(container, f""" + pid=$(cat {BAG_STAGING_DIR}/record_{tag}.pid 2>/dev/null) || exit 0 kill -TERM "$pid" 2>/dev/null || exit 0 for i in $(seq 1 20); do kill -0 "$pid" 2>/dev/null || exit 0 @@ -370,20 +462,21 @@ def stop(self): kill -9 "$pid" 2>/dev/null || true """, timeout=40) if self.active: - log(f"recorders stopped for robots {self.active}") - self.active = [] + log(f"recorders stopped: {[tag for _, tag in self.active]}") def collect(self, dest_dir): - if not self.cfg.get("enabled", True): + if not self.active: return dest_dir.mkdir(parents=True, exist_ok=True) - r = sh(["docker", "cp", f"{self.container}:{BAG_STAGING_DIR}/.", str(dest_dir)], - timeout=1800) - if r.returncode != 0: - log(f"WARN: docker cp of bags failed: {r.stderr.strip()[:200]}") - else: - mcaps = list(dest_dir.rglob("*.mcap")) - log(f"collected {len(mcaps)} mcap file(s) → {dest_dir}") + for container in {c for c, _ in self.active}: + r = sh(["docker", "cp", f"{container}:{BAG_STAGING_DIR}/.", str(dest_dir)], + timeout=1800) + if r.returncode != 0: + log(f"WARN: docker cp of bags from {container} failed: " + f"{r.stderr.strip()[:200]}") + self.active = [] + mcaps = list(dest_dir.rglob("*.mcap")) + log(f"collected {len(mcaps)} mcap file(s) → {dest_dir}") # ── step execution ───────────────────────────────────────────────────────── From 1fdbf8cdc4212bc8a0a53263f73e33c5ac100934 Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Wed, 10 Jun 2026 16:31:53 -0400 Subject: [PATCH 4/7] updates to example mission --- osmo/missions/example_takeoff_land.yaml | 23 ++++---- osmo/workspace/mission_runner.py | 76 +++++++++++++++++-------- 2 files changed, 64 insertions(+), 35 deletions(-) diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml index 5f3275204..727c420a1 100644 --- a/osmo/missions/example_takeoff_land.yaml +++ b/osmo/missions/example_takeoff_land.yaml @@ -74,23 +74,26 @@ record: # continue | abort_iteration (default) | abort_mission on_step_failure: abort_iteration +# Steps run top to bottom. Every step that addresses a robot fans out over a +# robot selection — `robots:` (default `all` = robots 1..NUM_ROBOTS, or a +# list like [1, 3]) — and {robot} → robot_N / {n} → N expand per robot. +# Per-step knobs (all optional, shown here with their defaults): +# action: robots=all via= timeout_s=120 +# via: gcs (through the GCS action_relay) | robot (direct send_goal) +# run: robots=all container=robot_{n} (per-robot) or robot_1 | gcs | pod | +# ; fans out when cmd/container references {robot}/{n} +# wait: (runs once) steps: - action: - task: takeoff # → /robot_N/tasks/takeoff (TakeoffTask) + task: takeoff # → //tasks/takeoff (TakeoffTask) goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} - timeout_s: 120 - robots: all # or e.g. [1, 3] - wait: 30 # hover - - run: # arbitrary command escape hatch - container: robot_1 # robot_N = exec on robot N's DDS domain; - # also: gcs | pod | - cmd: ros2 topic echo --once /robot_1/odometry + - run: # arbitrary command, fanned out per robot + cmd: ros2 topic echo --once /{robot}/odometry_conversion/odometry timeout_s: 20 - action: - task: land # → /robot_N/tasks/land (LandTask) + task: land # → //tasks/land (LandTask) goal: {velocity_m_s: 0.5} - timeout_s: 120 - # via: robot # bypass the GCS relay for this step diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py index 9d054b070..74e6aa475 100644 --- a/osmo/workspace/mission_runner.py +++ b/osmo/workspace/mission_runner.py @@ -233,6 +233,11 @@ def expand(text, n): return text.replace("{robot}", f"robot_{n}").replace("{n}", str(n)) +def has_placeholder(*strings): + """True if any string contains a per-robot placeholder ({robot} or {n}).""" + return any(s and ("{robot}" in s or "{n}" in s) for s in strings) + + def step_robots(step_spec, num_robots): robots = step_spec.get("robots", "all") if robots == "all": @@ -491,6 +496,34 @@ def tail(text, lines=15): return "\n".join((text or "").strip().splitlines()[-lines:]) +def _run_one(stack, robot_container, target, cmd, timeout, expect_success): + """Run one resolved `run` command. `target` is already robot-expanded: + pod → on the pod itself (cwd = AirStack root) + gcs → in the gcs container on domain 0 + robot_ → in robot container 1 on robot N's DDS domain + → literal container name (domain 0) + """ + if target == "pod": + r = sh(["bash", "-c", cmd], timeout=timeout, cwd=stack.root) + elif target == "gcs": + g = gcs_container() + if g: + r = ros2_exec(g, cmd, domain_id=0, setup_bash=GCS_SETUP_BASH, + timeout=timeout + 15) + else: + r = subprocess.CompletedProcess(cmd, returncode=1, stdout="", + stderr="no gcs container running") + elif target.startswith("robot_") and target[6:].isdigit(): + n = int(target[6:]) + r = ros2_exec(robot_container, cmd, domain_id=n, + setup_bash=stack.setup_bash, timeout=timeout + 15) + else: + r = docker_exec(target, cmd, timeout=timeout + 15) + ok = (r.returncode == 0) if expect_success else True + return {"target": target, "exit": r.returncode, "ok": ok, + "output_tail": tail(r.stdout + r.stderr)} + + def run_step(stack, container, step_spec, step_index): """Execute one step; returns a result dict with ok: bool.""" record = {"index": step_index, "spec": step_spec, @@ -574,32 +607,25 @@ def send(n): spec = step_spec["run"] cmd = spec["cmd"] timeout = float(spec.get("timeout_s", 60)) - target = spec.get("container", "robot_1") - log(f"step {step_index}: run [{target}] {cmd}") - if target == "pod": - r = sh(["bash", "-c", cmd], timeout=timeout, cwd=stack.root) - elif target == "gcs": - g = gcs_container() - if g: - r = ros2_exec(g, cmd, domain_id=0, setup_bash=GCS_SETUP_BASH, - timeout=timeout + 15) - else: - r = subprocess.CompletedProcess( - cmd, returncode=1, stdout="", - stderr="no gcs container running") + expect = spec.get("expect_success", True) + # `container` may reference {n}/{robot} to fan out over robots. If it's + # omitted, default to robot_{n} when the command is per-robot (has a + # placeholder) and robot_1 otherwise. A step fans out over `robots` + # (default all) iff its command or container references {n}/{robot}. + target = spec.get("container") or ("robot_{n}" if has_placeholder(cmd) else "robot_1") + if has_placeholder(cmd, target): + robots = step_robots(spec, stack.num_robots) + log(f"step {step_index}: run [{target}] {cmd} → robots {robots}") + results = {} + for n in robots: + results[n] = _run_one(stack, container, expand(target, n), + expand(cmd, n), timeout, expect) + record.update(type="run", per_robot=results, + ok=all(v["ok"] for v in results.values())) else: - # robot_N targets exec into robot container 1 on robot N's DDS - # domain (the containers share one bridge network); any other - # value is taken as a literal container name on domain 0. - if target.startswith("robot_") and target[6:].isdigit(): - n = int(target[6:]) - r = ros2_exec(container, expand(cmd, n), domain_id=n, - setup_bash=stack.setup_bash, timeout=timeout + 15) - else: - r = docker_exec(target, cmd, timeout=timeout + 15) - ok = r.returncode == 0 if spec.get("expect_success", True) else True - record.update(type="run", exit=r.returncode, ok=ok, - output_tail=tail(r.stdout + r.stderr)) + log(f"step {step_index}: run [{target}] {cmd}") + res = _run_one(stack, container, target, cmd, timeout, expect) + record.update(type="run", **res) elif "topic_pub" in step_spec: spec = step_spec["topic_pub"] From 61b673be32ffcd2fcad51346a56b1b6dbdd4da2c Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Thu, 11 Jun 2026 15:30:11 -0400 Subject: [PATCH 5/7] fixed mission runner script --- osmo/missions/example_takeoff_land.yaml | 15 +++- osmo/workflows/airstack-mission.yaml | 36 ++++++++-- osmo/workspace/entrypoint.sh | 83 +++++++-------------- osmo/workspace/mission_launcher.sh | 96 +++++++++++++++++++++++++ 4 files changed, 167 insertions(+), 63 deletions(-) create mode 100755 osmo/workspace/mission_launcher.sh diff --git a/osmo/missions/example_takeoff_land.yaml b/osmo/missions/example_takeoff_land.yaml index 727c420a1..1eaa96554 100644 --- a/osmo/missions/example_takeoff_land.yaml +++ b/osmo/missions/example_takeoff_land.yaml @@ -18,7 +18,7 @@ name: example_takeoff_land # watch the run over WebRTC. Alternatively use `profiles:` to export # COMPOSE_PROFILES and run a plain `airstack up`. stack: - services: [isaac-sim, robot-desktop, gcs] + services: [isaac-sim-livestream, robot-desktop, gcs] # profiles: [desktop, isaac-sim] # Exported before every `airstack up`. Anything the compose stack reads from @@ -87,12 +87,23 @@ steps: - action: task: takeoff # → //tasks/takeoff (TakeoffTask) goal: {target_altitude_m: 10.0, velocity_m_s: 1.0} + #to specify robot(s) + #robots: [1] (1st robot) + #robots: [1, 3] (1st and 3rd robot) + #robots: all (all robots) - wait: 30 # hover - - run: # arbitrary command, fanned out per robot + - run: # arbitrary command, that runs per robot cmd: ros2 topic echo --once /{robot}/odometry_conversion/odometry timeout_s: 20 + #to specify robot(s): + #robots: [1] (1st robot) + #robots: [1, 3] (1st and 3rd robot) + + #To specify the container: + #container: gcs (GCS container) + #container: - action: task: land # → //tasks/land (LandTask) diff --git a/osmo/workflows/airstack-mission.yaml b/osmo/workflows/airstack-mission.yaml index 094ed25aa..80e79a659 100644 --- a/osmo/workflows/airstack-mission.yaml +++ b/osmo/workflows/airstack-mission.yaml @@ -1,14 +1,22 @@ # AirStack batch mission workflow on OSMO. # # Same workspace image and pod layout as airstack-dev.yaml, but instead of a -# single `airstack up` for an interactive session, the entrypoint hands off -# to osmo/workspace/mission_runner.py, which executes the mission spec named -# by OSMO_MISSION_FILE: repeated cycles of +# single `airstack up` for an interactive session, this runs a batch mission: +# osmo/workspace/mission_runner.py executes the mission spec named by +# OSMO_MISSION_FILE — repeated cycles of # # airstack down → airstack up → wait for PX4 ready → record mcap bags # → run steps (takeoff / land / navigate / any ros2 command) → collect # bags + logs → airstack down # +# NO IMAGE REBUILD NEEDED. The mission engine is NOT baked into the image: +# the command below reuses the baked entrypoint for pod setup only +# (OSMO_AIRSTACK_UP=false → sshd, inner dockerd, branch clone, creds, registry +# login — no `airstack up`), then hands off to mission_launcher.sh FROM THE +# CLONE. So `git push` your branch and resubmit; the launcher + runner + spec +# all come from the branch. (The launcher pip-installs PyYAML at runtime if +# the image predates it.) +# # Mission specs live in osmo/missions/ (cloned with the branch, so what you # push is what runs). Schema: osmo/missions/README.md. # @@ -41,8 +49,26 @@ workflow: image: airlab-docker.andrew.cmu.edu/airstack/airstack-osmo-workspace:latest # Required so the inner dockerd can run (same DinD setup as airstack-dev). privileged: true - command: ["bash"] - args: ["/usr/local/bin/entrypoint.sh"] + # Bootstrap: run the baked entrypoint for pod setup ONLY + # (OSMO_AIRSTACK_UP=false), backgrounded, then hand off to the launcher + # that ships in the freshly-cloned branch. This is what makes the + # mission engine rebuild-free — see the header comment. + command: ["bash", "-c"] + args: + - | + set -uo pipefail + log() { echo "[mission-bootstrap] $*"; } + log "starting pod setup via baked entrypoint (OSMO_AIRSTACK_UP=false)" + OSMO_AIRSTACK_UP=false /usr/local/bin/entrypoint.sh & + LAUNCHER=/root/AirStack/osmo/workspace/mission_launcher.sh + log "waiting for branch clone to provide $LAUNCHER" + for i in $(seq 1 600); do [ -f "$LAUNCHER" ] && break; sleep 2; done + if [ ! -f "$LAUNCHER" ]; then + log "ERROR: $LAUNCHER not found after clone wait; sleeping for SSH debug" + exec sleep infinity + fi + log "handing off to mission_launcher.sh" + exec bash "$LAUNCHER" environment: # Mission selection — repo-relative path into the clone. Override at # submit time to run a different mission. diff --git a/osmo/workspace/entrypoint.sh b/osmo/workspace/entrypoint.sh index 20d89910b..dd53a94ae 100755 --- a/osmo/workspace/entrypoint.sh +++ b/osmo/workspace/entrypoint.sh @@ -241,70 +241,41 @@ else log "WARN: --payload username= password=" fi -# ─── 6. airstack up / mission mode ───────────────────────────────────────── +# ─── 6. airstack up ──────────────────────────────────────────────────────── + +# Honor optional overrides passed in via OSMO env. Defaults match a "single +# robot, Isaac Sim with WebRTC livestream" dev session. +export AUTOLAUNCH="${AUTOLAUNCH:-true}" +export NUM_ROBOTS="${NUM_ROBOTS:-1}" +export ISAAC_SIM_LIVESTREAM="${ISAAC_SIM_LIVESTREAM:-true}" + +# COMPOSE_PROFILES selection: the default `desktop,isaac-sim` from .env runs +# the standard isaac-sim service. If the student wants livestream, they (or +# we) swap to the isaac-sim-livestream profile, which is the OSMO-friendly +# variant defined in simulation/isaac-sim/docker/docker-compose.yaml. +if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then + export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim-livestream}" +else + export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim}" +fi +# OSMO_AIRSTACK_UP=false skips `airstack up` and goes straight to sleep — the +# pod is fully set up (sshd, dockerd, clone, creds) for the caller to drive. +# The airstack-mission.yaml workflow uses this hook: it runs this entrypoint +# for setup only, then hands off to osmo/workspace/mission_launcher.sh (from +# the clone, so the mission engine ships with the branch — no image rebuild). cd "$AIRSTACK_ROOT" - -if [ -n "${OSMO_MISSION_FILE:-}" ]; then - # Mission mode (airstack-mission.yaml): mission_runner.py owns the full - # stack lifecycle — repeated `airstack down/up` per iteration, PX4 - # readiness gating, mcap recording, and artifact collection into - # /osmo/output. Stack env (NUM_ROBOTS, COMPOSE_PROFILES, - # ISAAC_SIM_SCRIPT_NAME, ...) comes from the mission spec's `env:` block, - # so none of the dev-session defaults below are exported here. - # - # The runner is taken from the clone, not baked into the image, so the - # mission spec and the code executing it always come from the same branch. - MISSION_PATH="$AIRSTACK_ROOT/$OSMO_MISSION_FILE" - [ -f "$MISSION_PATH" ] || MISSION_PATH="$OSMO_MISSION_FILE" # allow absolute paths - if [ ! -f "$MISSION_PATH" ]; then - log "ERROR: mission file not found: $OSMO_MISSION_FILE — pod stays alive for debugging via SSH" - else - log "mission mode: python3 osmo/workspace/mission_runner.py $MISSION_PATH" - if python3 "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py" "$MISSION_PATH" \ - --airstack-root "$AIRSTACK_ROOT"; then - log "mission runner finished: all iterations passed" - else - log "WARN: mission runner exited non-zero — see summary.json / steps.json in the results dir" - fi - fi - if [ "${OSMO_MISSION_KEEP_ALIVE:-true}" != "true" ]; then - log "OSMO_MISSION_KEEP_ALIVE=false — exiting so OSMO uploads /osmo/output and frees the GPU" - # Exit 0 regardless of mission status: a clean task exit is what - # triggers the `outputs:` upload; mission pass/fail lives in summary.json. - exit 0 - fi - log "OSMO_MISSION_KEEP_ALIVE=true — pod stays alive; download results with 'airstack osmo:fetch'" +if [ "${OSMO_AIRSTACK_UP:-true}" = "true" ]; then + log "airstack up (COMPOSE_PROFILES=$COMPOSE_PROFILES, NUM_ROBOTS=$NUM_ROBOTS, livestream=$ISAAC_SIM_LIVESTREAM)" + ./airstack.sh up || log "WARN: airstack up exited non-zero — pod stays alive for debugging via SSH" else - # Dev mode (airstack-dev.yaml): single bring-up, then the student drives. - # Honor optional overrides passed in via OSMO env. Defaults match a - # "single robot, Isaac Sim with WebRTC livestream" dev session. - export AUTOLAUNCH="${AUTOLAUNCH:-true}" - export NUM_ROBOTS="${NUM_ROBOTS:-1}" - export ISAAC_SIM_LIVESTREAM="${ISAAC_SIM_LIVESTREAM:-true}" - - # COMPOSE_PROFILES selection: the default `desktop,isaac-sim` from .env runs - # the standard isaac-sim service. If the student wants livestream, they (or - # we) swap to the isaac-sim-livestream profile, which is the OSMO-friendly - # variant defined in simulation/isaac-sim/docker/docker-compose.yaml. - if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then - export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim-livestream}" - else - export COMPOSE_PROFILES="${COMPOSE_PROFILES:-desktop,isaac-sim}" - fi - - if [ "${OSMO_AIRSTACK_UP:-true}" = "true" ]; then - log "airstack up (COMPOSE_PROFILES=$COMPOSE_PROFILES, NUM_ROBOTS=$NUM_ROBOTS, livestream=$ISAAC_SIM_LIVESTREAM)" - ./airstack.sh up || log "WARN: airstack up exited non-zero — pod stays alive for debugging via SSH" - else - log "OSMO_AIRSTACK_UP=false — skipping airstack up; SSH in and run ./airstack.sh up manually" - fi + log "OSMO_AIRSTACK_UP=false — skipping airstack up; caller drives the stack" fi # ─── 7. Sleep ────────────────────────────────────────────────────────────── log "entrypoint complete; sleeping forever so port-forwards keep working" -if [ "${ISAAC_SIM_LIVESTREAM:-false}" = "true" ]; then +if [ "$ISAAC_SIM_LIVESTREAM" = "true" ]; then isaac_sim_log_container="isaac-sim-livestream" else isaac_sim_log_container="airstack-isaac-sim-1" diff --git a/osmo/workspace/mission_launcher.sh b/osmo/workspace/mission_launcher.sh new file mode 100755 index 000000000..a054fb9d8 --- /dev/null +++ b/osmo/workspace/mission_launcher.sh @@ -0,0 +1,96 @@ +#!/usr/bin/env bash +# mission_launcher.sh — no-rebuild mission entrypoint for AirStack-on-OSMO. +# +# The airstack-mission.yaml workflow runs the *baked* image entrypoint with +# OSMO_AIRSTACK_UP=false (pod setup only: sshd, inner dockerd, branch clone, +# Nucleus creds, registry login), then hands off to THIS script — which lives +# in the clone, so the mission engine always comes from your branch and no +# workspace-image rebuild is needed to iterate. (PyYAML, which the baked image +# may predate, is installed at runtime below.) +# +# Responsibilities: +# 1. Wait for the setup the baked entrypoint performs in the background +# (inner dockerd ready, clone present, registry login done). +# 2. Ensure PyYAML is importable. +# 3. Run mission_runner.py against $OSMO_MISSION_FILE. +# 4. Keep-alive: stay foreground (sleep) so the pod survives for +# `airstack osmo:fetch`, or exit so OSMO uploads /osmo/output + frees GPU. + +set -uo pipefail + +AIRSTACK_ROOT="${AIRSTACK_ROOT:-/root/AirStack}" +AIRLAB_REGISTRY="${AIRLAB_REGISTRY:-airlab-docker.andrew.cmu.edu}" + +log() { echo "[mission-launcher] $*"; } +fail() { echo "[mission-launcher] ERROR: $*" >&2; } + +# wait_for — poll until the command +# succeeds or the timeout elapses. Returns non-zero on timeout. +wait_for() { + local desc="$1" timeout="$2"; shift 2 + local i=0 + until "$@" >/dev/null 2>&1; do + i=$((i + 1)) + if [ "$i" -ge "$timeout" ]; then + fail "timed out after ${timeout}s waiting for: ${desc}" + return 1 + fi + [ $((i % 15)) -eq 0 ] && log "still waiting for ${desc} (${i}s)" + sleep 1 + done + log "ready: ${desc}" +} + +# ── 1. setup readiness (driven by the backgrounded baked entrypoint) ─────── +# If dockerd or the clone never appear, the pod is broken — stay alive (when +# keep-alive) so it can be inspected over SSH rather than vanishing. +if ! wait_for "inner dockerd" 180 docker info; then + [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ] && exec sleep infinity + exit 1 +fi +if ! wait_for "branch clone" 600 test -f "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py"; then + [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ] && exec sleep infinity + exit 1 +fi + +# Registry login is performed by the baked entrypoint (step 5) only when +# AIRLAB_REGISTRY_USER is set. Wait for it so `airstack up`'s image pulls +# don't race the login; warn (don't abort) if it never lands. +if [ -n "${AIRLAB_REGISTRY_USER:-}" ]; then + wait_for "registry login (${AIRLAB_REGISTRY})" 180 \ + grep -q "$AIRLAB_REGISTRY" /root/.docker/config.json \ + || log "WARN: registry login not detected — image pulls may fail" +fi + +# ── 2. PyYAML (mission_runner imports yaml) ──────────────────────────────── +if ! python3 -c "import yaml" >/dev/null 2>&1; then + log "installing PyYAML (image predates python3-yaml)" + pip3 install --break-system-packages --quiet pyyaml \ + || { apt-get update -qq && apt-get install -y -qq python3-yaml; } \ + || fail "could not install PyYAML — mission_runner will fail to import yaml" +fi + +# ── 3. run the mission ───────────────────────────────────────────────────── +MISSION="${OSMO_MISSION_FILE:-}" +if [ -z "$MISSION" ]; then + fail "OSMO_MISSION_FILE not set — nothing to run" +else + MISSION_PATH="$AIRSTACK_ROOT/$MISSION" + [ -f "$MISSION_PATH" ] || MISSION_PATH="$MISSION" # allow an absolute path + if [ -f "$MISSION_PATH" ]; then + log "running mission: $MISSION_PATH" + python3 "$AIRSTACK_ROOT/osmo/workspace/mission_runner.py" "$MISSION_PATH" \ + --airstack-root "$AIRSTACK_ROOT" + log "mission_runner exited $?" + else + fail "mission file not found: $MISSION (looked under the clone and as an absolute path)" + fi +fi + +# ── 4. lifetime ──────────────────────────────────────────────────────────── +if [ "${OSMO_MISSION_KEEP_ALIVE:-true}" = "true" ]; then + log "OSMO_MISSION_KEEP_ALIVE=true — pod stays alive; fetch with 'airstack osmo:fetch'" + exec sleep infinity +fi +log "OSMO_MISSION_KEEP_ALIVE=false — exiting so OSMO uploads /osmo/output and frees the GPU" +exit 0 From 42ecd1c9836d4a9fcb3e4b7b267866eacfca9909 Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Thu, 11 Jun 2026 16:16:10 -0400 Subject: [PATCH 6/7] fixed duplicate airstack up issue and not recording mcap files --- .gitignore | 4 +- osmo/workspace/mission_launcher.sh | 18 ++++++++ osmo/workspace/mission_runner.py | 68 ++++++++++++++++++++++++++---- 3 files changed, 80 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index d14138ebb..624f0d4e0 100644 --- a/.gitignore +++ b/.gitignore @@ -94,8 +94,10 @@ simulation/ms-airsim/assets/scenes/* # Test results tests/results/ -# OSMO mission results (local runs of osmo/workspace/mission_runner.py) +# OSMO mission results: local runs of osmo/workspace/mission_runner.py write +# to osmo/results/; `airstack osmo:fetch` downloads to ./osmo-results/. osmo/results/ +osmo-results/ # Local-only — embedded sibling repo, not part of this branch common/rayfronts/ diff --git a/osmo/workspace/mission_launcher.sh b/osmo/workspace/mission_launcher.sh index a054fb9d8..05c71763a 100755 --- a/osmo/workspace/mission_launcher.sh +++ b/osmo/workspace/mission_launcher.sh @@ -62,6 +62,24 @@ if [ -n "${AIRLAB_REGISTRY_USER:-}" ]; then || log "WARN: registry login not detected — image pulls may fail" fi +# The deployed :latest image's baked entrypoint runs its OWN `airstack up` +# (that image predates the OSMO_AIRSTACK_UP=false hook, so it ignores the +# request to skip it). If the mission started its own bring-up concurrently, +# the two compose runs collide — duplicate network, container-name conflicts, +# "network not found" mid-teardown — and the first iteration fails. So wait +# for the baked entrypoint to FINISH its bring-up (which also warms the inner +# image cache the mission then reuses) before handing off. Detect completion +# by its terminal `sleep infinity`. On a rebuilt image where the hook works, +# the entrypoint skips `up` and reaches that sleep almost immediately, so this +# is a fast no-op — correct either way. +log "waiting for the baked entrypoint to finish its own bring-up (avoids a concurrent 'airstack up')" +sleep 10 # let the baked entrypoint actually reach its `airstack up` +if wait_for "baked entrypoint idle" 2400 pgrep -f "sleep infinity"; then + sleep 5 # let compose fully release the network before the mission's first down/up +else + log "WARN: baked entrypoint still busy after 40m — proceeding; first iteration may race its bring-up" +fi + # ── 2. PyYAML (mission_runner imports yaml) ──────────────────────────────── if ! python3 -c "import yaml" >/dev/null 2>&1; then log "installing PyYAML (image predates python3-yaml)" diff --git a/osmo/workspace/mission_runner.py b/osmo/workspace/mission_runner.py index 74e6aa475..ec662fc28 100644 --- a/osmo/workspace/mission_runner.py +++ b/osmo/workspace/mission_runner.py @@ -184,6 +184,13 @@ def gcs_container(): return names[0] if names else None +def stack_containers(): + """Running containers belonging to the AirStack compose stack (robot, gcs, + and any sim variant — isaac-sim / isaac-sim-livestream / ms-airsim).""" + pats = ("airstack", "isaac-sim", "ms-airsim") + return [n for n in list_containers() if any(p in n for p in pats)] + + # ── mission spec ─────────────────────────────────────────────────────────── def load_mission(path): @@ -304,6 +311,32 @@ def down(self): if result.returncode != 0: log(f"WARN: airstack down exited {result.returncode}") + def ensure_down(self): + """`airstack status`; if any stack containers exist, `airstack down` + and BLOCK until they're actually gone before returning. + + This is the pre-`up` guard: a fresh `airstack up` must not race a + previous bring-up's containers/network (the baked entrypoint leaves a + stack up on a stale image; the prior iteration leaves one too). + Starting `up` while those still exist causes duplicate-network and + container-name conflicts.""" + self._airstack("status", 60) + existing = stack_containers() + if not existing: + return + log(f"existing stack containers found {existing} — bringing them down first") + self.down() + deadline = time.time() + self.mission["down_timeout_s"] + while time.time() < deadline: + remaining = stack_containers() + if not remaining: + log("stack fully down; safe to bring up") + return + log(f"waiting for teardown: {remaining}") + time.sleep(2) + log(f"WARN: containers still present after down: {stack_containers()} " + f"— proceeding, `up` may conflict") + def wait_ready(self, container): """Two sequential gates per robot (same as the system tests): 1. mavros/state reports connected=True (MAVROS ↔ PX4 heartbeat); @@ -419,23 +452,36 @@ def _topic_selection(self, robots): def _start_one(self, container, domain_id, tag, selection, setup_bash): out_dir = f"{BAG_STAGING_DIR}/{tag}" + log_file = f"{BAG_STAGING_DIR}/record_{tag}.log" + pid_file = f"{BAG_STAGING_DIR}/record_{tag}.pid" + # Create the staging dir in the FOREGROUND, THEN background only the + # recorder. Folding `mkdir` into the same `... &` chain backgrounds the + # mkdir too, so the foreground pidfile write races ahead of the dir + # existing ("record_.pid: No such file or directory"). nohup + + # pidfile lets the recorder outlive this docker exec; topics that + # don't exist yet are fine (rosbag2 waits for them). inner = ( - f"{ros2_env_prefix(setup_bash, domain_id)} && " - f"mkdir -p {BAG_STAGING_DIR} && rm -rf {out_dir} && " - # nohup + pidfile: the record process must outlive this docker - # exec; topics that don't exist yet are fine (record waits). + f"mkdir -p {BAG_STAGING_DIR} && rm -rf {out_dir}\n" + f"{ros2_env_prefix(setup_bash, domain_id)}\n" f"nohup ros2 bag record -s mcap -o {out_dir} {selection} " - f"> {BAG_STAGING_DIR}/record_{tag}.log 2>&1 & " - f"echo $! > {BAG_STAGING_DIR}/record_{tag}.pid" + f"> {log_file} 2>&1 &\n" + f"echo $! > {pid_file}\n" + # Confirm the recorder is actually alive — a bad topic name or a + # missing mcap plugin would make it exit immediately, and a silent + # recording failure is the worst outcome for a mission. + f"sleep 2\n" + f"if kill -0 \"$(cat {pid_file})\" 2>/dev/null; then echo RECORDER_ALIVE; " + f"else echo RECORDER_DEAD; tail -n 5 {log_file} 2>/dev/null; fi" ) r = docker_exec(container, inner, timeout=30) - if r.returncode == 0: + if "RECORDER_ALIVE" in r.stdout: self.active.append((container, tag)) n_topics = "all topics" if selection == "-a" else f"{len(selection.split())} topics" log(f"recording [{tag}] in {container} (domain {domain_id}) " f"→ {out_dir} ({n_topics})") else: - log(f"WARN: failed to start recorder [{tag}]: {r.stderr.strip()[:200]}") + log(f"WARN: recorder [{tag}] failed to start / exited immediately: " + f"{tail(r.stdout + r.stderr, 6)}") def start(self): if not self.cfg.get("enabled", True): @@ -719,7 +765,11 @@ def run_iteration(stack, mission, iter_dir): container = None t0 = time.time() try: - stack.down() + # `airstack status`; if a stack is already up (baked entrypoint on a + # stale image, or the previous iteration), down it and wait for the + # containers to fully disappear before bringing up — otherwise the + # `up` races leftover containers/network. + stack.ensure_down() containers = stack.up() container = containers[0] summary["up_duration_s"] = round(time.time() - t0, 2) From 0c582b033504f6447a9f73c1acef1255443d1219 Mon Sep 17 00:00:00 2001 From: krrishj18 Date: Thu, 11 Jun 2026 16:58:32 -0400 Subject: [PATCH 7/7] fixed iteration 1 not working --- osmo/workflows/airstack-mission.yaml | 7 ++++- osmo/workspace/mission_launcher.sh | 44 ++++++++++++---------------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/osmo/workflows/airstack-mission.yaml b/osmo/workflows/airstack-mission.yaml index 80e79a659..7b7498452 100644 --- a/osmo/workflows/airstack-mission.yaml +++ b/osmo/workflows/airstack-mission.yaml @@ -59,7 +59,12 @@ workflow: set -uo pipefail log() { echo "[mission-bootstrap] $*"; } log "starting pod setup via baked entrypoint (OSMO_AIRSTACK_UP=false)" - OSMO_AIRSTACK_UP=false /usr/local/bin/entrypoint.sh & + # Tee the baked entrypoint's output so the launcher can wait for its + # terminal "sleeping forever" line (= setup + its own `airstack up` + # finished) before starting the mission — see mission_launcher.sh. + # The pipe keeps its logs in `osmo workflow logs` too. + OSMO_AIRSTACK_UP=false /usr/local/bin/entrypoint.sh 2>&1 \ + | tee /tmp/baked-entrypoint.log & LAUNCHER=/root/AirStack/osmo/workspace/mission_launcher.sh log "waiting for branch clone to provide $LAUNCHER" for i in $(seq 1 600); do [ -f "$LAUNCHER" ] && break; sleep 2; done diff --git a/osmo/workspace/mission_launcher.sh b/osmo/workspace/mission_launcher.sh index 05c71763a..e7f757123 100755 --- a/osmo/workspace/mission_launcher.sh +++ b/osmo/workspace/mission_launcher.sh @@ -19,7 +19,6 @@ set -uo pipefail AIRSTACK_ROOT="${AIRSTACK_ROOT:-/root/AirStack}" -AIRLAB_REGISTRY="${AIRLAB_REGISTRY:-airlab-docker.andrew.cmu.edu}" log() { echo "[mission-launcher] $*"; } fail() { echo "[mission-launcher] ERROR: $*" >&2; } @@ -53,32 +52,27 @@ if ! wait_for "branch clone" 600 test -f "$AIRSTACK_ROOT/osmo/workspace/mission_ exit 1 fi -# Registry login is performed by the baked entrypoint (step 5) only when -# AIRLAB_REGISTRY_USER is set. Wait for it so `airstack up`'s image pulls -# don't race the login; warn (don't abort) if it never lands. -if [ -n "${AIRLAB_REGISTRY_USER:-}" ]; then - wait_for "registry login (${AIRLAB_REGISTRY})" 180 \ - grep -q "$AIRLAB_REGISTRY" /root/.docker/config.json \ - || log "WARN: registry login not detected — image pulls may fail" -fi - -# The deployed :latest image's baked entrypoint runs its OWN `airstack up` -# (that image predates the OSMO_AIRSTACK_UP=false hook, so it ignores the -# request to skip it). If the mission started its own bring-up concurrently, -# the two compose runs collide — duplicate network, container-name conflicts, -# "network not found" mid-teardown — and the first iteration fails. So wait -# for the baked entrypoint to FINISH its bring-up (which also warms the inner -# image cache the mission then reuses) before handing off. Detect completion -# by its terminal `sleep infinity`. On a rebuilt image where the hook works, -# the entrypoint skips `up` and reaches that sleep almost immediately, so this -# is a fast no-op — correct either way. +# The deployed :latest image's baked entrypoint ignores OSMO_AIRSTACK_UP and +# runs its OWN `airstack up`. Running the mission's bring-up concurrently +# collides (duplicate network, container-name conflicts, "network not found" +# mid-teardown) and fails the first iteration. So wait for the baked +# entrypoint to FINISH — the bootstrap tees its output to $BAKED_LOG, and its +# terminal "sleeping forever" line is printed exactly once, after its +# `airstack up` completes (success or fail). This also subsumes the clone + +# registry-login steps it performs, and warms the inner image cache the +# mission reuses. On a rebuilt image where the hook works, the entrypoint +# skips `up` and prints that line almost immediately, so this is a fast no-op. +# +# We deliberately do NOT detect completion via `pgrep -f "sleep infinity"`: +# the AirStack containers themselves run `sleep infinity`, so that matches the +# instant the baked bring-up starts a container — firing the handoff in the +# middle of its `airstack up`, which is exactly the race we're avoiding. +BAKED_LOG="${BAKED_ENTRYPOINT_LOG:-/tmp/baked-entrypoint.log}" log "waiting for the baked entrypoint to finish its own bring-up (avoids a concurrent 'airstack up')" -sleep 10 # let the baked entrypoint actually reach its `airstack up` -if wait_for "baked entrypoint idle" 2400 pgrep -f "sleep infinity"; then - sleep 5 # let compose fully release the network before the mission's first down/up -else - log "WARN: baked entrypoint still busy after 40m — proceeding; first iteration may race its bring-up" +if ! wait_for "baked entrypoint complete" 2400 grep -q "sleeping forever" "$BAKED_LOG"; then + log "WARN: baked entrypoint didn't signal completion in 40m — proceeding; first iteration may race its bring-up" fi +sleep 5 # let baked's compose settle before the mission's first ensure_down/up # ── 2. PyYAML (mission_runner imports yaml) ──────────────────────────────── if ! python3 -c "import yaml" >/dev/null 2>&1; then