Perception

In this section, we introduce the key class and attributes functions in the perception modules.

PerceptionMethods Class

The PerceptionMethods class hosts all the perception methods used in the perception module. It is designed to handle multiple perception techniques, including YOLO ,SSD, and groundtruth.

Class Overview

class PerceptionMethods:
    def __init__(self, config_yaml, ml_model, vehicle, id, carla_world=None):
        self.carla_world = carla_world if carla_world is not None else vehicle.get_world()
        self.vehicle = vehicle
        self.id = id
        self.activate = config_yaml['activate']
        self.camera_visualize = config_yaml['camera']['visualize']
        self.camera_num = min(config_yaml['camera']['num'], 4)
        self.lidar_visualize = config_yaml['lidar']['visualize']
        self.global_position = config_yaml.get('global_position')
        self.object_detection_model = ml_model.object_detection_model
        self.transmission_latency = config_yaml.get('transmission_latency')
        self.transmission_latency_in_sec = config_yaml.get('transmission_latency_in_sec')
        self.yolo_detected_objects_queue = deque(maxlen=1000)
        self.server_detected_objects_queue = deque(maxlen=1000)
        ...

Key Attribute Function: yolo_detection

The yolo_detection method integrates YOLOv5 object detection with lidar data for robust perception. It detects objects in the scene and updates the object dictionary with their coordinates and confidence levels.

def yolo_detection(self, objects, ego_pos):
    # If no camera is created, stop detection.
    if len(self.rgb_camera) == 0:
        return {'vehicles': [], 'traffic_lights': []}

    # Retrieve current cameras and lidar data
    rgb_images = []
    for rgb_camera in self.rgb_camera:
        while rgb_camera.image is None:
            continue
        rgb_images.append(cv2.cvtColor(np.array(rgb_camera.image), cv2.COLOR_BGR2RGB))

    # YOLO detection
    yolo_detection = self.object_detection_model.object_detector_yolo(rgb_images)

    # Process lidar data and perform camera-lidar fusion
    rgb_draw_images = []
    for i, rgb_camera in enumerate(self.rgb_camera):
        rgb_image, projected_lidar = st.convert_lidar_to_camera(
            self.lidar.sensor, rgb_camera.sensor, self.lidar.data, np.array(rgb_camera.image)
        )
        rgb_draw_images.append(rgb_image)
        objects = camera_lidar_fusion_yolo(
            objects, yolo_detection.xyxy[i], self.lidar.data, projected_lidar, self.lidar.sensor
        )

    # Visualization (optional)
    if self.camera_visualize:
        for i, rgb_image in enumerate(rgb_draw_images):
            rgb_image = visualize_yolo_bbx(yolo_detection, rgb_image, i)
            cv2.imshow(f'Camera {i}', rgb_image)
        cv2.waitKey(1)

    # Add traffic light information
    objects = get_traffic_lights(objects, self.carla_world, ego_pos)
    return objects

The PerceptionMethods class also includes functions for: - SSD-based object detection: ssd_detection - Server-based object detection: server_detection

These methods provide flexible options for integrating different detection models and techniques in the simulation.

Perception Class

The Perception class serves as the central module for detecting objects. It integrates multiple perception methods and configurations, allowing for flexible and customizable object detection.

Note

Customized perception methods can be added by modifying the object_detect function.

class Perception:
    def __init__(self, vehicle, config_yaml, ml_model, carla_world=None, infra_id=None):
        self.carla_world = carla_world if carla_world is not None else vehicle.get_world()
        self.id = infra_id if infra_id is not None else vehicle.id
        self.activate = config_yaml['activate']
        self.activate_model = config_yaml['model']
        self.camera_visualize = config_yaml['camera']['visualize']
        self.lidar_visualize = config_yaml['lidar']['visualize']
        self.coop_perception = config_yaml.get('coop_perception', False)
        self.transmission_latency = config_yaml.get('transmission_latency')
        self.transmission_latency_in_sec = config_yaml.get('transmission_latency_in_sec')
        self.errors = config_yaml.get('errors')
        self.error_rate = config_yaml.get('error_rate')
        self.error_present = False
        self.global_position = config_yaml.get('global_position')
        self.PerceptionMethods = perception_methods.PerceptionMethods(config_yaml, ml_model, vehicle, self.id, carla_world)
        self.objects = {}

