ROS 2 adapter
Reads rosbag2 directories (sqlite3 + mcap backends) and turns them
into RoboTrace episodes. No rclpy install required - the adapter is
backed by the pure-Python rosbags
library, so it runs on macOS dev boxes and in CI without a ROS 2
distro.
from robotrace.adapters import ros2
ros2.upload_bag(
"./run_2026-05-08/",
name="warmup pick-and-place",
policy_version="pap-v3.2.1",
env_version="halcyon-cell-rev4",
git_sha="abc1234",
)That's the whole 95% case. Read on for the three explicit verbs, topic auto-classification, multi-camera handling, and the install matrix.
Install
# Sensor- / action-only bags. No image topics.
pip install 'robotrace-dev[ros2]==0.1.0a11'
# With image-topic → MP4 encoding (most cases).
pip install 'robotrace-dev[ros2,video]==0.1.0a11'The pin is the most reliable install during alpha and drops once
we cut 1.0.
[ros2] pulls in rosbags and numpy. [video] adds
opencv-python for encoding sensor_msgs/Image (and
CompressedImage) topics into a single video.mp4. The split is on
purpose - a sensor-only bag shouldn't pay opencv's install cost. If
you call upload_bag on a bag that has image topics without [video]
installed, the adapter raises ConfigurationError pointing at the
right pip install line.
ROS 1 (rosbag1) is out of scope - RoboTrace is ROS 2 only.
The four verbs
| Verb | What it does |
|---|---|
ros2.scan_bag(path) | Read-only introspection. Returns a BagSummary with topic catalog, classifier decisions, and bag duration. No files written, no network. |
ros2.encode_bag(path, out) | Walks the bag once, writes video.mp4, sensors.npz, actions.npz into out. Returns an EncodedBag with the file paths and discovered duration_s / fps. No network. |
ros2.upload_bag(path, ...) | One-shot: scan → encode to a tempdir → start_episode + upload_* + finalize. Cleans the tempdir on return. Returns the finalized Episode. |
ros2.record(topics=[...]) | Live recording: subscribes to topics via rclpy during a run, writes a tempdir bag, then auto-encodes and uploads on close. Context manager. Requires a sourced ROS 2 distro (see Live recording). |
scan_bag is the dry-run - most users start there to confirm the
classifier picked the right topics before paying for the full encode.
summary = ros2.scan_bag("./run_2026-05-08/")
print(summary.report())
# /Users/.../run_2026-05-08 47.20s, 1842 messages
# video:
# /camera/image_raw (sensor_msgs/msg/Image, 472 msg, via msgtype)
# sensors:
# /joint_states (sensor_msgs/msg/JointState, 944 msg, via default)
# actions:
# /cmd_vel (geometry_msgs/msg/Twist, 426 msg, via msgtype)If everything looks right, swap scan_bag for upload_bag and you're
done.
Topic auto-classification
Every connection in the bag is sorted into one of three slots based on its message type and (as a fallback) its topic name. Rules apply in this order - first match wins:
| Rule | → Slot |
|---|---|
sensor_msgs/Image, sensor_msgs/CompressedImage | video |
geometry_msgs/Twist, TwistStamped, Wrench, WrenchStamped, trajectory_msgs/JointTrajectory, MultiDOFJointTrajectory, control_msgs/JointJog | actions |
Topic name ends in /cmd_* or /command (catches custom command messages) | actions |
| Anything else | sensors |
Override per-slot if the heuristic is wrong for your bag:
ros2.upload_bag(
"./run_2026-05-08/",
video_topics=["/cameras/wrist/image_color"],
sensor_topics=["/joint_states", "/wrench"],
action_topics=["/teleop/joy_cmd"],
policy_version="pap-v3.2.1",
)Pass an empty list (video_topics=[]) to deliberately exclude a
slot - useful for bags where you only care about sensor traces.
Multi-camera bags
When more than one image topic is present, the adapter tiles the
frames horizontally into a single video.mp4. Heights are
black-padded so cameras with different resolutions still align. Topic
order is alphabetical so the same bag always produces the same
mosaic.
If you only want one camera, pass canonical_video_topic:
ros2.upload_bag(
"./bimanual_run/",
canonical_video_topic="/cameras/overhead/image_color",
policy_version="pap-v3.2.1",
)The mosaic's frame rate is computed from the median inter-frame delta of whichever topic has the most frames; if it can't be computed (single-frame topic, all timestamps equal) the encoder falls back to 10 fps so the resulting MP4 is obviously a placeholder rather than silently wrong.
How sensors / actions get packed
Each non-image topic contributes a set of arrays into a single NPZ file per slot. Layout uses the topic name as a namespace so a single NPZ can hold many heterogeneous streams without clobbering keys:
sensors.npz
/joint_states/_t_ns int64[N] # nanosecond timestamps
/joint_states/position float32[N, 6]
/joint_states/velocity float32[N, 6]
/joint_states/effort float32[N, 6]
/imu/_t_ns int64[M]
/imu/orientation float32[M, 4]
/imu/angular_velocity float32[M, 3]
/imu/linear_acceleration float32[M, 3]Well-known message types get clean field names from a built-in flattener registry:
| Message type | Fields packed |
|---|---|
sensor_msgs/JointState | position, velocity, effort |
sensor_msgs/Imu | orientation, angular_velocity, linear_acceleration |
geometry_msgs/Twist[Stamped] | linear, angular |
geometry_msgs/Wrench[Stamped] | force, torque |
geometry_msgs/PoseStamped | position, orientation |
nav_msgs/Odometry | position, orientation, linear_velocity, angular_velocity |
Anything not on that list gets the generic flattener: it walks
the dataclass and packs every numeric scalar / fixed-length numeric
field. Strings, variable-length nested arrays, and opaque blobs
(PointCloud2, image-shaped buffers) are dropped silently. If a
field's shape changes mid-bag - e.g. a joint count flip - the
encoder records the topic in
metadata.skipped_topics so you can spot it in the portal instead of
shipping silently corrupt data.
Episode metadata
The adapter merges its own metadata with anything you pass:
ros2.upload_bag(
"./run/",
policy_version="pap-v3.2.1",
metadata={"task": "pick_and_place", "operator": "alice"},
)Lands on the episode as:
{
"adapter": "ros2",
"bag": "/Users/.../run",
"skipped_topics": [],
"task": "pick_and_place",
"operator": "alice"
}Your keys win on collision.
Encode-then-handle-it-yourself
encode_bag exposes the artifacts as files so you can inspect or
post-process before uploading. Useful when you want to splice frames,
re-bucket sensor topics, or stage a long-running upload:
encoded = ros2.encode_bag("./run/", "/tmp/encoded/")
print(encoded.duration_s, encoded.fps)
# 47.2 30.0
print([a.path for a in encoded.artifacts()])
# [PosixPath('/tmp/encoded/video.mp4'),
# PosixPath('/tmp/encoded/sensors.npz'),
# PosixPath('/tmp/encoded/actions.npz')]Then drive start_episode / upload_* directly - same plumbing
upload_bag uses internally. The adapter goes through the lower-level
start_episode path (not log_episode)
because log_episode rejects .npz files in the actions= slot
(its extension check guesses sensors); the adapter knows what it
wrote and bypasses that validation.
Live recording via rclpy
ros2.record(topics=[...]) subscribes to a set of topics during a
running ROS 2 stack and ships everything captured as one episode on
close. Same artifact contract as the offline path - the only
difference is which side wrote the bag.
from robotrace.adapters import ros2
with ros2.record(
topics=["/camera/image_raw", "/joint_states", "/cmd_vel"],
name="warmup pick-and-place",
policy_version="pap-v3.2.1",
env_version="halcyon-cell-rev4",
git_sha="abc1234",
) as rec:
drive_robot_for_30_seconds() # your existing code
# __exit__: stop subscriptions, close bag, encode → upload → finalize.
print(rec.episode.id)Requirements
ros2.record(...) needs rclpy, which ships with the ROS 2 distro
via apt - not via pip. Source your workspace first:
source /opt/ros/humble/setup.bash # or jazzy, iron, rolling
python my_script.pyThe SDK deliberately does not pin rclpy from PyPI because the
PyPI wheels are not always compatible with the rmw bindings shipped
with most distros. Calling ros2.record(...) without a sourced
workspace raises ConfigurationError pointing at the apt command;
the offline upload_bag(...) path continues to work without rclpy.
Topic validation
Topics are validated against the live ROS 2 graph at start() time.
A topic with no advertised publisher raises ConfigurationError so
the user notices a typo immediately instead of debugging an empty
bag 60 seconds later:
ConfigurationError: topic '/camera/image' has no advertised
publisher. Available topics: ['/camera/image_raw', '/joint_states',
'/cmd_vel']. Start your robot node before opening
`ros2.record(...)` so the type can be resolved.Start your robot node before opening the context manager.
Failure handling
If your code raises inside the with block, the recorder catches
the exception, finalizes the episode as failed with the failure
reason in metadata.failure_reason, then re-raises:
try:
with ros2.record(topics=["/joint_states"], name="run") as rec:
run_my_policy() # raises midway
except RuntimeError:
# The episode is recorded as `failed` in the portal with the
# full traceback in metadata. The bag we captured up to the
# failure point is uploaded so you can replay the run that led
# to the error.
...Empty bags (no messages received before close) are not uploaded - the recorder silently cleans up the tempdir. Useful for the "I opened the context manager but the robot never published" case.
Episode metadata on live runs
In addition to the standard adapter metadata, live recordings stamp:
{
"adapter": "ros2",
"ros2": {
"mode": "live",
"distro": "humble",
"topics": ["/camera/image_raw", "/joint_states", "/cmd_vel"],
"message_count": 1842
}
}so the portal can tell live recordings apart from bag-file uploads when triaging.
Errors
| Exception | When |
|---|---|
ConfigurationError | path doesn't exist, isn't a directory, has no metadata.yaml, or rosbags / cv2 aren't installed |
AuthError | API key bad / revoked (raised by the underlying start_episode) |
ValidationError | Server rejected the create payload |
TransportError | Network / DNS / timeout during the create or upload |
If an upload fails partway through, the adapter (via
Client.start_episode's standard handling) flips the run to
status="failed" with the failure reason in
metadata.failure_reason before re-raising - so you don't end up
with ghostly "recording" runs in the portal.