Skip to content

Commit

Permalink
revise tf to tf2_ros in RVizVisualizer transform broadcast in rviz_to…
Browse files Browse the repository at this point in the history
…ols and add functions to pose_tools.py, fix type declaration np.ndarray in rviz_tools.py
  • Loading branch information
andrewluiqut committed Jun 12, 2024
1 parent 2446df7 commit e2d9cd3
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 13 deletions.
26 changes: 25 additions & 1 deletion rviz_marker/pose_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import math
import rospy, tf
import tf.transformations
from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion
from geometry_msgs.msg import Pose, PoseStamped, Point, Quaternion, TransformStamped, Transform, Vector3, PointStamped

def list_to_pose(pose) -> Pose:
""" Convert a pose in xyzrpy or xyzq format to Pose
Expand Down Expand Up @@ -91,6 +91,30 @@ def list_to_xyzrpy(pose) -> list:
else:
return pose[:3] + list(tf.transformations.euler_from_quaternion(pose[3:]))

def transform_to_pose_stamped(transform:TransformStamped) -> PoseStamped:
""" Convert a TransformStamped object into PoseStamped object
"""
point = Point(transform.transform.translation.x, transform.transform.translation.y, transform.transform.translation.z)
quaternion = Quaternion(transform.transform.rotation.x, transform.transform.rotation.y, transform.transform.rotation.z, transform.transform.rotation.w)
return PoseStamped(transform.header, Pose(point, quaternion))

def pose_stamped_to_transform_stamped(pose_stamped:PoseStamped, reference_frame:str) -> TransformStamped:
""" Convert a PoseStamped object into TransformStamped object
"""
translation = Vector3(pose_stamped.pose.position.x, pose_stamped.pose.position.y, pose_stamped.pose.position.z)
transform = Transform(translation, pose_stamped.pose.orientation)
transform_stamped = TransformStamped(pose_stamped.header, reference_frame, transform)
return transform_stamped

def pose_stamped_to_point_stamped(pose_stamped:PoseStamped) -> PointStamped:
""" Convert a PoseStamped object into PointStamped object
"""
point_stamped = PointStamped(pose_stamped.header, pose_stamped.pose.position)
return point_stamped

def same_rpy_with_tolerence(rpy_1:list, rpy_2:list, tolerance:float=0.01) -> bool:
""" Test if two euler's angles are the same
Expand Down
53 changes: 41 additions & 12 deletions rviz_marker/rviz_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from collections import defaultdict
import cv2
import numpy as np
import rospy, tf
import rospy, tf, tf2_ros
from std_msgs.msg import Header
from tf2_msgs.msg import TFMessage
from std_msgs.msg import ColorRGBA, Header
from geometry_msgs.msg import Pose, PoseStamped, Twist, TwistStamped, Vector3, Point, Quaternion
Expand All @@ -30,6 +31,7 @@
from rviz_marker.pose_tools import list_to_pose, pose_to_xyzq
from rviz_marker.rospkg_tools import PackageFile
from rviz_marker.logging_tools import logger
import rviz_marker.pose_tools as pose_tools

