Particle Filter Implemented in Python


Particle Filters Applied to Object Tracking

What Is a Particle Filter?

The particle filter is a powerful technique for state estimation, particularly effective with nonlinear systems, non-Gaussian distributions, and complex noise. It uses a large number of randomly sampled particles to represent the distribution of possible system states and dynamically updates their weights based on sensor measurements. Unlike traditional filters such as the Kalman filter, the particle filter is not constrained by assumptions of linearity and Gaussianity, which makes it well suited to scenarios with multimodal distributions, unknown dynamic models, or nonlinear sensor measurements.

For example, in robot localization, the particle filter can accurately track a robot's position and orientation in challenging environments and under unpredictable motion patterns. This versatility makes it a crucial tool for robot navigation, target tracking, and other dynamic state estimation tasks.

In this example, we will apply it to tracking a dog's position. The original video:

The tracking result:

The video shows a running dog being tracked with a particle filter. The red box marks the position predicted by the particle with the highest weight, i.e., the single most likely position of the dog. The green box marks the weighted average position of all particles, giving an overall estimate that combines every particle's prediction. Together, the two boxes track the dog's movement through a challenging, dynamic scene.

Steps of the Particle Filter

The core idea behind particle filtering is to use a large number of random samples (particles) to represent the distribution of possible states of a system, allowing the state to be estimated and predicted. To apply the algorithm, there are a few steps to follow (a minimal sketch of the full loop appears after the list):

  1. Initialization

    Randomly generate a set of particles and assign equal weights to them. These particles represent possible initial states of the system.

  2. Prediction

    For each particle, use the system dynamics model and control inputs to predict the next state. This involves moving the particles according to the expected system behavior.

  3. Update (Weighting)

    Update the weight of each particle based on the likelihood of the observed measurements given the particle’s predicted state. Particles that better match the actual observations receive higher weights.

  4. Resample

    Resample the particles based on their weights to focus on more probable states. Particles with higher weights are more likely to be duplicated, while particles with lower weights are likely to be discarded.

  5. Compute Estimation

    Optionally, compute the state estimate from the weighted particles, typically using the weighted mean or mode of the particles.

  6. Loop

    Repeat the prediction, update, resampling, and estimation steps for each new observation.
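To make the flow of these steps concrete, here is a minimal, generic sketch of one filtering step. It is my own illustration, not the tracking code that follows: predict_fn, likelihood_fn, and resample_fn are placeholders for the problem-specific models, and particles are stored one per row here (unlike the one-per-column layout used later in this post).

import numpy as np

def particle_filter_step(particles, observation, predict_fn, likelihood_fn, resample_fn):
    # 2. Prediction: propagate every particle through the dynamics model
    particles = predict_fn(particles)
    # 3. Update: weight each particle by how well it explains the new observation
    weights = np.array([likelihood_fn(observation, p) for p in particles])
    weights = weights / np.sum(weights)
    # 5. Estimation: the weighted mean of the particles is the state estimate
    estimate = np.average(particles, axis=0, weights=weights)
    # 4. Resample: duplicate likely particles and drop unlikely ones for the next step
    particles = resample_fn(particles, weights)
    return particles, estimate

The estimate is computed before resampling so that the weights are still informative; steps 2-5 are then repeated (step 6) for every new observation, exactly as the main loop at the end of this post does for every video frame.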

Code Implementation

import numpy as np
import cv2

1. Initialization

The state of each particle is a 4-dimensional vector [x_center, y_center, x_velocity, y_velocity]. All particles start from the same initial state, and every particle is given an equal weight.

num_of_particles = 500
s_init = np.array([1600, 700, 0, 0])  # Initial state [x_center, y_center, x_velocity, y_velocity]

# For the first frame, all particles are initialized to the same state
s_new = np.tile(s_init, (num_of_particles, 1)).T
weights = np.ones(num_of_particles)

Where:
- num_of_particles is the number of particles used to represent the state distribution.
- s_init is the initial state vector [x_center, y_center, x_velocity, y_velocity].
- s_new stores one particle state per column, so its shape is (4, num_of_particles).
- weights starts out as all ones, i.e., every particle is initially equally likely.

2. Predict Particle States

Particles are predicted based on a state transition model. This model is linear and includes Gaussian noise to simulate process uncertainty.

\[x_t^{(i)} = A \cdot x_{t-1}^{(i)} + v_t^{(i)}\]

Where:
- \(x_t^{(i)}\) is the state of particle \(i\) at time \(t\).
- \(A\) is the state transition matrix (a constant-velocity model here).
- \(v_t^{(i)}\) is zero-mean Gaussian process noise.

def predict_particles(particles_states):
    # Constant-velocity state transition matrix A:
    # x += vx, y += vy, velocities stay the same
    dynamics = np.array(
        [[1, 0, 1, 0],
         [0, 1, 0, 1],
         [0, 0, 1, 0],
         [0, 0, 0, 1]]
    )
    # apply the deterministic dynamics and add Gaussian process noise
    predicted_states = dynamics @ particles_states + np.random.normal(loc=0, scale=1, size=particles_states.shape)
    return predicted_states

Where:
- dynamics is the transition matrix \(A\): each position is advanced by its velocity, and the velocities stay constant.
- np.random.normal adds unit-variance Gaussian noise to every component of every particle to model process uncertainty.
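For example, ignoring the noise term, a particle in state [1600, 700, 3, -2] is moved to [1603, 698, 3, -2]: each position is advanced by its velocity, and the velocities stay the same.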

3. Compute Particle Weights

Weights are computed based on the likelihood of the observed data given the particle’s state. This is done by comparing the histogram of the particle’s state with the reference histogram.

Weight calculation formula:

\[w_t^{(i)} \propto p(y_t | x_t^{(i)})\]

Where:

\(p(y_t | x_t^{(i)})\) is the likelihood of the observation given the particle’s state.
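In this implementation, the likelihood is approximated by comparing color histograms: the Bhattacharyya coefficient between the particle's normalized histogram \(p\) and the reference histogram \(q\) measures their similarity, and an exponential (with the scale factor 20 used in the code below) turns that similarity into an unnormalized weight:

\[BC(p, q) = \sum_{b} \sqrt{p_b \, q_b}, \qquad w_t^{(i)} \propto \exp\!\left(20 \cdot BC\big(p^{(i)}, q\big)\right)\]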

def compute_weight(p, q):
    if not np.isclose(np.sum(p), 1):
        raise ValueError('p histogram is not normalized')
    if not np.isclose(np.sum(q), 1):
        raise ValueError('q histogram is not normalized')
    bc = np.sum(np.sqrt(p * q))  # Bhattacharyya coefficient, in [0, 1]
    # exponential sharpening: small differences in similarity become large differences in weight
    return np.exp(20 * bc)

This function measures how similar the particle's histogram p is to the reference histogram q via the Bhattacharyya coefficient and converts that similarity into an (unnormalized) weight. The histograms themselves are computed and normalized in the next step.

4. Compute Normalized Histogram

Extract the region of interest (ROI) from the image based on the particle’s state and compute its color histogram. Normalize the histogram for comparison.

def compute_norm_hist(image, state):
    # region of interest (ROI) around the particle's center, clipped to the image borders
    x_min = max(np.round(state[0] - half_width).astype(int), 0)
    x_max = min(np.round(state[0] + half_width).astype(int), image.shape[1] - 1)
    y_min = max(np.round(state[1] - half_height).astype(int), 0)
    y_max = min(np.round(state[1] + half_height).astype(int), image.shape[0] - 1)

    roi = image[y_min:y_max+1, x_min:x_max+1]
    # quantize each color channel from 256 levels down to 16
    roi_reduced = roi // 16
    roi_reduced = roi_reduced.astype(int)
    # map every pixel to a single bin index in [0, 16^3) = [0, 4096)
    roi_indexing = (roi_reduced[..., 0] + roi_reduced[..., 1] * 16 + roi_reduced[..., 2] * 16 ** 2).flatten()
    hist, _ = np.histogram(roi_indexing, bins=4096, range=(0, 4096))
    if np.sum(hist) == 0:
        # the ROI is empty (the particle drifted outside the image); return a uniform histogram
        return np.ones(4096) / 4096
    norm_hist = hist / np.sum(hist)  # normalize so the histogram sums to 1
    return norm_hist

Where:
- half_width and half_height (defined later) set the size of the bounding box around the particle's center (state[0], state[1]).
- Quantizing each of the three color channels to 16 levels gives 16 × 16 × 16 = 4096 possible color bins.
- Dividing by the sum turns the histogram into a probability distribution, so it can be compared with the Bhattacharyya coefficient.
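For example, a BGR pixel (34, 120, 200) is quantized to (2, 7, 12) and therefore falls into bin 2 + 7 · 16 + 12 · 256 = 3186.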

5. Resample Particles

Resampling is done based on the particle weights: particles with higher weights are more likely to be selected (and duplicated), while particles with lower weights tend to be discarded.

def sample_particle(particles_states, weights):
    normalized_weights = weights / np.sum(weights)
    # cumulative distribution over the particles
    sampling_weights = np.cumsum(normalized_weights)
    # one uniform random number per particle to draw
    rand_numbers = np.random.random(sampling_weights.size)
    # for each random number, find the first particle whose cumulative weight exceeds it
    cross_diff = sampling_weights[None, :] - rand_numbers[:, None]
    cross_diff[cross_diff < 0] = np.inf
    sampling = np.argmin(cross_diff, axis=1)
    sampled_particles = particles_states[:, sampling]
    return sampled_particles

Where:
- sampling_weights is the cumulative distribution built from the normalized weights.
- Each entry of rand_numbers selects the first particle whose cumulative weight exceeds it, which is multinomial resampling: heavier particles are picked (and duplicated) more often.
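As a quick illustration (my own example, not part of the original tutorial), resampling three particles with very unequal weights mostly duplicates the dominant one:

states = np.array([[10., 20., 30.],   # x_center of three particles
                   [ 5.,  5.,  5.],   # y_center
                   [ 0.,  0.,  0.],   # x_velocity
                   [ 0.,  0.,  0.]])  # y_velocity
weights = np.array([0.1, 0.1, 10.0])  # the third particle dominates
print(sample_particle(states, weights))
# most (usually all) of the three sampled columns will be [30, 5, 0, 0]; the exact mix is random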

6. Draw Bounding Box

Draw bounding boxes around the object based on the weighted average position and the position of the particle with the maximum weight.

def draw_bounding_box(image, states, weights, with_max=True):
    mean_box = np.average(states, axis=1, weights=weights)
    x_c_mean, y_c_mean = np.round(mean_box[:2]).astype(int)
    image_with_boxes = image.copy()
    cv2.rectangle(image_with_boxes, (x_c_mean - half_width, y_c_mean - half_height),
                                     (x_c_mean + half_width, y_c_mean + half_height), (0, 255, 0), 1)
    if with_max:
        max_box = states[:, np.argmax(weights)]
        x_c_max, y_c_max = np.round(max_box[:2]).astype(int)
        cv2.rectangle(image_with_boxes, (x_c_max - half_width, y_c_max - half_height),
                                         (x_c_max + half_width, y_c_max + half_height), (0, 0, 255), 1)
    return image_with_boxes

Where:
- The green box is centered on the weighted average of all particle states (the combined estimate).
- The red box (drawn when with_max=True) is centered on the particle with the highest weight (the single most likely state).

7. Draw Particles

Visualize particles on the image. The size of each particle is proportional to its weight.

def draw_particles(image, states, weights):
    image_with_particles = image.copy()
    for s, w in zip(states.T, weights):
        x, y = np.round(s[:2]).astype(int)
        # circle radius is proportional to the particle's weight
        # (weights on a 0-1 scale give radii between 0 and 30 pixels)
        cv2.circle(image_with_particles, (x, y), int(round(30 * w)), (0, 0, 255), thickness=-1)
    return image_with_particles

Use cv2.circle to draw each particle, with the size proportional to the particle’s weight.
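Note that draw_particles is not called in the main loop below. If you also want to visualize the particle cloud, one option (my own addition, not part of the original pipeline) is to pass normalized weights so the circle radii stay in a sensible range:

# inside the main loop, after the weights for the current frame have been computed:
image_with_particles = draw_particles(image_with_boxes, s_new, weights / np.sum(weights))
result.write(image_with_particles)  # write this frame instead of image_with_boxes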

8. Main Loop

Process each video frame iteratively, performing prediction, update, resampling, and visualization.

Create Video Reader Object

cap = cv2.VideoCapture('DSCF2822.MP4')
if not cap.isOpened():
    print("Error: Could not open video file.")
    exit(1)

Read the First Frame

ret, image = cap.read()
if not ret:
    print("Error: Could not read the first frame.")
    exit(1)

Initialize the Number of Particles and the Initial State

num_of_particles = 500
s_init = np.array([1600, 700, 0, 0])  # initial state [x_center, y_center, x_velocity, y_velocity]

Bounding Box Dimensions: These values define the dimensions of the bounding box around the object being tracked.

half_height = 100
half_width = 80

Video Parameters

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 20  # alternatively: cap.get(cv2.CAP_PROP_FPS)  # frames per second

Initialize the Video Writer

size = (frame_width, frame_height)
result = cv2.VideoWriter('nini_tracked.avi', cv2.VideoWriter_fourcc(*'MJPG'), fps, size)

if not result.isOpened():
    print("Error: Could not open video writer.")
    exit(1)

Get the Normalized Histogram of the Initial State

q = compute_norm_hist(image, s_init)

compute_norm_hist computes the histogram for the initial state s_init of the object in the first frame.

This histogram serves as the reference for comparing with histograms of particles in subsequent frames.

Initialize Particles and Weights

s_new = np.tile(s_init, (num_of_particles, 1)).T
weights = np.ones(num_of_particles)

Main Loop

Processing frames:

# go over the frames in the video
frame_count = 0
while True:
    # sample new particles according to the weights
    s_sampled = sample_particle(s_new, weights)
    # predict new particles (states) according to the previous states
    s_new = predict_particles(s_sampled)
    # go over the new predicted states, and compute the histogram for the state and
    # the weight with the original histogram
    for jj, s in enumerate(s_new.T):
        p = compute_norm_hist(image, s)
        weights[jj] = compute_weight(p, q)

    # draw bounding box over the tracked object
    image_with_boxes = draw_bounding_box(image, s_new, weights, with_max=True)
    result.write(image_with_boxes)

    # read next frame
    ret, image = cap.read()
    if not ret:  # if there is no next frame to read, stop
        print(f"End of video at frame {frame_count}.")
        break
    frame_count += 1
    if frame_count % 10 == 0:
        print(f"Processed frame {frame_count}.")

# release video objects
cap.release()
result.release()
print("Video processing completed and file saved.")
