ROS 2 adapter

Reads rosbag2 directories (sqlite3 + mcap backends) and turns them into RoboTrace episodes. No rclpy install required - the adapter is backed by the pure-Python rosbags library, so it runs on macOS dev boxes and in CI without a ROS 2 distro.

from robotrace.adapters import ros2
 
ros2.upload_bag(
    "./run_2026-05-08/",
    name="warmup pick-and-place",
    policy_version="pap-v3.2.1",
    env_version="halcyon-cell-rev4",
    git_sha="abc1234",
)

That's the whole 95% case. Read on for the four explicit verbs, topic auto-classification, multi-camera handling, and the install matrix.

Install

# Sensor- / action-only bags. No image topics.
pip install 'robotrace-dev[ros2]==0.1.0a11'
 
# With image-topic → MP4 encoding (most cases).
pip install 'robotrace-dev[ros2,video]==0.1.0a11'

Pinning the exact version is the most reliable way to install during alpha. The pin goes away once we cut 1.0.

[ros2] pulls in rosbags and numpy. [video] adds opencv-python for encoding sensor_msgs/Image (and CompressedImage) topics into a single video.mp4. The split is on purpose - a sensor-only bag shouldn't pay opencv's install cost. If you call upload_bag on a bag that has image topics without [video] installed, the adapter raises ConfigurationError pointing at the right pip install line.

ROS 1 (rosbag1) is out of scope - RoboTrace is ROS 2 only.

The four verbs

Verb | What it does
ros2.scan_bag(path) | Read-only introspection. Returns a BagSummary with topic catalog, classifier decisions, and bag duration. No files written, no network.
ros2.encode_bag(path, out) | Walks the bag once, writes video.mp4, sensors.npz, actions.npz into out. Returns an EncodedBag with the file paths and discovered duration_s / fps. No network.
ros2.upload_bag(path, ...) | One-shot: scan → encode to a tempdir → start_episode + upload_* + finalize. Cleans the tempdir on return. Returns the finalized Episode.
ros2.record(topics=[...]) | Live recording: subscribes to topics via rclpy during a run, writes a tempdir bag, then auto-encodes and uploads on close. Context manager. Requires a sourced ROS 2 distro (see Live recording).

scan_bag is the dry-run - most users start there to confirm the classifier picked the right topics before paying for the full encode.

summary = ros2.scan_bag("./run_2026-05-08/")
print(summary.report())
# /Users/.../run_2026-05-08  47.20s, 1842 messages
#   video:
#     /camera/image_raw  (sensor_msgs/msg/Image, 472 msg, via msgtype)
#   sensors:
#     /joint_states  (sensor_msgs/msg/JointState, 944 msg, via default)
#   actions:
#     /cmd_vel  (geometry_msgs/msg/Twist, 426 msg, via msgtype)

If everything looks right, swap scan_bag for upload_bag and you're done.

Topic auto-classification

Every connection in the bag is sorted into one of three slots based on its message type and (as a fallback) its topic name. Rules apply in this order - first match wins:

Rule | Slot
sensor_msgs/Image, sensor_msgs/CompressedImage | video
geometry_msgs/Twist, TwistStamped, Wrench, WrenchStamped; trajectory_msgs/JointTrajectory, MultiDOFJointTrajectory; control_msgs/JointJog | actions
Topic name ends in /cmd_* or /command (catches custom command messages) | actions
Anything else | sensors
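
The rule order above can be pictured with a minimal sketch. This is an illustration of "first match wins", not the adapter's actual code: the function name classify, the two type sets, and the way the name fallback is parsed (last path segment equals "command" or starts with "cmd_") are all assumptions made for the example.

```python
# Message types from the rules table, written without the "/msg/" infix.
VIDEO_TYPES = {"sensor_msgs/Image", "sensor_msgs/CompressedImage"}
ACTION_TYPES = {
    "geometry_msgs/Twist",
    "geometry_msgs/TwistStamped",
    "geometry_msgs/Wrench",
    "geometry_msgs/WrenchStamped",
    "trajectory_msgs/JointTrajectory",
    "trajectory_msgs/MultiDOFJointTrajectory",
    "control_msgs/JointJog",
}


def classify(topic: str, msgtype: str) -> str:
    """Return "video", "actions", or "sensors" for one bag connection (hypothetical helper)."""
    # Bags report types as "pkg/msg/Name"; the table uses "pkg/Name".
    short = msgtype.replace("/msg/", "/")
    if short in VIDEO_TYPES:
        return "video"
    if short in ACTION_TYPES:
        return "actions"
    # Name fallback (assumed parsing): look only at the last path segment.
    leaf = topic.rstrip("/").rsplit("/", 1)[-1]
    if leaf == "command" or leaf.startswith("cmd_"):
        return "actions"
    return "sensors"


print(classify("/camera/image_raw", "sensor_msgs/msg/Image"))
print(classify("/gripper/command", "std_msgs/msg/Float64"))
print(classify("/joint_states", "sensor_msgs/msg/JointState"))
```

Under this reading, a topic such as /teleop/joy_cmd falls through to sensors unless its message type is on the actions list, which is the kind of case the per-slot overrides exist for.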

Override per-slot if the heuristic is wrong for your bag:

ros2.upload_bag(
    "./run_2026-05-08/",
    video_topics=["/cameras/wrist/image_color"],
    sensor_topics=["/joint_states", "/wrench"],
    action_topics=["/teleop/joy_cmd"],
    policy_version="pap-v3.2.1",
)

Pass an empty list (video_topics=[]) to deliberately exclude a slot - useful for bags where you only care about sensor traces.

Multi-camera bags

When more than one image topic is present, the adapter tiles the frames horizontally into a single video.mp4. Heights are black-padded so cameras with different resolutions still align. Topic order is alphabetical so the same bag always produces the same mosaic.
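
To make the tiling concrete, here is a rough NumPy sketch of the layout described above. It is not the adapter's encoder: tile_frames is a hypothetical helper, frames are assumed to be HxWx3 uint8 arrays, and placing the black padding below the shorter frame is a guess (the adapter may centre it).

```python
import numpy as np


def tile_frames(frames_by_topic: dict) -> np.ndarray:
    """Tile one HxWx3 uint8 frame per topic into a single row (hypothetical helper)."""
    topics = sorted(frames_by_topic)  # alphabetical, so the layout is stable
    height = max(frames_by_topic[t].shape[0] for t in topics)
    width = sum(frames_by_topic[t].shape[1] for t in topics)
    canvas = np.zeros((height, width, 3), dtype=np.uint8)  # black background
    x = 0
    for t in topics:
        frame = frames_by_topic[t]
        h, w = frame.shape[:2]
        canvas[:h, x:x + w] = frame  # assumption: padding goes below the frame
        x += w
    return canvas


frames = {
    "/cameras/wrist/image_color": np.full((240, 320, 3), 255, dtype=np.uint8),
    "/cameras/overhead/image_color": np.full((480, 640, 3), 128, dtype=np.uint8),
}
mosaic = tile_frames(frames)
print(mosaic.shape)  # (480, 960, 3)
```

The overhead camera sorts before the wrist camera, so it lands on the left regardless of the order the topics appear in the bag.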

If you only want one camera, pass canonical_video_topic:

ros2.upload_bag(
    "./bimanual_run/",
    canonical_video_topic="/cameras/overhead/image_color",
    policy_version="pap-v3.2.1",
)

The mosaic's frame rate is computed from the median inter-frame delta of whichever topic has the most frames; if it can't be computed (single-frame topic, all timestamps equal) the encoder falls back to 10 fps so the resulting MP4 is obviously a placeholder rather than silently wrong.
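
The frame-rate rule can be sketched in a few lines. This is an approximation of the behaviour described above, not the encoder itself: estimate_fps and FALLBACK_FPS are names made up for the example, and timestamps are assumed to be sorted nanosecond integers.

```python
from statistics import median

FALLBACK_FPS = 10.0  # placeholder rate when no usable delta exists


def estimate_fps(stamps_by_topic: dict) -> float:
    """Estimate fps from the topic with the most frames (hypothetical helper)."""
    if not stamps_by_topic:
        return FALLBACK_FPS
    stamps = max(stamps_by_topic.values(), key=len)
    deltas = [b - a for a, b in zip(stamps, stamps[1:])]
    if not deltas:  # single-frame topic
        return FALLBACK_FPS
    step_ns = median(deltas)
    if step_ns <= 0:  # all timestamps equal
        return FALLBACK_FPS
    return 1e9 / step_ns


print(estimate_fps({"/cam_a": [0, 40_000_000, 80_000_000, 120_000_000],
                    "/cam_b": [0, 100_000_000]}))  # 25.0
print(estimate_fps({"/cam_a": [7, 7, 7]}))          # 10.0
```

Using the median rather than the mean keeps one dropped frame or one long stall from skewing the estimate.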

How sensors / actions get packed

Each non-image topic contributes a set of arrays into a single NPZ file per slot. Layout uses the topic name as a namespace so a single NPZ can hold many heterogeneous streams without clobbering keys:

sensors.npz
  /joint_states/_t_ns      int64[N]            # nanosecond timestamps
  /joint_states/position   float32[N, 6]
  /joint_states/velocity   float32[N, 6]
  /joint_states/effort     float32[N, 6]
  /imu/_t_ns               int64[M]
  /imu/orientation         float32[M, 4]
  /imu/angular_velocity    float32[M, 3]
  /imu/linear_acceleration float32[M, 3]
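
Because every key is "topic/field", the streams in an NPZ can be regrouped by splitting on the last slash. The sketch below works on a plain list of key names so it runs without a bag; group_keys is a hypothetical helper, and it assumes field names never contain a slash.

```python
from collections import defaultdict


def group_keys(keys):
    """Map each topic to the field names stored under it (hypothetical helper)."""
    grouped = defaultdict(list)
    for key in keys:
        topic, field = key.rsplit("/", 1)  # the last segment is the field
        grouped[topic].append(field)
    return dict(grouped)


# With a real file, the keys would come from numpy.load("sensors.npz").files.
keys = [
    "/joint_states/_t_ns",
    "/joint_states/position",
    "/imu/_t_ns",
    "/imu/orientation",
]
print(group_keys(keys))
```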

Well-known message types get clean field names from a built-in flattener registry:

Message type | Fields packed
sensor_msgs/JointState | position, velocity, effort
sensor_msgs/Imu | orientation, angular_velocity, linear_acceleration
geometry_msgs/Twist[Stamped] | linear, angular
geometry_msgs/Wrench[Stamped] | force, torque
geometry_msgs/PoseStamped | position, orientation
nav_msgs/Odometry | position, orientation, linear_velocity, angular_velocity

Anything not on that list gets the generic flattener: it walks the dataclass and packs every numeric scalar / fixed-length numeric field. Strings, variable-length nested arrays, and opaque blobs (PointCloud2, image-shaped buffers) are dropped silently. If a field's shape changes mid-bag - e.g. a joint count flip - the encoder records the topic in metadata.skipped_topics so you can spot it in the portal instead of shipping silently corrupt data.
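
As a rough mental model of the generic flattener, the sketch below walks a dataclass and keeps only numeric leaves. It is not the adapter's implementation. The dotted key names, the flatten helper, and the convention that a tuple stands in for a fixed-length field while a list stands in for a variable-length one are all assumptions made for illustration.

```python
from dataclasses import dataclass, fields, is_dataclass
from numbers import Real


def flatten(msg, prefix=""):
    """Collect numeric leaves of a dataclass into {dotted_name: value} (hypothetical helper)."""
    out = {}
    for f in fields(msg):
        value = getattr(msg, f.name)
        name = prefix + f.name
        if is_dataclass(value):
            out.update(flatten(value, name + "."))  # walk nested messages
        elif isinstance(value, Real):
            out[name] = float(value)  # numeric scalar: kept
        elif isinstance(value, tuple) and value and all(isinstance(v, Real) for v in value):
            out[name] = [float(v) for v in value]  # fixed-length numeric field: kept
        # Anything else (str, bytes, list, ...) is dropped.
    return out


@dataclass
class Vec3:
    x: float
    y: float
    z: float


@dataclass
class Sample:
    frame_id: str       # string: dropped
    temperature: float  # numeric scalar: kept
    gains: tuple        # fixed-length numeric tuple: kept
    readings: list      # variable-length list: dropped
    accel: Vec3         # nested dataclass: walked


flat = flatten(Sample("base", 21.5, (1, 2, 3), [0.1, 0.2], Vec3(0.0, 0.0, 9.5)))
print(flat)
```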

Episode metadata

The adapter merges its own metadata with anything you pass:

ros2.upload_bag(
    "./run/",
    policy_version="pap-v3.2.1",
    metadata={"task": "pick_and_place", "operator": "alice"},
)

Lands on the episode as:

{
  "adapter": "ros2",
  "bag": "/Users/.../run",
  "skipped_topics": [],
  "task": "pick_and_place",
  "operator": "alice"
}

Your keys win on collision.
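
The collision rule behaves like a dict merge where the user's dict is applied last. A minimal sketch follows; merge_metadata is a hypothetical name, the values are made up, and a shallow merge is assumed (the source does not say how nested keys are handled).

```python
def merge_metadata(adapter_meta: dict, user_meta: dict) -> dict:
    """Shallow merge where user keys override adapter keys (hypothetical helper)."""
    # Later unpacking wins, so user_meta takes precedence on collision.
    return {**adapter_meta, **user_meta}


adapter_meta = {"adapter": "ros2", "bag": "/data/run", "skipped_topics": []}
user_meta = {"task": "pick_and_place", "bag": "relabelled-run"}
merged = merge_metadata(adapter_meta, user_meta)
print(merged["bag"])  # relabelled-run
```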

Encode-then-handle-it-yourself

encode_bag exposes the artifacts as files so you can inspect or post-process before uploading. Useful when you want to splice frames, re-bucket sensor topics, or stage a long-running upload:

encoded = ros2.encode_bag("./run/", "/tmp/encoded/")
 
print(encoded.duration_s, encoded.fps)
# 47.2 30.0
print([a.path for a in encoded.artifacts()])
# [PosixPath('/tmp/encoded/video.mp4'),
#  PosixPath('/tmp/encoded/sensors.npz'),
#  PosixPath('/tmp/encoded/actions.npz')]

Then drive start_episode / upload_* directly - same plumbing upload_bag uses internally. The adapter goes through the lower-level start_episode path (not log_episode) because log_episode rejects .npz files in the actions= slot: its extension check assumes any .npz is sensor data. The adapter knows what it wrote, so it bypasses that validation.

Live recording via rclpy

ros2.record(topics=[...]) subscribes to a set of topics during a running ROS 2 stack and ships everything captured as one episode on close. Same artifact contract as the offline path - the only difference is which side wrote the bag.

from robotrace.adapters import ros2
 
with ros2.record(
    topics=["/camera/image_raw", "/joint_states", "/cmd_vel"],
    name="warmup pick-and-place",
    policy_version="pap-v3.2.1",
    env_version="halcyon-cell-rev4",
    git_sha="abc1234",
) as rec:
    drive_robot_for_30_seconds()    # your existing code
# __exit__: stop subscriptions, close bag, encode → upload → finalize.
print(rec.episode.id)

Requirements

ros2.record(...) needs rclpy, which ships with the ROS 2 distro via apt - not via pip. Source your workspace first:

source /opt/ros/humble/setup.bash    # or jazzy, iron, rolling
python my_script.py

The SDK deliberately does not pin rclpy from PyPI because the PyPI wheels are not always compatible with the rmw bindings shipped with most distros. Calling ros2.record(...) without a sourced workspace raises ConfigurationError pointing at the apt command; the offline upload_bag(...) path continues to work without rclpy.
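
If a script has to run both on robots and on machines without ROS 2, it can check for rclpy up front and choose a path. The sketch uses only the standard library; rclpy_available is a hypothetical helper, and what you do in each branch is up to you.

```python
import importlib.util


def rclpy_available() -> bool:
    """True when the current interpreter can import rclpy (hypothetical helper)."""
    return importlib.util.find_spec("rclpy") is not None


# "live" means ros2.record(...) should be usable; "offline" means stick to upload_bag(...).
mode = "live" if rclpy_available() else "offline"
print(mode)
```

On a shell where the distro's setup.bash has not been sourced this would normally print offline, since rclpy is not on the Python path.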

Topic validation

Topics are validated against the live ROS 2 graph at start() time. A topic with no advertised publisher raises ConfigurationError so you notice a typo immediately instead of debugging an empty bag 60 seconds later:

ConfigurationError: topic '/camera/image' has no advertised
publisher. Available topics: ['/camera/image_raw', '/joint_states',
'/cmd_vel']. Start your robot node before opening
`ros2.record(...)` so the type can be resolved.

Start your robot node before opening the context manager.
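
Conceptually the check is a membership test against the topics currently advertised. The sketch below shows that shape with plain Python. It is not the recorder's code: ConfigurationError here is a local stand-in for the SDK's exception, check_topics is a hypothetical helper, and the advertised set is hard-coded (on a live system it might be built from rclpy's Node.get_topic_names_and_types()).

```python
class ConfigurationError(Exception):
    """Local stand-in; the SDK ships its own exception class."""


def check_topics(requested, advertised):
    """Raise if any requested topic is not in the advertised set (hypothetical helper)."""
    available = sorted(advertised)
    for topic in requested:
        if topic not in advertised:
            raise ConfigurationError(
                f"topic {topic!r} has no advertised publisher. "
                f"Available topics: {available}."
            )


advertised = {"/camera/image_raw", "/joint_states", "/cmd_vel"}
check_topics(["/joint_states"], advertised)  # passes silently

try:
    check_topics(["/camera/image"], advertised)  # typo: missing "_raw"
except ConfigurationError as err:
    message = str(err)
    print(message)
```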

Failure handling

If your code raises inside the with block, the recorder catches the exception, finalizes the episode as failed with the failure reason in metadata.failure_reason, then re-raises:

try:
    with ros2.record(topics=["/joint_states"], name="run") as rec:
        run_my_policy()    # raises midway
except RuntimeError:
    # The episode is recorded as `failed` in the portal with the
    # full traceback in metadata. The bag we captured up to the
    # failure point is uploaded so you can replay the run that led
    # to the error.
    ...

Empty bags (no messages received before close) are not uploaded - the recorder silently cleans up the tempdir. Useful for the "I opened the context manager but the robot never published" case.
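
The catch, finalize, re-raise flow can be mimicked with a small context manager. This is a toy model, not the recorder: the episode is a plain dict, record_sketch is a hypothetical name, and the "ready" success status is invented for the example (only "recording" and "failed" appear in this page).

```python
from contextlib import contextmanager


@contextmanager
def record_sketch():
    """Toy recorder that marks its episode failed when the body raises."""
    episode = {"status": "recording", "metadata": {}}
    try:
        yield episode
    except Exception as exc:
        episode["status"] = "failed"
        episode["metadata"]["failure_reason"] = f"{type(exc).__name__}: {exc}"
        raise  # the caller still sees the original exception
    else:
        episode["status"] = "ready"  # hypothetical success status


try:
    with record_sketch() as ep:
        raise RuntimeError("policy diverged")
except RuntimeError:
    pass

print(ep["status"], "-", ep["metadata"]["failure_reason"])
```

Because the exception is re-raised, callers keep their normal error handling; the recorder only adds the bookkeeping.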

Episode metadata on live runs

In addition to the standard adapter metadata, live recordings stamp:

{
  "adapter": "ros2",
  "ros2": {
    "mode": "live",
    "distro": "humble",
    "topics": ["/camera/image_raw", "/joint_states", "/cmd_vel"],
    "message_count": 1842
  }
}

so the portal can tell live recordings apart from bag-file uploads when triaging.

Errors

Exception | When
ConfigurationError | path doesn't exist, isn't a directory, has no metadata.yaml, or rosbags / cv2 aren't installed. For ros2.record: no sourced ROS 2 workspace (rclpy missing), or a requested topic has no advertised publisher
AuthError | API key bad / revoked (raised by the underlying start_episode)
ValidationError | Server rejected the create payload
TransportError | Network / DNS / timeout during the create or upload

If an upload fails partway through, the adapter (via Client.start_episode's standard handling) flips the run to status="failed" with the failure reason in metadata.failure_reason before re-raising - so you don't end up with ghostly "recording" runs in the portal.