class RGBAColors(int, Enum):
""" Define common use colours for visualization
Expand Down Expand Up @@ -361,15 +363,15 @@ def create_mesh_marker(name:str, id:int, file_uri:str, xyzrpy:list, reference_fr
the_marker.mesh_use_embedded_materials = True
return the_marker

def create_pointcloud_from_image(image_bgr, xyz:list=(0, 0, 0), pixel_physical_size:float=0.005, reference_frame=None, opacity=255, depth_array=None) -> PointCloud2:
def create_pointcloud_from_image(image_bgr:np.ndarray, xyz:list=(0, 0, 0), pixel_physical_size:float=0.005, reference_frame=None, opacity=255, depth_array:np.ndarray=None) -> PointCloud2:
""" Create a PointCloud2 for displaying a OpenCV image (color or greyscale)
:param image_bgr: the image to be displayed, type numpy array
:param image_bgr: the image to be displayed, type numpy ndarray
:param xyz: the position of the bottom left hand corner of the image, defaults to (0, 0, 0)
:param pixel_physical_size: the length of each pixel in x, y, defaults to [0.005, 0.005], and optionally the third value in the list for z scaling factor
:param reference_frame: the reference frame, defaults to None
:param opacity: the opacity of the displayed image, defaults to 255
:param depth_array: optionally a numpy array of exact the same shape as the image indicating the depth, defaults to None
:param depth_array: optionally a numpy ndarray of exact the same shape as the image indicating the depth, defaults to None
:return: the PointCloud2 object
"""
if image_bgr is None:
Expand Down Expand Up @@ -459,8 +461,7 @@ def __init__(self, **config_dict):
self.temp_marker_list = []
self.pointclouds_dict = defaultdict(lambda: None) # pointcloud_name -> dict
# setup tf publish
self.tf_pub = tf.TransformBroadcaster()
self.tf_listener = tf.TransformListener()
self.tf_broadcaster = tf2_ros.TransformBroadcaster()
self.tfs_dict = defaultdict(lambda: None) # frame_name -> dict
# setup marker publisher
self.marker_pub = rospy.Publisher(topic_marker, Marker, queue_size = 100)
Expand Down Expand Up @@ -510,8 +511,36 @@ def _cb_timer_tf(self, event):
with self.lock:
for custom_tf in self.tfs_dict.values():
name, parent_frame, pose = custom_tf['frame'], custom_tf['parent_frame'], custom_tf['pose']
xyzq = pose_to_xyzq(pose)
self.tf_pub.sendTransform(xyzq[:3], xyzq[3:], rospy.Time.now(), name, parent_frame)
# xyzq = pose_to_xyzq(pose)
# self.tf_pub.sendTransform(xyzq[:3], xyzq[3:], rospy.Time.now(), name, parent_frame)
self._pub_transform(name, pose, parent_frame)

# internal function: publish the transform of a specific named object
def _pub_transform(self, name:str, pose, frame=None):
""" publish the transform of an object
:param name: name of the object
:type name: str
:param pose: the pose of the object
:type pose: Pose, PoseStamped, list of 6 or 7
:param frame: the frame against which the pose is defined, ignored if PoseStamped is provided, defaults to None
:type frame: str, optional
"""
frame = self.base_frame if frame is None else frame
if type(pose) in [list, tuple]:
pose_stamped = pose_tools.list_to_pose_stamped(pose, frame)
elif type(pose) == Pose:
pose_stamped = PoseStamped()
pose_stamped.header.frame_id = frame
pose_stamped.header.stamp = rospy.Time.now()
pose_stamped.pose = pose
elif type(pose) == PoseStamped:
frame = pose.header.frame_id
pose_stamped = pose
else:
rospy.logerr(f'{__class__.__name__}: parameter (pose) is not list of length 6 or 7 or a Pose object -> fix the parameter at behaviour construction')
raise TypeError(f'A parameter is invalid')
self.tf_broadcaster.sendTransform(pose_tools.pose_stamped_to_transform_stamped(pose_stamped, name))

def add_persistent_marker(self, marker:Marker, pub_period:float=None, pub_tf:bool=False) -> Marker:
""" Add a persistent marker
Expand Down Expand Up @@ -554,7 +583,7 @@ def add_custom_tf(self, name:str, parent_frame:str, pose:Pose) -> None:
raise AssertionError(f'RvizVisualizer (add_custom_tf): No parameter can be None')
self.tfs_dict[name] = {'pose': pose, 'frame':name, 'parent_frame': parent_frame}

def delete_all_persistent_markers(self, frame) -> None:
def delete_all_persistent_markers(self, frame:str) -> None:
""" Remove all persistent markers from RViz and this object
:param frame: The reference frame
Expand All @@ -571,7 +600,7 @@ def delete_all_persistent_markers(self, frame) -> None:
self.markers_dict.clear()
self.temp_marker_list.append({'marker': create_delete_all_marker(frame), 'pub_time': time.time() + self.TEMP_MAKRER_PUB_DELAY})

def delete_persistent_marker(self, name, id, frame) -> None:
def delete_persistent_marker(self, name:str, id, frame:str) -> None:
""" Remove a persistent marker from RViz and this object
:param name: the name space of the marker
Expand All @@ -589,7 +618,7 @@ def delete_persistent_marker(self, name, id, frame) -> None:
if tf_frame in self.tfs_dict:
del self.tfs_dict[tf_frame]

def add_pointcloud(self, name, pointcloud:PointCloud2, pub_period:float=None) -> PointCloud2:
def add_pointcloud(self, name:str, pointcloud:PointCloud2, pub_period:float=None) -> PointCloud2:
""" Add a PointCloud2 object for regular publishing
:param name: the name of the pointcloud of type str
Expand All @@ -603,7 +632,7 @@ def add_pointcloud(self, name, pointcloud:PointCloud2, pub_period:float=None) ->
self.pointclouds_dict[name] = {'pointcloud': pointcloud, 'pub_period': pub_period, 'next_time': None}
return pointcloud

def delete_pointcloud(self, name) -> PointCloud2:
def delete_pointcloud(self, name:str) -> PointCloud2:
""" Remove a pointcloud from regular publishing
:param name: the name of the pointcloud of type str
Expand Down

0 comments on commit e2d9cd3

Please sign in to comment.