Skip to content

Ensemble Kalman Filter

EnsembleKalmanFilter

Introduction and Overview

This implements the Ensemble Kalman filter.

EnsembleKalmanFilter

Bases: object

This implements the ensemble Kalman filter (EnKF). The EnKF uses an ensemble of hundreds to thousands of state vectors that are randomly sampled around the estimate, and adds perturbations at each update and predict step. It is useful for extremely large systems such as found in hydrophysics. As such, this class is admittedly a toy as it is far too slow with large N.

There are many versions of this sort of this filter. This formulation is due to Crassidis and Junkins [1]. It works with both linear and nonlinear systems.

Parameters:

Name Type Description Default
x array(dim_x)

state mean

required
P array((dim_x, dim_x))

covariance of the state

required
dim_z int

Number of of measurement inputs. For example, if the sensor provides you with position in (x,y), dim_z would be 2.

required
dt float

time step in seconds

required
N int

number of sigma points (ensembles). Must be greater than 1.

required
hx function hx(x)

Measurement function. May be linear or nonlinear - converts state x into a measurement. Return must be an np.array of the same dimensionality as the measurement vector.

required
fx function fx(x, dt)

State transition function. May be linear or nonlinear. Projects state x into the next time period. Returns the projected state x.

required

Attributes:

Name Type Description
x array(dim_x, 1)

State estimate

P array(dim_x, dim_x)

State covariance matrix

x_prior array(dim_x, 1)

Prior (predicted) state estimate. The _prior and _post attributes are for convienence; they store the prior and posterior of the current epoch. Read Only.

P_prior array(dim_x, dim_x)

Prior (predicted) state covariance matrix. Read Only.

x_post array(dim_x, 1)

Posterior (updated) state estimate. Read Only.

P_post array(dim_x, dim_x)

Posterior (updated) state covariance matrix. Read Only.

z array

Last measurement used in update(). Read only.

R array(dim_z, dim_z)

Measurement noise matrix

Q array(dim_x, dim_x)

Process noise matrix

fx callable(x, dt)

State transition function

hx callable(x)

Measurement function. Convert state x into a measurement

inv function, default numpy.linalg.inv

If you prefer another inverse function, such as the Moore-Penrose pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

Examples:

.. code-block:: Python

def hx(x):
   return np.array([x[0]])

F = np.array([[1., 1.],
              [0., 1.]])
def fx(x, dt):
    return np.dot(F, x)

x = np.array([0., 1.])
P = np.eye(2) * 100.
dt = 0.1
f = EnsembleKalmanFilter(x=x, P=P, dim_z=1, dt=dt,
                         N=8, hx=hx, fx=fx)

std_noise = 3.
f.R *= std_noise**2
f.Q = Q_discrete_white_noise(2, dt, .01)

while True:
    z = read_sensor()
    f.predict()
    f.update(np.asarray([z]))

See my book Kalman and Bayesian Filters in Python https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

References
  • [1] John L Crassidis and John L. Junkins. "Optimal Estimation of Dynamic Systems. CRC Press, second edition. 2012. pp, 257-9.