Key Attribute Function: object_detect

The object_detect method identifies surrounding objects using the selected perception method. It supports multiple detection models and handles errors in the detection process.

def object_detect(self, ego_pos):
    objects = {'vehicles': [], 'traffic_lights': []}
    if not self.activate:
        objects = self.PerceptionMethods.server_detection(objects, ego_pos)
    else:
        if self.activate_model == "yolo":
            objects = self.PerceptionMethods.yolo_detection(objects, ego_pos)
        elif self.activate_model == "ssd":
            objects = self.PerceptionMethods.ssd_detection(objects, ego_pos)
        # Add additional perception methods here
        # elif self.activate_model == "{method_name}":
        #     objects = self.PerceptionMethods.{method_name}_detection(objects, ego_pos)
    if self.errors:
        self.error_present = spu.get_error(self.error_rate)
        if self.error_present:
            objects = {'vehicles': [], 'traffic_lights': []}
    return objects

Customizing the Perception Module

To add a new perception method: 1. Implement the method in the PerceptionMethods class. 2. Add the method to the object_detect function using the format:

elif self.activate_model == "{method_name}":
    objects = self.PerceptionMethods.{method_name}_detection(objects, ego_pos)

CameraSensor Class

The CameraSensor class manages the RGB camera for vehicles or infrastructure. It provides methods to calculate spawn points of camera and process camera data.

class CameraSensor:
    def __init__(self, vehicle, world, relative_position, global_position):
        if vehicle:
            world = vehicle.get_world()
        if vehicle is None:
            is_rsu = True
        else:
            is_rsu = False
        blueprint = world.get_blueprint_library().find('sensor.camera.rgb')
        blueprint.set_attribute('fov', '100')
        camera_spawn_point = self.calculate_camera_spawn_point(is_rsu, relative_position, global_position)
        self.sensor = world.spawn_actor(blueprint, camera_spawn_point, attach_to=vehicle) if vehicle else world.spawn_actor(
            blueprint, camera_spawn_point)
        self.image = None
        self.timstamp = None
        self.frame = 0
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda event: CameraSensor.process_camera_data(weak_self, event))
        self.image_width = int(self.sensor.attributes['image_size_x'])
        self.image_height = int(self.sensor.attributes['image_size_y'])

calculate_camera_spawn_point

Calculates the spawn point for the camera based on global or relative position.

@staticmethod
def calculate_camera_spawn_point(is_rsu, relative_position, global_position):
      ...
    return spawn_point

process_camera_data

Processes incoming camera data and extracts RGB frames.

@staticmethod
def process_camera_data(weak_self, event):
    self = weak_self()
    if not self:
        return
    image = np.array(event.raw_data)
    image = image.reshape((self.image_height, self.image_width, 4))
    image = image[:, :, :3]  # Remove the alpha channel
    self.image = image
    self.frame = event.frame
    self.timestamp = event.timestamp

LidarSensor Class

The LidarSensor class manages lidar data collection and processes point cloud data for vehicles or infrastructure.

class LidarSensor:
    def __init__(self, vehicle, world, config_yaml, global_position):
        if vehicle:
            world = vehicle.get_world()
        blueprint = world.get_blueprint_library().find('sensor.lidar.ray_cast')
        self.update_blueprint_attributes(blueprint, config_yaml)
        spawn_point = self.get_spawn_point(global_position)
        self.sensor = world.spawn_actor(blueprint, spawn_point, attach_to=vehicle) if vehicle else world.spawn_actor(
            blueprint, spawn_point)
        self.data = None
        self.timestamp = None
        self.frame = 0
        self.o3d_pointcloud = o3d.geometry.PointCloud()
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda event: LidarSensor.process_lidar_data(weak_self, event))

get_spawn_point

Calculates the spawn point for the lidar sensor.

def get_spawn_point(self, global_position):
    if global_position is None:
        return carla.Transform(carla.Location(x=-0.5, z=1.9))
    return carla.Transform(carla.Location(x=global_position[0],
                                          y=global_position[1],
                                          z=global_position[2]))

