
Common

common

A collection of functions used throughout FilterPy, as well as functions you will find useful when building your own filters.

Saver

Saver

Bases: object

Helper class to save the states of any filter object. Each time you call save() all of the attributes (state, covariances, etc) are appended to lists.

Generally you would do this once per epoch - predict/update.

Then, you can access any of the states by using the [] syntax or by using the . operator.

.. code-block:: Python

my_saver = Saver()
... do some filtering

x = my_saver['x']
x = my_saver.x

Either form returns the list of all of the state x values for the entire filtering process.

If you want to convert all saved lists into numpy arrays, call to_array().

Parameters:

kf : object, required
    any object with a __dict__ attribute, but intended to be one of the filtering classes

save_current : bool, default=False
    save the current state of kf when the object is created

skip_private : bool, default=False
    Controls skipping of any private attribute (anything starting with '_'). Turning this on saves memory, but slows down execution a bit.

skip_callable : bool, default=False
    Controls skipping of any attribute which is a method. Turning this on saves memory, but slows down execution a bit.

ignore : tuple of str, default=()
    list of keys to ignore.

Examples:

.. code-block:: Python

kf = KalmanFilter(...whatever)
# initialize kf here

saver = Saver(kf) # save data for kf filter
for z in zs:
    kf.predict()
    kf.update(z)
    saver.save()

x = np.array(saver.x) # get the kf.x state in an np.array
plt.plot(x[:, 0], x[:, 2])

# ... or ...
saver.to_array()
plt.plot(saver.x[:, 0], saver.x[:, 2])
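
The constructor options can trim what is stored. A sketch continuing the example above ('inv' is only an illustrative key to ignore, not something you must exclude):

.. code-block:: Python

    saver = Saver(kf, skip_private=True, skip_callable=True, ignore=('inv',))
    for z in zs:
        kf.predict()
        kf.update(z)
        saver.save()

    print(len(saver), saver.keys)   # number of saved epochs and every saved attribute name
    P_list = saver['P']             # same list as saver.P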
Source code in bayesian_filters/common/helpers.py
class Saver(object):
    """
    Helper class to save the states of any filter object.
    Each time you call save() all of the attributes (state, covariances, etc)
    are appended to lists.

    Generally you would do this once per epoch - predict/update.

    Then, you can access any of the states by using the [] syntax or by
    using the . operator.

    .. code-block:: Python

        my_saver = Saver()
        ... do some filtering

        x = my_saver['x']
        x = my_save.x

    Either returns a list of all of the state `x` values for the entire
    filtering process.

    If you want to convert all saved lists into numpy arrays, call to_array().


    Parameters
    ----------

    kf : object
        any object with a __dict__ attribute, but intended to be one of the
        filtering classes

    save_current : bool, default=False
        save the current state of `kf` when the object is created;

    skip_private: bool, default=False
        Control skipping any private attribute (anything starting with '_')
        Turning this on saves memory, but slows down execution a bit.

    skip_callable: bool, default=False
        Control skipping any attribute which is a method. Turning this on
        saves memory, but slows down execution a bit.

    ignore: (str,) tuple of strings
        list of keys to ignore.

    Examples
    --------

    .. code-block:: Python

        kf = KalmanFilter(...whatever)
        # initialize kf here

        saver = Saver(kf) # save data for kf filter
        for z in zs:
            kf.predict()
            kf.update(z)
            saver.save()

        x = np.array(s.x) # get the kf.x state in an np.array
        plt.plot(x[:, 0], x[:, 2])

        # ... or ...
        s.to_array()
        plt.plot(s.x[:, 0], s.x[:, 2])

    """

    def __init__(self, kf, save_current=False, skip_private=False, skip_callable=False, ignore=()):
        """Construct the save object, optionally saving the current
        state of the filter"""
        # pylint: disable=too-many-arguments

        self._kf = kf
        self._DL = defaultdict(list)
        self._skip_private = skip_private
        self._skip_callable = skip_callable
        self._ignore = ignore
        self._len = 0

        # need to save all properties since it is possible that the property
        # is computed only on access. I use this trick a lot to minimize
        # computing unused information.
        properties = inspect.getmembers(type(kf), lambda o: isinstance(o, property))
        self.properties = [p for p in properties if p[0] not in ignore]

        if save_current:
            self.save()

    def save(self):
        """save the current state of the Kalman filter"""

        kf = self._kf

        # force all attributes to be computed. this is only necessary
        # if the class uses properties that compute data only when
        # accessed
        for prop in self.properties:
            self._DL[prop[0]].append(getattr(kf, prop[0]))

        v = copy.deepcopy(kf.__dict__)

        if self._skip_private:
            for key in list(v.keys()):
                if key.startswith("_"):
                    del v[key]

        if self._skip_callable:
            for key in list(v.keys()):
                if callable(v[key]):
                    del v[key]

        for ig in self._ignore:
            if ig in v:
                del v[ig]

        for key in list(v.keys()):
            self._DL[key].append(v[key])

        self.__dict__.update(self._DL)
        self._len += 1

    def __getitem__(self, key):
        return self._DL[key]

    def __setitem__(self, key, newvalue):
        self._DL[key] = newvalue
        self.__dict__.update(self._DL)

    def __len__(self):
        return self._len

    @property
    def keys(self):
        """list of all keys"""
        return list(self._DL.keys())

    def to_array(self, flatten=False):
        """
        Convert all saved attributes from a list to np.array.

        This may or may not work - every saved attribute must have the
        same shape for every instance. i.e., if `K` changes shape due to `z`
        changing shape then the call will raise an exception.

        This can also happen if the default initialization in __init__ gives
        the variable a different shape then it becomes after a predict/update
        cycle.
        """
        for key in self.keys:
            try:
                self.__dict__[key] = np.array(self._DL[key])
            except:
                # get back to lists so we are in a valid state
                self.__dict__.update(self._DL)
                raise ValueError("could not convert {} into np.array".format(key))
        if flatten:
            self.flatten()

    def flatten(self):
        """
        Flattens any np.array of column vectors into 1D arrays. Basically,
        this makes data readable for humans if you are just inspecting via
        the REPL. For example, if you have saved a KalmanFilter object with 89
        epochs, self.x will be shape (89, 9, 1) (for example). After flatten
        is run, self.x.shape == (89, 9), which displays nicely from the REPL.

        There is no way to unflatten, so it's a one way trip.
        """

        for key in self.keys:
            try:
                arr = self.__dict__[key]
                shape = arr.shape
                if shape[2] == 1:
                    self.__dict__[key] = arr.reshape(shape[0], shape[1])
                arr = self.__dict__[key]
                shape = arr.shape
                if len(shape) == 2 and shape[1] == 1:
                    self.__dict__[key] = arr.ravel()
            except:
                # not an ndarray or not a column vector
                pass

    def __repr__(self):
        return "<Saver object at {}\n  Keys: {}>".format(hex(id(self)), " ".join(self.keys))

keys property

list of all keys

__init__(kf, save_current=False, skip_private=False, skip_callable=False, ignore=())

Construct the save object, optionally saving the current state of the filter

Source code in bayesian_filters/common/helpers.py
def __init__(self, kf, save_current=False, skip_private=False, skip_callable=False, ignore=()):
    """Construct the save object, optionally saving the current
    state of the filter"""
    # pylint: disable=too-many-arguments

    self._kf = kf
    self._DL = defaultdict(list)
    self._skip_private = skip_private
    self._skip_callable = skip_callable
    self._ignore = ignore
    self._len = 0

    # need to save all properties since it is possible that the property
    # is computed only on access. I use this trick a lot to minimize
    # computing unused information.
    properties = inspect.getmembers(type(kf), lambda o: isinstance(o, property))
    self.properties = [p for p in properties if p[0] not in ignore]

    if save_current:
        self.save()

flatten()

Flattens any np.array of column vectors into 1D arrays. Basically, this makes data readable for humans if you are just inspecting via the REPL. For example, if you have saved a KalmanFilter object with 89 epochs, self.x will be shape (89, 9, 1) (for example). After flatten is run, self.x.shape == (89, 9), which displays nicely from the REPL.

There is no way to unflatten, so it's a one way trip.
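
As a small illustration of the shape change (a sketch, assuming a Saver built as in the class example above, with a 9-state filter saved for 89 epochs):

.. code-block:: Python

    saver.to_array()
    print(saver.x.shape)   # (89, 9, 1) - stacked column vectors
    saver.flatten()
    print(saver.x.shape)   # (89, 9) - easier to read and to plot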

Source code in bayesian_filters/common/helpers.py
def flatten(self):
    """
    Flattens any np.array of column vectors into 1D arrays. Basically,
    this makes data readable for humans if you are just inspecting via
    the REPL. For example, if you have saved a KalmanFilter object with 89
    epochs, self.x will be shape (89, 9, 1) (for example). After flatten
    is run, self.x.shape == (89, 9), which displays nicely from the REPL.

    There is no way to unflatten, so it's a one way trip.
    """

    for key in self.keys:
        try:
            arr = self.__dict__[key]
            shape = arr.shape
            if shape[2] == 1:
                self.__dict__[key] = arr.reshape(shape[0], shape[1])
            arr = self.__dict__[key]
            shape = arr.shape
            if len(shape) == 2 and shape[1] == 1:
                self.__dict__[key] = arr.ravel()
        except:
            # not an ndarray or not a column vector
            pass

save()

save the current state of the Kalman filter

Source code in bayesian_filters/common/helpers.py
def save(self):
    """save the current state of the Kalman filter"""

    kf = self._kf

    # force all attributes to be computed. this is only necessary
    # if the class uses properties that compute data only when
    # accessed
    for prop in self.properties:
        self._DL[prop[0]].append(getattr(kf, prop[0]))

    v = copy.deepcopy(kf.__dict__)

    if self._skip_private:
        for key in list(v.keys()):
            if key.startswith("_"):
                del v[key]

    if self._skip_callable:
        for key in list(v.keys()):
            if callable(v[key]):
                del v[key]

    for ig in self._ignore:
        if ig in v:
            del v[ig]

    for key in list(v.keys()):
        self._DL[key].append(v[key])

    self.__dict__.update(self._DL)
    self._len += 1

to_array(flatten=False)

Convert all saved attributes from a list to np.array.

This may or may not work - every saved attribute must have the same shape for every instance. i.e., if K changes shape due to z changing shape then the call will raise an exception.

This can also happen if the default initialization in __init__ gives the variable a different shape than it takes on after a predict/update cycle.

Source code in bayesian_filters/common/helpers.py
def to_array(self, flatten=False):
    """
    Convert all saved attributes from a list to np.array.

    This may or may not work - every saved attribute must have the
    same shape for every instance. i.e., if `K` changes shape due to `z`
    changing shape then the call will raise an exception.

    This can also happen if the default initialization in __init__ gives
    the variable a different shape then it becomes after a predict/update
    cycle.
    """
    for key in self.keys:
        try:
            self.__dict__[key] = np.array(self._DL[key])
        except:
            # get back to lists so we are in a valid state
            self.__dict__.update(self._DL)
            raise ValueError("could not convert {} into np.array".format(key))
    if flatten:
        self.flatten()

Discrete White Noise

Q_discrete_white_noise(dim, dt=1.0, var=1.0, block_size=1, order_by_dim=True)

Returns the Q matrix for the Discrete Constant White Noise Model. dim may be either 2, 3, or 4; dt is the time step; and var is the variance in the noise.

Q is computed as G * G^T * variance, where G is the process noise per time step. In other words, G = [0.5*dt^2, dt]^T for the constant velocity model.
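
For the dim=2 (constant velocity) case this gives the familiar closed form; the check below is a sketch that re-derives the returned matrix from G (it assumes the function is importable from bayesian_filters.common):

.. code-block:: Python

    import numpy as np
    from bayesian_filters.common import Q_discrete_white_noise  # assumed import path

    dt, var = 0.1, 1.0
    G = np.array([[0.5 * dt**2], [dt]])   # process noise per time step
    Q_manual = G @ G.T * var              # [[dt^4/4, dt^3/2], [dt^3/2, dt^2]] * var
    assert np.allclose(Q_manual, Q_discrete_white_noise(2, dt=dt, var=var))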

Parameters:

dim : int (2, 3, or 4), required
    dimension for Q, where the final dimension is (dim x dim)

dt : float, default=1.0
    time step in whatever units your filter is using for time, i.e. the amount of time between innovations

var : float, default=1.0
    variance in the noise

block_size : int >= 1, default=1
    If your state variable contains more than one dimension, such as a 3d constant velocity model [x x' y y' z z']^T, then Q must be a block diagonal matrix.

order_by_dim : bool, default=True
    Defines the ordering of variables in the state vector. True keeps all derivatives of each dimension together:

    [x x' x'' y y' y'']

    whereas False interleaves the dimensions:

    [x y z x' y' z' x'' y'' z'']

Examples:

>>> # constant velocity model in a 3D world with a 10 Hz update rate
>>> Q_discrete_white_noise(2, dt=0.1, var=1., block_size=3)
array([[0.000025, 0.0005  , 0.      , 0.      , 0.      , 0.      ],
       [0.0005  , 0.01    , 0.      , 0.      , 0.      , 0.      ],
       [0.      , 0.      , 0.000025, 0.0005  , 0.      , 0.      ],
       [0.      , 0.      , 0.0005  , 0.01    , 0.      , 0.      ],
       [0.      , 0.      , 0.      , 0.      , 0.000025, 0.0005  ],
       [0.      , 0.      , 0.      , 0.      , 0.0005  , 0.01    ]])
References

Bar-Shalom. "Estimation with Applications To Tracking and Navigation". John Wiley & Sons, 2001. Page 274.

Source code in bayesian_filters/common/discretization.py
def Q_discrete_white_noise(dim, dt=1.0, var=1.0, block_size=1, order_by_dim=True):
    """
    Returns the Q matrix for the Discrete Constant White Noise
    Model. dim may be either 2, 3, or 4 dt is the time step, and sigma
    is the variance in the noise.

    Q is computed as the G * G^T * variance, where G is the process noise per
    time step. In other words, G = [[.5dt^2][dt]]^T for the constant velocity
    model.

    Parameters
    -----------

    dim : int (2, 3, or 4)
        dimension for Q, where the final dimension is (dim x dim)

    dt : float, default=1.0
        time step in whatever units your filter is using for time. i.e. the
        amount of time between innovations

    var : float, default=1.0
        variance in the noise

    block_size : int >= 1
        If your state variable contains more than one dimension, such as
        a 3d constant velocity model [x x' y y' z z']^T, then Q must be
        a block diagonal matrix.

    order_by_dim : bool, default=True
        Defines ordering of variables in the state vector. `True` orders
        by keeping all derivatives of each dimensions)

        [x x' x'' y y' y'']

        whereas `False` interleaves the dimensions

        [x y z x' y' z' x'' y'' z'']


    Examples
    --------
    >>> # constant velocity model in a 3D world with a 10 Hz update rate
    >>> Q_discrete_white_noise(2, dt=0.1, var=1., block_size=3)
    array([[0.000025, 0.0005  , 0.      , 0.      , 0.      , 0.      ],
           [0.0005  , 0.01    , 0.      , 0.      , 0.      , 0.      ],
           [0.      , 0.      , 0.000025, 0.0005  , 0.      , 0.      ],
           [0.      , 0.      , 0.0005  , 0.01    , 0.      , 0.      ],
           [0.      , 0.      , 0.      , 0.      , 0.000025, 0.0005  ],
           [0.      , 0.      , 0.      , 0.      , 0.0005  , 0.01    ]])

    References
    ----------

    Bar-Shalom. "Estimation with Applications To Tracking and Navigation".
    John Wiley & Sons, 2001. Page 274.
    """

    if dim not in [2, 3, 4]:
        raise ValueError("dim must be between 2 and 4")

    if dim == 2:
        Q = [[0.25 * dt**4, 0.5 * dt**3], [0.5 * dt**3, dt**2]]
    elif dim == 3:
        Q = [
            [0.25 * dt**4, 0.5 * dt**3, 0.5 * dt**2],
            [0.5 * dt**3, dt**2, dt],
            [0.5 * dt**2, dt, 1],
        ]
    else:
        Q = [
            [(dt**6) / 36, (dt**5) / 12, (dt**4) / 6, (dt**3) / 6],
            [(dt**5) / 12, (dt**4) / 4, (dt**3) / 2, (dt**2) / 2],
            [(dt**4) / 6, (dt**3) / 2, dt**2, dt],
            [(dt**3) / 6, (dt**2) / 2, dt, 1.0],
        ]

    if order_by_dim:
        return block_diag(*[Q] * block_size) * var
    return order_by_derivative(array(Q), dim, block_size) * var

Continuous White Noise

Q_continuous_white_noise(dim, dt=1.0, spectral_density=1.0, block_size=1, order_by_dim=True)

Returns the Q matrix for the Discretized Continuous White Noise Model. dim may be either 2, 3, or 4; dt is the time step; and spectral_density is the spectral density of the continuous noise.
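
For dim=2 the discretized matrix has the closed form used in the source below; a sketch of the same check (import path assumed as above):

.. code-block:: Python

    import numpy as np
    from bayesian_filters.common import Q_continuous_white_noise  # assumed import path

    dt, sd = 0.1, 1.0
    Q_manual = np.array([[dt**3 / 3.0, dt**2 / 2.0],
                         [dt**2 / 2.0, dt]]) * sd
    assert np.allclose(Q_manual, Q_continuous_white_noise(2, dt=dt, spectral_density=sd))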

Parameters:

dim : int (2, 3, or 4), required
    dimension for Q, where the final dimension is (dim x dim). 2 is constant velocity, 3 is constant acceleration, 4 is constant jerk

dt : float, default=1.0
    time step in whatever units your filter is using for time, i.e. the amount of time between innovations

spectral_density : float, default=1.0
    spectral density for the continuous process

block_size : int >= 1, default=1
    If your state variable contains more than one dimension, such as a 3d constant velocity model [x x' y y' z z']^T, then Q must be a block diagonal matrix.

order_by_dim : bool, default=True
    Defines the ordering of variables in the state vector. True keeps all derivatives of each dimension together:

    [x x' x'' y y' y'']

    whereas False interleaves the dimensions:

    [x y z x' y' z' x'' y'' z'']

Examples:

>>> # constant velocity model in a 3D world with a 10 Hz update rate
>>> Q_continuous_white_noise(2, dt=0.1, block_size=3)
array([[0.00033333, 0.005     , 0.        , 0.        , 0.        , 0.        ],
       [0.005     , 0.1       , 0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.00033333, 0.005     , 0.        , 0.        ],
       [0.        , 0.        , 0.005     , 0.1       , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 0.00033333, 0.005     ],
       [0.        , 0.        , 0.        , 0.        , 0.005     , 0.1       ]])
Source code in bayesian_filters/common/discretization.py
def Q_continuous_white_noise(dim, dt=1.0, spectral_density=1.0, block_size=1, order_by_dim=True):
    """
    Returns the Q matrix for the Discretized Continuous White Noise
    Model. dim may be either 2, 3, 4, dt is the time step, and sigma is the
    variance in the noise.

    Parameters
    ----------

    dim : int (2 or 3 or 4)
        dimension for Q, where the final dimension is (dim x dim)
        2 is constant velocity, 3 is constant acceleration, 4 is
        constant jerk

    dt : float, default=1.0
        time step in whatever units your filter is using for time. i.e. the
        amount of time between innovations

    spectral_density : float, default=1.0
        spectral density for the continuous process

    block_size : int >= 1
        If your state variable contains more than one dimension, such as
        a 3d constant velocity model [x x' y y' z z']^T, then Q must be
        a block diagonal matrix.

    order_by_dim : bool, default=True
        Defines ordering of variables in the state vector. `True` orders
        by keeping all derivatives of each dimensions)

        [x x' x'' y y' y'']

        whereas `False` interleaves the dimensions

        [x y z x' y' z' x'' y'' z'']

    Examples
    --------

    >>> # constant velocity model in a 3D world with a 10 Hz update rate
    >>> Q_continuous_white_noise(2, dt=0.1, block_size=3)
    array([[0.00033333, 0.005     , 0.        , 0.        , 0.        , 0.        ],
           [0.005     , 0.1       , 0.        , 0.        , 0.        , 0.        ],
           [0.        , 0.        , 0.00033333, 0.005     , 0.        , 0.        ],
           [0.        , 0.        , 0.005     , 0.1       , 0.        , 0.        ],
           [0.        , 0.        , 0.        , 0.        , 0.00033333, 0.005     ],
           [0.        , 0.        , 0.        , 0.        , 0.005     , 0.1       ]])
    """

    if dim not in [2, 3, 4]:
        raise ValueError("dim must be between 2 and 4")

    if dim == 2:
        Q = [[(dt**3) / 3.0, (dt**2) / 2.0], [(dt**2) / 2.0, dt]]
    elif dim == 3:
        Q = [
            [(dt**5) / 20.0, (dt**4) / 8.0, (dt**3) / 6.0],
            [(dt**4) / 8.0, (dt**3) / 3.0, (dt**2) / 2.0],
            [(dt**3) / 6.0, (dt**2) / 2.0, dt],
        ]

    else:
        Q = [
            [(dt**7) / 252.0, (dt**6) / 72.0, (dt**5) / 30.0, (dt**4) / 24.0],
            [(dt**6) / 72.0, (dt**5) / 20.0, (dt**4) / 8.0, (dt**3) / 6.0],
            [(dt**5) / 30.0, (dt**4) / 8.0, (dt**3) / 3.0, (dt**2) / 2.0],
            [(dt**4) / 24.0, (dt**3) / 6.0, (dt**2 / 2.0), dt],
        ]

    if order_by_dim:
        return block_diag(*[Q] * block_size) * spectral_density

    return order_by_derivative(array(Q), dim, block_size) * spectral_density

Van Loan Discretization

van_loan_discretization(F, G, dt)

Discretizes a linear differential equation which includes white noise according to the method of C. F. van Loan [1]. Given the continuous model

x' =  Fx + Gu

where u is unity white noise, we compute and return the sigma and Q_k that discretize that equation.

Examples:

Given y'' + y = 2u(t), we create the continuous state model of

x' = [ 0  1] * x + [0] * u(t)
     [-1  0]       [2]

and a time step of 0.1:

>>> F = np.array([[0,1],[-1,0]], dtype=float)
>>> G = np.array([[0.],[2.]])
>>> phi, Q = van_loan_discretization(F, G, 0.1)
>>> phi
array([[ 0.99500417,  0.09983342],
       [-0.09983342,  0.99500417]])
>>> Q
array([[ 0.00133067,  0.01993342],
       [ 0.01993342,  0.39866933]])

(example taken from Brown[2])
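
For readers who want the mechanics in one place: the method builds a 2n x 2n block matrix, exponentiates it, and reads sigma and Q off the blocks. The sketch below simply mirrors the source at the end of this section and checks it against the function (numpy, scipy, and the import path are assumptions):

.. code-block:: Python

    import numpy as np
    from scipy.linalg import expm
    from bayesian_filters.common import van_loan_discretization  # assumed import path

    F = np.array([[0., 1.], [-1., 0.]])
    G = np.array([[0.], [2.]])
    dt = 0.1

    n = F.shape[0]
    A = np.zeros((2 * n, 2 * n))
    A[:n, :n] = -F * dt          # upper-left block: -F*dt
    A[:n, n:] = G @ G.T * dt     # upper-right block: G*W*G'*dt with W = 1
    A[n:, n:] = F.T * dt         # lower-right block: F'*dt
    B = expm(A)

    sigma = B[n:, n:].T          # discrete state transition
    Q = sigma @ B[:n, n:]        # discrete process noise

    phi, Qk = van_loan_discretization(F, G, dt)
    assert np.allclose(sigma, phi) and np.allclose(Q, Qk)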

References

[1] C. F. van Loan. "Computing Integrals Involving the Matrix Exponential." IEEE Trans. Automatic Control, AC-23 (3): 395-404 (June 1978)

[2] Robert Grover Brown. "Introduction to Random Signals and Applied Kalman Filtering." Fourth edition. John Wiley & Sons. p. 126-7. (2012)

Source code in bayesian_filters/common/discretization.py
def van_loan_discretization(F, G, dt):
    """
    Discretizes a linear differential equation which includes white noise
    according to the method of C. F. van Loan [1]. Given the continuous
    model

        x' =  Fx + Gu

    where u is the unity white noise, we compute and return the sigma and Q_k
    that discretizes that equation.


    Examples
    --------

    Given y'' + y = 2u(t), we create the continuous state model of

    x' = [ 0 1] * x + [0]*u(t)
         [-1 0]       [2]

    and a time step of 0.1:


    >>> F = np.array([[0,1],[-1,0]], dtype=float)
    >>> G = np.array([[0.],[2.]])
    >>> phi, Q = van_loan_discretization(F, G, 0.1)

    >>> phi
    array([[ 0.99500417,  0.09983342],
           [-0.09983342,  0.99500417]])

    >>> Q
    array([[ 0.00133067,  0.01993342],
           [ 0.01993342,  0.39866933]])

    (example taken from Brown[2])


    References
    ----------

    [1] C. F. van Loan. "Computing Integrals Involving the Matrix Exponential."
        IEEE Trans. Automomatic Control, AC-23 (3): 395-404 (June 1978)

    [2] Robert Grover Brown. "Introduction to Random Signals and Applied
        Kalman Filtering." Forth edition. John Wiley & Sons. p. 126-7. (2012)
    """

    n = F.shape[0]

    A = zeros((2 * n, 2 * n))

    # we assume u(t) is unity, and require that G incorporate the scaling term
    # for the noise. Hence W = 1, and GWG' reduces to GG"

    A[0:n, 0:n] = -F.dot(dt)
    A[0:n, n : 2 * n] = G.dot(G.T).dot(dt)
    A[n : 2 * n, n : 2 * n] = F.T.dot(dt)

    B = expm(A)

    sigma = B[n : 2 * n, n : 2 * n].T

    Q = sigma.dot(B[0:n, n : 2 * n])

    return (sigma, Q)

Linear ODE Discretization

linear_ode_discretation(F, L=None, Q=None, dt=1.0)

Not sure what this does, probably should be removed

Source code in bayesian_filters/common/discretization.py
def linear_ode_discretation(F, L=None, Q=None, dt=1.0):
    """
    Not sure what this does, probably should be removed
    """

    n = F.shape[0]

    if L is None:
        L = eye(n)

    if Q is None:
        Q = zeros((n, n))

    A = expm(F * dt)

    phi = zeros((2 * n, 2 * n))

    phi[0:n, 0:n] = F
    phi[0:n, n : 2 * n] = L.dot(Q).dot(L.T)
    phi[n : 2 * n, n : 2 * n] = -F.T

    zo = vstack((zeros((n, n)), eye(n)))

    CD = expm(phi * dt).dot(zo)

    C = CD[0:n, :]
    D = CD[n : 2 * n, :]
    q = C.dot(inv(D))

    return (A, q)

Kinematic Kalman Filter

kinematic_kf(dim, order, dt=1.0, dim_z=1, order_by_dim=True, kf=None)

Returns a KalmanFilter using Newtonian kinematics of arbitrary order for any number of dimensions. For example, a constant velocity filter in 3D space would have order 1 and dimension 3.

Examples:

A constant velocity filter in 3D space with delta time = .2 seconds would be created with

>>> kf = kinematic_kf(dim=3, order=1, dt=.2)
>>> kf.F
array([[1. , 0.2, 0. , 0. , 0. , 0. ],
       [0. , 1. , 0. , 0. , 0. , 0. ],
       [0. , 0. , 1. , 0.2, 0. , 0. ],
       [0. , 0. , 0. , 1. , 0. , 0. ],
       [0. , 0. , 0. , 0. , 1. , 0.2],
       [0. , 0. , 0. , 0. , 0. , 1. ]])

which will set the state x to be interpreted as

[x, x', y, y', z, z'].T

If you set order_by_dim to False, then x is ordered as

[x y z x' y' z'].T

As another example, a 2D constant jerk is created with

kinematic_kf(2, 3)

Assumes that the measurement z is position in each dimension. If this is not true you will have to alter the H matrix by hand.

P, Q, R are all set to the Identity matrix.

H is assigned assuming the measurement is position, one per dimension dim.

>>> kf = kinematic_kf(2, 1, dt=3.0)
>>> kf.F
array([[1., 3., 0., 0.],
       [0., 1., 0., 0.],
       [0., 0., 1., 3.],
       [0., 0., 0., 1.]])
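
A typical follow-up is to replace the identity Q with one of the noise models on this page and then run the filter, optionally recording each epoch with Saver. A sketch (import paths assumed; the measurements are made up for illustration):

.. code-block:: Python

    from bayesian_filters.common import kinematic_kf, Q_discrete_white_noise, Saver  # assumed imports

    kf = kinematic_kf(dim=1, order=2, dt=0.1)   # 1D constant acceleration: state [x, x', x'']
    kf.Q = Q_discrete_white_noise(3, dt=0.1, var=0.1)
    kf.R *= 2.0                                 # scalar position measurement noise

    saver = Saver(kf)
    for z in (1.0, 1.05, 1.2, 1.28):            # made-up position measurements
        kf.predict()
        kf.update(z)
        saver.save()

    saver.to_array()                            # saver.x is now roughly shape (4, 3, 1)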

Parameters:

dim : int, >= 1, required
    number of dimensions (2D space would be dim=2)

order : int, >= 0, required
    order of the filter. 2 would be a constant acceleration model with a state of [x, x', x''] per dimension

dim_z : int, default=1
    size of z vector per dimension dim. Normally should be 1

dt : float, default=1.0
    Time step. Used to create the state transition matrix

order_by_dim : bool, default=True
    Defines the ordering of variables in the state vector. True keeps all derivatives of each dimension together:

    [x x' x'' y y' y'']

    whereas False interleaves the dimensions:

    [x y z x' y' z' x'' y'' z'']

kf : Kalman filter-like object, optional, default=None
    Provide your own pre-created filter. This lets you use classes other than KalmanFilter.
Source code in bayesian_filters/common/kinematic.py
def kinematic_kf(dim, order, dt=1.0, dim_z=1, order_by_dim=True, kf=None):
    """
    Returns a KalmanFilter using newtonian kinematics of arbitrary order
    for any number of dimensions. For example, a constant velocity filter
    in 3D space would have order 1 dimension 3.


    Examples
    --------

    A constant velocity filter in 3D space with delta time = .2 seconds
    would be created with

    >>> kf = kinematic_kf(dim=3, order=1, dt=.2)
    >>> kf.F
    >>> array([[1. , 0.2, 0. , 0. , 0. , 0. ],
               [0. , 1. , 0. , 0. , 0. , 0. ],
               [0. , 0. , 1. , 0.2, 0. , 0. ],
               [0. , 0. , 0. , 1. , 0. , 0. ],
               [0. , 0. , 0. , 0. , 1. , 0.2],
               [0. , 0. , 0. , 0. , 0. , 1. ]])


    which will set the state `x` to be interpreted as

    [x, x', y, y', z, z'].T

    If you set `order_by_dim` to False, then `x` is ordered as

    [x y z x' y' z'].T

    As another example, a 2D constant jerk is created with

    >> kinematic_kf(2, 3)


    Assumes that the measurement z is position in each dimension. If this is not
    true you will have to alter the H matrix by hand.

    P, Q, R are all set to the Identity matrix.

    H is assigned assuming the measurement is position, one per dimension `dim`.


    >>> kf = kinematic_kf(2, 1, dt=3.0)
    >>> kf.F
    array([[1., 3., 0., 0.],
           [0., 1., 0., 0.],
           [0., 0., 1., 3.],
           [0., 0., 0., 1.]])

    Parameters
    ----------

    dim : int, >= 1
        number of dimensions (2D space would be dim=2)

    order : int, >= 0
        order of the filter. 2 would be a const acceleration model with
        a stat

    dim_z : int, default 1
        size of z vector *per* dimension `dim`. Normally should be 1

    dt : float, default 1.0
        Time step. Used to create the state transition matrix

    order_by_dim : bool, default=True
        Defines ordering of variables in the state vector. `True` orders
        by keeping all derivatives of each dimensions)

        [x x' x'' y y' y'']

        whereas `False` interleaves the dimensions

        [x y z x' y' z' x'' y'' z'']

    kf : kalman filter like object, optional, default None
        Provide your own pre-created filter. This lets you use classes other
        than KalmanFilter.
    """

    from bayesian_filters.kalman import KalmanFilter

    if dim < 1:
        raise ValueError("dim must be >= 1")
    if order < 0:
        raise ValueError("order must be >= 0")
    if dim_z < 1:
        raise ValueError("dim_z must be >= 1")

    dim_x = order + 1

    if kf is None:
        kf = KalmanFilter(dim_x=dim * dim_x, dim_z=dim_z)
    assert kf.dim_x == dim * dim_x
    assert kf.dim_z == dim_z

    F = kinematic_state_transition(order, dt)
    if order_by_dim:
        diag = [F] * dim
        kf.F = block_diag(*diag)
    else:
        kf.F.fill(0.0)
        for i, x in enumerate(F.ravel()):
            f = np.eye(dim) * x

            ix, iy = (i // dim_x) * dim, (i % dim_x) * dim
            kf.F[ix : ix + dim, iy : iy + dim] = f

    if order_by_dim:
        for i in range(dim_z):
            for j in range(dim):
                kf.H[i, j * dim_x] = 1.0
    else:
        for i in range(dim_z):
            for j in range(dim):
                kf.H[i, j] = 1.0

    return kf

Kinematic State Transition

kinematic_state_transition(order, dt)

Create a state transition matrix of a given order for a given time step dt.
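
For example, order=2 with dt=0.1 reproduces the hard-coded constant acceleration case in the source below (import path assumed):

.. code-block:: Python

    from bayesian_filters.common import kinematic_state_transition  # assumed import path

    F = kinematic_state_transition(order=2, dt=0.1)
    # F == [[1.0, 0.1, 0.005],
    #       [0.0, 1.0, 0.1  ],
    #       [0.0, 0.0, 1.0  ]]   rows: position, velocity, acceleration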

Source code in bayesian_filters/common/kinematic.py
def kinematic_state_transition(order, dt):
    """
    create a state transition matrix of a given order for a given time
    step `dt`.
    """

    if not (order >= 0 and int(order) == order):
        raise ValueError("order must be an int >= 0")

    # hard code common cases for computational efficiency
    if order == 0:
        return np.array([[1.0]])
    if order == 1:
        return np.array([[1.0, dt], [0.0, 1.0]])
    if order == 2:
        return np.array([[1.0, dt, 0.5 * dt * dt], [0.0, 1.0, dt], [0.0, 0.0, 1.0]])

    # grind it out computationally....
    N = order + 1

    F = np.zeros((N, N))
    # compute highest order row
    for n in range(N):
        F[0, n] = float(dt**n) / math.factorial(n)

    # copy with a shift to get lower order rows
    for j in range(1, N):
        F[j, j:] = F[0, 0:-j]

    return F

Runge-Kutta 4th Order

runge_kutta4(y, x, dx, f)

Computes the 4th order Runge-Kutta integration of dy/dx over a single step dx.

Parameters:

y : scalar, required
    Initial/current value for y

x : scalar, required
    Initial/current value for x

dx : scalar, required
    difference in x (e.g. the time step)

f : ufunc(y, x), required
    Callable function f(y, x) that you supply to compute dy/dx for the specified values.
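
As a quick sanity check, one step of dy/dx = y from y(0) = 1 should land very close to e^0.1; a sketch (import path assumed):

.. code-block:: Python

    import math
    from bayesian_filters.common import runge_kutta4  # assumed import path

    y = runge_kutta4(y=1.0, x=0.0, dx=0.1, f=lambda y, x: y)
    assert abs(y - math.exp(0.1)) < 1e-6   # RK4 local error is O(dx^5)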
Source code in bayesian_filters/common/helpers.py
def runge_kutta4(y, x, dx, f):
    """computes 4th order Runge-Kutta for dy/dx.

    Parameters
    ----------

    y : scalar
        Initial/current value for y
    x : scalar
        Initial/current value for x
    dx : scalar
        difference in x (e.g. the time step)
    f : ufunc(y,x)
        Callable function (y, x) that you supply to compute dy/dx for
        the specified values.

    """

    k1 = dx * f(y, x)
    k2 = dx * f(y + 0.5 * k1, x + 0.5 * dx)
    k3 = dx * f(y + 0.5 * k2, x + 0.5 * dx)
    k4 = dx * f(y + k3, x + dx)

    return y + (k1 + 2 * k2 + 2 * k3 + k4) / 6.0

Inverse Diagonal

inv_diagonal(S)

Computes the inverse of a diagonal NxN np.array S. In general this will be much faster than calling np.linalg.inv().

However, does NOT check if the off diagonal elements are non-zero. So long as S is truly diagonal, the output is identical to np.linalg.inv().
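
A tiny illustration (a sketch, import path assumed): the inverse of a diagonal matrix is just the reciprocal of each diagonal entry:

.. code-block:: Python

    import numpy as np
    from bayesian_filters.common import inv_diagonal  # assumed import path

    S = np.diag([2.0, 4.0, 10.0])
    S_inv = inv_diagonal(S)
    assert np.allclose(S_inv, np.diag([0.5, 0.25, 0.1]))
    assert np.allclose(S_inv, np.linalg.inv(S))   # identical because S is truly diagonal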

Parameters:

S : np.array, required
    diagonal NxN array to take inverse of

Returns:

S_inv : np.array
    inverse of S

Examples:

This is meant to be used as a replacement inverse function for the KalmanFilter class when you know the system covariance S is diagonal. It just makes the filter run faster.

>>> kf = KalmanFilter(dim_x=3, dim_z=1)
>>> kf.inv = inv_diagonal  # S is 1x1, so safely diagonal
Source code in bayesian_filters/common/helpers.py
def inv_diagonal(S):
    """
    Computes the inverse of a diagonal NxN np.array S. In general this will
    be much faster than calling np.linalg.inv().

    However, does NOT check if the off diagonal elements are non-zero. So long
    as S is truly diagonal, the output is identical to np.linalg.inv().

    Parameters
    ----------
    S : np.array
        diagonal NxN array to take inverse of

    Returns
    -------
    S_inv : np.array
        inverse of S


    Examples
    --------

    This is meant to be used as a replacement inverse function for
    the KalmanFilter class when you know the system covariance S is
    diagonal. It just makes the filter run faster, there is

    >>> kf = KalmanFilter(dim_x=3, dim_z=1)
    >>> kf.inv = inv_diagonal  # S is 1x1, so safely diagonal
    """

    S = np.asarray(S)

    if S.ndim != 2 or S.shape[0] != S.shape[1]:
        raise ValueError("S must be a square Matrix")

    si = np.zeros(S.shape)
    for i in range(len(S)):
        si[i, i] = 1.0 / S[i, i]
    return si

Outer Product Sum

outer_product_sum(A, B=None)

Computes the sum of the outer products of the rows in A and B

P = \Sum {A[i] B[i].T} for i in 0..N

Notionally:

P = 0
for y in A:
    P += np.outer(y, y)

This is a standard computation for sigma points used in the UKF, ensemble Kalman filter, etc., where A would be the residual of the sigma points and the filter's state or measurement.

The computation is vectorized, so it is much faster than the for loop for large A.

Parameters:

A : np.array, shape (M, N), required
    rows of N-vectors to have the outer product summed

B : np.array, shape (M, N), default=None
    rows of N-vectors to have the outer product summed. If it is None, it is set to A.

Returns:

P : np.array, shape (N, N)
    sum of the outer product of the rows of A and B

Examples:

Here sigmas is of shape (M, N), and x is of shape (N). The two sets of code compute the same thing.

>>> P = outer_product_sum(sigmas - x)
>>>
>>> P = 0
>>> for s in sigmas:
>>>     y = s - x
>>>     P += np.outer(y, y)
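
The equivalence can also be checked numerically; a sketch with random data (import path assumed):

.. code-block:: Python

    import numpy as np
    from bayesian_filters.common import outer_product_sum  # assumed import path

    rng = np.random.default_rng(1)
    A = rng.normal(size=(10, 3))       # 10 rows of 3-vectors

    P_loop = np.zeros((3, 3))
    for y in A:
        P_loop += np.outer(y, y)       # straightforward loop version

    assert np.allclose(P_loop, outer_product_sum(A))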
Source code in bayesian_filters/common/helpers.py
def outer_product_sum(A, B=None):
    r"""
    Computes the sum of the outer products of the rows in A and B

        P = \Sum {A[i] B[i].T} for i in 0..N

        Notionally:

        P = 0
        for y in A:
            P += np.outer(y, y)

    This is a standard computation for sigma points used in the UKF, ensemble
    Kalman filter, etc., where A would be the residual of the sigma points
    and the filter's state or measurement.

    The computation is vectorized, so it is much faster than the for loop
    for large A.

    Parameters
    ----------
    A : np.array, shape (M, N)
        rows of N-vectors to have the outer product summed

    B : np.array, shape (M, N)
        rows of N-vectors to have the outer product summed
        If it is `None`, it is set to A.

    Returns
    -------
    P : np.array, shape(N, N)
        sum of the outer product of the rows of A and B

    Examples
    --------

    Here sigmas is of shape (M, N), and x is of shape (N). The two sets of
    code compute the same thing.

    >>> P = outer_product_sum(sigmas - x)
    >>>
    >>> P = 0
    >>> for s in sigmas:
    >>>     y = s - x
    >>>     P += np.outer(y, y)
    """

    if B is None:
        B = A

    outer = np.einsum("ij,ik->ijk", A, B)
    return np.sum(outer, axis=0)