Source code in bayesian_filters/kalman/ensemble_kalman_filter.py
class EnsembleKalmanFilter(object):
    """
    This implements the ensemble Kalman filter (EnKF). The EnKF uses
    an ensemble of hundreds to thousands of state vectors that are randomly
    sampled around the estimate, and adds perturbations at each update and
    predict step. It is useful for extremely large systems such as found
    in hydrophysics. As such, this class is admittedly a toy as it is far
    too slow with large N.

    There are many versions of this sort of this filter. This formulation is
    due to Crassidis and Junkins [1]. It works with both linear and nonlinear
    systems.

    Parameters
    ----------

    x : np.array(dim_x)
        state mean

    P : np.array((dim_x, dim_x))
        covariance of the state

    dim_z : int
        Number of of measurement inputs. For example, if the sensor
        provides you with position in (x,y), dim_z would be 2.

    dt : float
        time step in seconds

    N : int
        number of sigma points (ensembles). Must be greater than 1.


    hx : function hx(x)
        Measurement function. May be linear or nonlinear - converts state
        x into a measurement. Return must be an np.array of the same
        dimensionality as the measurement vector.

    fx : function fx(x, dt)
        State transition function. May be linear or nonlinear. Projects
        state x into the next time period. Returns the projected state x.


    Attributes
    ----------
    x : numpy.array(dim_x, 1)
        State estimate

    P : numpy.array(dim_x, dim_x)
        State covariance matrix

    x_prior : numpy.array(dim_x, 1)
        Prior (predicted) state estimate. The *_prior and *_post attributes
        are for convienence; they store the  prior and posterior of the
        current epoch. Read Only.

    P_prior : numpy.array(dim_x, dim_x)
        Prior (predicted) state covariance matrix. Read Only.

    x_post : numpy.array(dim_x, 1)
        Posterior (updated) state estimate. Read Only.

    P_post : numpy.array(dim_x, dim_x)
        Posterior (updated) state covariance matrix. Read Only.

    z : numpy.array
        Last measurement used in update(). Read only.

    R : numpy.array(dim_z, dim_z)
        Measurement noise matrix

    Q : numpy.array(dim_x, dim_x)
        Process noise matrix

    fx : callable (x, dt)
        State transition function

    hx : callable (x)
        Measurement function. Convert state `x` into a measurement


    inv : function, default numpy.linalg.inv
        If you prefer another inverse function, such as the Moore-Penrose
        pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

    Examples
    --------

    .. code-block:: Python

        def hx(x):
           return np.array([x[0]])

        F = np.array([[1., 1.],
                      [0., 1.]])
        def fx(x, dt):
            return np.dot(F, x)

        x = np.array([0., 1.])
        P = np.eye(2) * 100.
        dt = 0.1
        f = EnsembleKalmanFilter(x=x, P=P, dim_z=1, dt=dt,
                                 N=8, hx=hx, fx=fx)

        std_noise = 3.
        f.R *= std_noise**2
        f.Q = Q_discrete_white_noise(2, dt, .01)

        while True:
            z = read_sensor()
            f.predict()
            f.update(np.asarray([z]))

    See my book Kalman and Bayesian Filters in Python
    https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

    References
    ----------

    - [1] John L Crassidis and John L. Junkins. "Optimal Estimation of
      Dynamic Systems. CRC Press, second edition. 2012. pp, 257-9.
    """

    def __init__(self, x, P, dim_z, dt, N, hx, fx):
        if dim_z <= 0:
            raise ValueError("dim_z must be greater than zero")

        if N <= 0:
            raise ValueError("N must be greater than zero")

        dim_x = len(x)
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.dt = dt
        self.N = N
        self.hx = hx
        self.fx = fx
        self.K = zeros((dim_x, dim_z))
        self.z = array([[None] * self.dim_z]).T
        self.S = zeros((dim_z, dim_z))  # system uncertainty
        self.SI = zeros((dim_z, dim_z))  # inverse system uncertainty

        self.initialize(x, P)
        self.Q = eye(dim_x)  # process uncertainty
        self.R = eye(dim_z)  # state uncertainty
        self.inv = np.linalg.inv

        # used to create error terms centered at 0 mean for
        # state and measurement
        self._mean = zeros(dim_x)
        self._mean_z = zeros(dim_z)

    def initialize(self, x, P):
        """
        Initializes the filter with the specified mean and
        covariance. Only need to call this if you are using the filter
        to filter more than one set of data; this is called by __init__

        Parameters
        ----------

        x : np.array(dim_z)
            state mean

        P : np.array((dim_x, dim_x))
            covariance of the state
        """

        if x.ndim != 1:
            raise ValueError("x must be a 1D array")

        self.sigmas = multivariate_normal(mean=x, cov=P, size=self.N)
        self.x = x
        self.P = P

        # these will always be a copy of x,P after predict() is called
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

        # these will always be a copy of x,P after update() is called
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

    def update(self, z, R=None):
        """
        Add a new measurement (z) to the kalman filter. If z is None, nothing
        is changed.

        Parameters
        ----------

        z : np.array
            measurement for this update.

        R : np.array, scalar, or None
            Optionally provide R to override the measurement noise for this
            one call, otherwise self.R will be used.
        """

        if z is None:
            self.z = array([[None] * self.dim_z]).T
            self.x_post = self.x.copy()
            self.P_post = self.P.copy()
            return

        if R is None:
            R = self.R
        if np.isscalar(R):
            R = eye(self.dim_z) * R

        N = self.N
        dim_z = len(z)
        sigmas_h = zeros((N, dim_z))

        # transform sigma points into measurement space
        for i in range(N):
            sigmas_h[i] = self.hx(self.sigmas[i])

        z_mean = np.mean(sigmas_h, axis=0)

        P_zz = (outer_product_sum(sigmas_h - z_mean) / (N - 1)) + R
        P_xz = outer_product_sum(self.sigmas - self.x, sigmas_h - z_mean) / (N - 1)

        self.S = P_zz
        self.SI = self.inv(self.S)
        self.K = dot(P_xz, self.SI)

        e_r = multivariate_normal(self._mean_z, R, N)
        for i in range(N):
            self.sigmas[i] += dot(self.K, z + e_r[i] - sigmas_h[i])

        self.x = np.mean(self.sigmas, axis=0)
        self.P = self.P - dot(dot(self.K, self.S), self.K.T)

        # save measurement and posterior state
        self.z = deepcopy(z)
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

    def predict(self):
        """Predict next position."""

        N = self.N
        for i, s in enumerate(self.sigmas):
            self.sigmas[i] = self.fx(s, self.dt)

        e = multivariate_normal(self._mean, self.Q, N)
        self.sigmas += e

        self.x = np.mean(self.sigmas, axis=0)
        self.P = outer_product_sum(self.sigmas - self.x) / (N - 1)

        # save prior
        self.x_prior = np.copy(self.x)
        self.P_prior = np.copy(self.P)

    def __repr__(self):
        return "\n".join(
            [
                "EnsembleKalmanFilter object",
                pretty_str("dim_x", self.dim_x),
                pretty_str("dim_z", self.dim_z),
                pretty_str("dt", self.dt),
                pretty_str("x", self.x),
                pretty_str("P", self.P),
                pretty_str("x_prior", self.x_prior),
                pretty_str("P_prior", self.P_prior),
                pretty_str("Q", self.Q),
                pretty_str("R", self.R),
                pretty_str("K", self.K),
                pretty_str("S", self.S),
                pretty_str("sigmas", self.sigmas),
                pretty_str("hx", self.hx),
                pretty_str("fx", self.fx),
            ]
        )