process_lidar_data

Processes lidar data to generate point clouds.

@staticmethod
def process_lidar_data(weak_self, event):
    self = weak_self()
    if not self:
        return
    data = np.copy(np.frombuffer(event.raw_data, dtype=np.dtype('f4')))
    data = np.reshape(data, (int(data.shape[0] / 4), 4))  # (x, y, z, intensity)
    self.data = data
    self.frame = event.frame
    self.timestamp = event.timestamp

SemanticLidarSensor Class

The SemanticLidarSensor class manages semantic lidar data collection, used primarily for data dumping.

class SemanticLidarSensor:
    def __init__(self, vehicle, world, config_yaml, global_position):
        if vehicle:
            world = vehicle.get_world()
        blueprint = world.get_blueprint_library().find('sensor.lidar.ray_cast_semantic')
        self.update_blueprint_attributes(blueprint, config_yaml)
        spawn_point = self.get_spawn_point(global_position)
        self.sensor = world.spawn_actor(blueprint, spawn_point, attach_to=vehicle) if vehicle else world.spawn_actor(
            blueprint, spawn_point)
        self.points = None
        self.obj_idx = None
        self.obj_tag = None
        self.timestamp = None
        self.frame = 0
        self.o3d_pointcloud = o3d.geometry.PointCloud()
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda event: SemanticLidarSensor.process_lidar_data(weak_self, event))

get_spawn_point

Calculates the spawn point for the semantic lidar sensor.

def get_spawn_point(self, global_position):
    if global_position is None:
        return carla.Transform(carla.Location(x=-0.5, z=1.9))
    return carla.Transform(carla.Location(x=global_position[0], y=global_position[1], z=global_position[2]))

process_lidar_data

Processes semantic lidar data, extracting points, object indices, and tags.

@staticmethod
def process_lidar_data(weak_self, event):
    self = weak_self()
    if not self:
        return
    data = np.frombuffer(event.raw_data, dtype=np.dtype([
        ('x', np.float32), ('y', np.float32), ('z', np.float32),
        ('CosAngle', np.float32), ('ObjIdx', np.uint32),
        ('ObjTag', np.uint32)]))
    self.points = np.array([data['x'], data['y'], data['z']]).T
    self.obj_tag = np.array(data['ObjTag'])
    self.obj_idx = np.array(data['ObjIdx'])
    self.frame = event.frame
    self.timestamp = event.timestamp

BoundingBox Class

The BoundingBox class represents a bounding box for obstacles or vehicles. It calculates the central location and dimensions of an object based on its corners.

class BoundingBox:
    def __init__(self, corners):
        self.loc = self.compute_location(corners)
        self.size = self.compute_extent(corners)

compute_location

Calculates the central location of the bounding box.

@staticmethod
def compute_location(corners):
    avg_x, avg_y, avg_z = np.mean(corners, axis=0)
    return carla.Location(x=avg_x, y=avg_y, z=avg_z)

compute_extent

Calculates the dimensions of the bounding box (width, height, and depth).

@staticmethod
def compute_extent(corners):
    half_x = (np.max(corners[:, 0]) - np.min(corners[:, 0])) / 2
    half_y = (np.max(corners[:, 1]) - np.min(corners[:, 1])) / 2
    half_z = (np.max(corners[:, 2]) - np.min(corners[:, 2])) / 2
    return carla.Vector3D(x=half_x, y=half_y, z=half_z)

StaticObstacle Class

The StaticObstacle class represents static obstacles such as stop signs or traffic lights. It encapsulates bounding box details for the obstacle.

class StaticObstacle:
    def __init__(self, bounding_corners, o3d_box):
        self.bbx = BoundingBox(bounding_corners)
        self.o3d_box = o3d_box

TrafficLight Class

The TrafficLight class maps traffic light information retrieved from the server and encapsulates its location and state.

class TrafficLight:
    def __init__(self, position, state):
        self.position = position
        self.state = state

get_state

Returns the current state of the traffic light.

def get_state(self):
    return self.state