initialize(x, P)

Initializes the filter with the specified mean and covariance. Only need to call this if you are using the filter to filter more than one set of data; this is called by init

Parameters:

Name Type Description Default
x array(dim_z)

state mean

required
P array((dim_x, dim_x))

covariance of the state

required
Source code in bayesian_filters/kalman/ensemble_kalman_filter.py
def initialize(self, x, P):
    """
    Initializes the filter with the specified mean and
    covariance. Only need to call this if you are using the filter
    to filter more than one set of data; this is called by __init__

    Parameters
    ----------

    x : np.array(dim_z)
        state mean

    P : np.array((dim_x, dim_x))
        covariance of the state
    """

    if x.ndim != 1:
        raise ValueError("x must be a 1D array")

    self.sigmas = multivariate_normal(mean=x, cov=P, size=self.N)
    self.x = x
    self.P = P

    # these will always be a copy of x,P after predict() is called
    self.x_prior = self.x.copy()
    self.P_prior = self.P.copy()

    # these will always be a copy of x,P after update() is called
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()

predict()

Predict next position.

Source code in bayesian_filters/kalman/ensemble_kalman_filter.py
def predict(self):
    """Predict next position."""

    N = self.N
    for i, s in enumerate(self.sigmas):
        self.sigmas[i] = self.fx(s, self.dt)

    e = multivariate_normal(self._mean, self.Q, N)
    self.sigmas += e

    self.x = np.mean(self.sigmas, axis=0)
    self.P = outer_product_sum(self.sigmas - self.x) / (N - 1)

    # save prior
    self.x_prior = np.copy(self.x)
    self.P_prior = np.copy(self.P)

update(z, R=None)

Add a new measurement (z) to the kalman filter. If z is None, nothing is changed.

Parameters:

Name Type Description Default
z array

measurement for this update.

required
R np.array, scalar, or None

Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used.

None
Source code in bayesian_filters/kalman/ensemble_kalman_filter.py
def update(self, z, R=None):
    """
    Add a new measurement (z) to the kalman filter. If z is None, nothing
    is changed.

    Parameters
    ----------

    z : np.array
        measurement for this update.

    R : np.array, scalar, or None
        Optionally provide R to override the measurement noise for this
        one call, otherwise self.R will be used.
    """

    if z is None:
        self.z = array([[None] * self.dim_z]).T
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()
        return

    if R is None:
        R = self.R
    if np.isscalar(R):
        R = eye(self.dim_z) * R

    N = self.N
    dim_z = len(z)
    sigmas_h = zeros((N, dim_z))

    # transform sigma points into measurement space
    for i in range(N):
        sigmas_h[i] = self.hx(self.sigmas[i])

    z_mean = np.mean(sigmas_h, axis=0)

    P_zz = (outer_product_sum(sigmas_h - z_mean) / (N - 1)) + R
    P_xz = outer_product_sum(self.sigmas - self.x, sigmas_h - z_mean) / (N - 1)

    self.S = P_zz
    self.SI = self.inv(self.S)
    self.K = dot(P_xz, self.SI)

    e_r = multivariate_normal(self._mean_z, R, N)
    for i in range(N):
        self.sigmas[i] += dot(self.K, z + e_r[i] - sigmas_h[i])

    self.x = np.mean(self.sigmas, axis=0)
    self.P = self.P - dot(dot(self.K, self.S), self.K.T)

    # save measurement and posterior state
    self.z = deepcopy(z)
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()