
GH Filter

GHFilter

Implements the g-h filter.

GHFilter

Bases: object

Implements the g-h filter. The topic is too large to cover in this comment. See my book "Kalman and Bayesian Filters in Python" [1] or Eli Brookner's "Tracking and Kalman Filters Made Easy" [2].

A few basic examples are below, and the tests in ./gh_tests.py may give you more ideas on use.

Parameters:

x : 1D np.array or scalar (required)
    Initial value for the filter state. Each value can be a scalar
    or a np.array.

    You can use a scalar for x0. If order > 0, then 0.0 is assumed
    for the higher order terms.

    x[0] is the value being tracked
    x[1] is the first derivative (for order 1 and 2 filters)
    x[2] is the second derivative (for order 2 filters)

dx : 1D np.array or scalar (required)
    Initial value for the derivative of the filter state.

dt : scalar (required)
    time step

g : float (required)
    filter g gain parameter.

h : float (required)
    filter h gain parameter.

Attributes:

x : 1D np.array or scalar
    filter state

dx : 1D np.array or scalar
    derivative of the filter state.

x_prediction : 1D np.array or scalar
    predicted filter state

dx_prediction : 1D np.array or scalar
    predicted derivative of the filter state.

dt : scalar
    time step

g : float
    filter g gain parameter.

h : float
    filter h gain parameter.

y : np.array or scalar
    residual (difference between measurement and prior)

z : np.array or scalar
    measurement passed into update()

Examples:

Create a basic filter for a scalar value with g=.8, h=.2. Initialize to 0, with a derivative (velocity) of 0.

>>> from bayesian_filters.gh import GHFilter
>>> f = GHFilter(x=0., dx=0., dt=1., g=.8, h=.2)

Incorporate the measurement of 1

>>> f.update(z=1)
(0.8, 0.2)

Incorporate a measurement of 2 with g=1 and h=0.01

>>> f.update(z=2, g=1, h=0.01)
(2.0, 0.21000000000000002)

Create a filter with two independent variables.

>>> from numpy import array
>>> f = GHFilter(x=array([0,1]), dx=array([0,0]), dt=1, g=.8, h=.02)

and update with the measurements (2,4)

>>> f.update(array([2,4]))
(array([ 1.6,  3.4]), array([ 0.04,  0.06]))

References

[1] Labbe, "Kalman and Bayesian Filters in Python" http://rlabbe.github.io/Kalman-and-Bayesian-Filters-in-Python

[2] Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and Sons, 1998.

Source code in bayesian_filters/gh/gh_filter.py
class GHFilter(object):
    """
    Implements the g-h filter. The topic is too large to cover in
    this comment. See my book "Kalman and Bayesian Filters in Python" [1]
    or Eli Brookner's "Tracking and Kalman Filters Made Easy" [2].

    A few basic examples are below, and the tests in ./gh_tests.py may
    give you more ideas on use.


    Parameters
    ----------

    x : 1D np.array or scalar
        Initial value for the filter state. Each value can be a scalar
        or a np.array.

        You can use a scalar for x0. If order > 0, then 0.0 is assumed
        for the higher order terms.

        x[0] is the value being tracked
        x[1] is the first derivative (for order 1 and 2 filters)
        x[2] is the second derivative (for order 2 filters)

    dx : 1D np.array or scalar
        Initial value for the derivative of the filter state.

    dt : scalar
        time step

    g : float
        filter g gain parameter.

    h : float
        filter h gain parameter.


    Attributes
    ----------
    x : 1D np.array or scalar
        filter state

    dx : 1D np.array or scalar
        derivative of the filter state.

    x_prediction : 1D np.array or scalar
        predicted filter state

    dx_prediction : 1D np.array or scalar
        predicted derivative of the filter state.

    dt : scalar
        time step

    g : float
        filter g gain parameter.

    h : float
        filter h gain parameter.

    y : np.array, or scalar
        residual (difference between measurement and prior)

    z : np.array, or scalar
        measurement passed into update()

    Examples
    --------

    Create a basic filter for a scalar value with g=.8, h=.2.
    Initialize to 0, with a derivative (velocity) of 0.

    >>> from bayesian_filters.gh import GHFilter
    >>> f = GHFilter(x=0., dx=0., dt=1., g=.8, h=.2)

    Incorporate the measurement of 1

    >>> f.update(z=1)
    (0.8, 0.2)

    Incorporate a measurement of 2 with g=1 and h=0.01

    >>> f.update(z=2, g=1, h=0.01)
    (2.0, 0.21000000000000002)

    Create a filter with two independent variables.

    >>> from numpy import array
    >>> f = GHFilter(x=array([0,1]), dx=array([0,0]), dt=1, g=.8, h=.02)

    and update with the measurements (2,4)

    >>> f.update(array([2,4]))
    (array([ 1.6,  3.4]), array([ 0.04,  0.06]))


    References
    ----------

    [1] Labbe, "Kalman and Bayesian Filters in Python"
    http://rlabbe.github.io/Kalman-and-Bayesian-Filters-in-Python

    [2] Brookner, "Tracking and Kalman Filters Made Easy". John Wiley and
    Sons, 1998.

    """

    def __init__(self, x, dx, dt, g, h):
        self.x = x
        self.dx = dx
        self.dt = dt
        self.g = g
        self.h = h
        self.dx_prediction = self.dx
        self.x_prediction = self.x

        if np.ndim(x) == 0:
            self.y = 0.0  # residual
            self.z = 0.0
        else:
            self.y = np.zeros(len(x))
            self.z = np.zeros(len(x))

    def update(self, z, g=None, h=None):
        """
        performs the g-h filter predict and update step on the
        measurement z. Modifies the member variables listed below,
        and returns the state of x and dx as a tuple as a convenience.

        **Modified Members**

        x
            filtered state variable

        dx
            derivative (velocity) of x

        residual
            difference between the measurement and the prediction for x

        x_prediction
            predicted value of x before incorporating the measurement z.

        dx_prediction
            predicted value of the derivative of x before incorporating the
            measurement z.

        Parameters
        ----------

        z : any
            the measurement
        g : scalar (optional)
            Override the fixed self.g value for this update
        h : scalar (optional)
            Override the fixed self.h value for this update

        Returns
        -------

        x filter output for x
        dx filter output for dx (derivative of x)
        """

        if g is None:
            g = self.g
        if h is None:
            h = self.h

        # prediction step
        self.dx_prediction = self.dx
        self.x_prediction = self.x + (self.dx * self.dt)

        # update step
        self.y = z - self.x_prediction
        self.dx = self.dx_prediction + h * self.y / self.dt
        self.x = self.x_prediction + g * self.y

        return (self.x, self.dx)

    def batch_filter(self, data, save_predictions=False, saver=None):
        """
        Given a sequenced list of data, performs g-h filter
        with a fixed g and h. See update() if you need to vary g and/or h.

        Uses self.x and self.dx to initialize the filter, but DOES NOT
        alter self.x and self.dx during execution, allowing you to use this
        class multiple times without resetting self.x and self.dx. I'm not sure
        how often you would need to do that, but the capability is there.
        More exactly, none of the class member variables are modified
        by this function, in distinct contrast to update(), which changes
        most of them.

        Parameters
        ----------

        data : list like
            contains the data to be filtered.

        save_predictions : boolean
            the predictions will be saved and returned if this is true

        saver : bayesian_filters.common.Saver, optional
            bayesian_filters.common.Saver object. If provided, saver.save() will be
            called after every epoch


        Returns
        -------

        results : np.array shape (n+1, 2), where n=len(data)
            contains the results of the filter, where
            results[i,0] is x , and
            results[i,1] is dx (derivative of x)
            First entry is the initial values of x and dx as set by __init__.

        predictions : np.array shape(n), optional
            the predictions for each step in the filter. Only returned if
            save_predictions == True
        """

        x = self.x
        dx = self.dx
        n = len(data)

        results = np.zeros((n + 1, 2))
        results[0, 0] = x
        results[0, 1] = dx

        if save_predictions:
            predictions = np.zeros(n)

        # optimization to avoid n computations of h / dt
        h_dt = self.h / self.dt

        for i, z in enumerate(data):
            # prediction step
            x_est = x + (dx * self.dt)

            # update step
            residual = z - x_est
            dx = dx + h_dt * residual  # i.e. dx = dx + h * residual / dt
            x = x_est + self.g * residual

            results[i + 1, 0] = x
            results[i + 1, 1] = dx
            if save_predictions:
                predictions[i] = x_est

            if saver is not None:
                saver.save()

        if save_predictions:
            return results, predictions

        return results

    def VRF_prediction(self):
        r"""
        Returns the Variance Reduction Factor of the prediction
        step of the filter. The VRF is the
        normalized variance for the filter, as given in the equation below.

        .. math::
            VRF(\hat{x}_{n+1,n}) = \frac{VAR(\hat{x}_{n+1,n})}{\sigma^2_x}

        References
        ----------

        Asquith, "Weight Selection in First Order Linear Filters"
        Report No RG-TR-69-12, U.S. Army Missile Command. Redstone Arsenal, AL.
        November 24, 1970.
        """

        g = self.g
        h = self.h

        return (2 * g**2 + 2 * h + g * h) / (g * (4 - 2 * g - h))

    def VRF(self):
        r"""
        Returns the Variance Reduction Factor (VRF) of the state variable
        of the filter (x) and its derivatives (dx, ddx). The VRF is the
        normalized variance for the filter, as given in the equations below.

        .. math::
            VRF(\hat{x}_{n,n}) = \frac{VAR(\hat{x}_{n,n})}{\sigma^2_x}

            VRF(\hat{\dot{x}}_{n,n}) = \frac{VAR(\hat{\dot{x}}_{n,n})}{\sigma^2_x}

            VRF(\hat{\ddot{x}}_{n,n}) = \frac{VAR(\hat{\ddot{x}}_{n,n})}{\sigma^2_x}

        Returns
        -------

        vrf_x   VRF of x state variable
        vrf_dx  VRF of the dx state variable (derivative of x)
        """

        g = self.g
        h = self.h

        den = g * (4 - 2 * g - h)

        vx = (2 * g**2 + 2 * h - 3 * g * h) / den
        vdx = 2 * h**2 / (self.dt**2 * den)

        return (vx, vdx)

    def __repr__(self):
        return "\n".join(
            [
                "GHFilter object",
                pretty_str("dt", self.dt),
                pretty_str("g", self.g),
                pretty_str("h", self.h),
                pretty_str("x", self.x),
                pretty_str("dx", self.dx),
                pretty_str("x_prediction", self.x_prediction),
                pretty_str("dx_prediction", self.dx_prediction),
                pretty_str("y", self.y),
                pretty_str("z", self.z),
            ]
        )
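
The doctests above show single updates; the sketch below runs the filter over a short stream of noisy measurements. This is a minimal usage sketch: the ramp signal, noise level, and seed are made up for illustration, and only GHFilter and update() come from this module.

import numpy as np

from bayesian_filters.gh import GHFilter

rng = np.random.default_rng(1)

# hypothetical truth: position advancing 2 units per step
truth = 2.0 * np.arange(50)
zs = truth + rng.normal(scale=1.5, size=50)  # noisy measurements

f = GHFilter(x=0.0, dx=0.0, dt=1.0, g=0.8, h=0.2)

# update() returns (x, dx); keep the filtered positions
xs = [f.update(z)[0] for z in zs]

# the estimated velocity should settle near the true slope of 2
print(f.dx)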

VRF()

Returns the Variance Reduction Factor (VRF) of the state variable of the filter (x) and its derivatives (dx, ddx). The VRF is the normalized variance for the filter, as given in the equations below.

.. math::
    VRF(\hat{x}_{n,n}) = \frac{VAR(\hat{x}_{n,n})}{\sigma^2_x}

    VRF(\hat{\dot{x}}_{n,n}) = \frac{VAR(\hat{\dot{x}}_{n,n})}{\sigma^2_x}

    VRF(\hat{\ddot{x}}_{n,n}) = \frac{VAR(\hat{\ddot{x}}_{n,n})}{\sigma^2_x}

Returns:

vrf_x : VRF of the x state variable
vrf_dx : VRF of the dx state variable (derivative of x)

Source code in bayesian_filters/gh/gh_filter.py
def VRF(self):
    r"""
    Returns the Variance Reduction Factor (VRF) of the state variable
    of the filter (x) and its derivatives (dx, ddx). The VRF is the
    normalized variance for the filter, as given in the equations below.

    .. math::
        VRF(\hat{x}_{n,n}) = \frac{VAR(\hat{x}_{n,n})}{\sigma^2_x}

        VRF(\hat{\dot{x}}_{n,n}) = \frac{VAR(\hat{\dot{x}}_{n,n})}{\sigma^2_x}

        VRF(\hat{\ddot{x}}_{n,n}) = \frac{VAR(\hat{\ddot{x}}_{n,n})}{\sigma^2_x}

    Returns
    -------

    vrf_x   VRF of x state variable
    vrf_dx  VRF of the dx state variable (derivative of x)
    """

    g = self.g
    h = self.h

    den = g * (4 - 2 * g - h)

    vx = (2 * g**2 + 2 * h - 3 * g * h) / den
    vdx = 2 * h**2 / (self.dt**2 * den)

    return (vx, vdx)

VRF_prediction()

Returns the Variance Reduction Factor of the prediction step of the filter. The VRF is the normalized variance for the filter, as given in the equation below.

.. math::
    VRF(\hat{x}_{n+1,n}) = \frac{VAR(\hat{x}_{n+1,n})}{\sigma^2_x}

References

Asquith, "Weight Selection in First Order Linear Filters", Report No RG-TR-69-12, U.S. Army Missile Command, Redstone Arsenal, AL, November 24, 1970.

Source code in bayesian_filters/gh/gh_filter.py
def VRF_prediction(self):
    r"""
    Returns the Variance Reduction Factor of the prediction
    step of the filter. The VRF is the
    normalized variance for the filter, as given in the equation below.

    .. math::
        VRF(\hat{x}_{n+1,n}) = \frac{VAR(\hat{x}_{n+1,n})}{\sigma^2_x}

    References
    ----------

    Asquith, "Weight Selection in First Order Linear Filters"
    Report No RG-TR-69-12, U.S. Army Missile Command. Redstone Arsenal, AL.
    November 24, 1970.
    """

    g = self.g
    h = self.h

    return (2 * g**2 + 2 * h + g * h) / (g * (4 - 2 * g - h))
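
As a quick sanity check on these formulas, here is a minimal sketch evaluating both VRF variants for one arbitrary choice of gains (g=0.8, h=0.2, dt=1 are assumptions, not recommendations). Working the algebra by hand, the shared denominator is g*(4 - 2g - h) = 0.8*2.2 = 1.76, so the filtered VRF is 1.2/1.76 ≈ 0.68 while the prediction VRF is 1.84/1.76 ≈ 1.05; predicting one step ahead always carries more variance than the filtered estimate.

from bayesian_filters.gh import GHFilter

f = GHFilter(x=0.0, dx=0.0, dt=1.0, g=0.8, h=0.2)

vx, vdx = f.VRF()        # normalized variances of x and dx after an update
vp = f.VRF_prediction()  # normalized variance of the one-step prediction

print(vx)   # ~0.6818: filtering reduces the measurement variance by ~32%
print(vdx)  # ~0.0455
print(vp)   # ~1.0455: the prediction is noisier than a raw measurement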

batch_filter(data, save_predictions=False, saver=None)

Given a sequenced list of data, performs g-h filter with a fixed g and h. See update() if you need to vary g and/or h.

Uses self.x and self.dx to initialize the filter, but DOES NOT alter self.x and self.dx during execution, allowing you to use this class multiple times without resetting self.x and self.dx. I'm not sure how often you would need to do that, but the capability is there. More exactly, none of the class member variables are modified by this function, in distinct contrast to update(), which changes most of them.

Parameters:

data : list-like (required)
    contains the data to be filtered.

save_predictions : boolean (default False)
    the predictions will be saved and returned if this is true

saver : bayesian_filters.common.Saver, optional (default None)
    If provided, saver.save() will be called after every epoch

Returns:

results : np.array shape (n+1, 2), where n=len(data)
    contains the results of the filter, where results[i,0] is x and
    results[i,1] is dx (derivative of x). The first entry holds the
    initial values of x and dx as set by __init__.

predictions : np.array shape (n,), optional
    the predictions for each step in the filter. Only returned if
    save_predictions == True

Source code in bayesian_filters/gh/gh_filter.py
def batch_filter(self, data, save_predictions=False, saver=None):
    """
    Given a sequenced list of data, performs g-h filter
    with a fixed g and h. See update() if you need to vary g and/or h.

    Uses self.x and self.dx to initialize the filter, but DOES NOT
    alter self.x and self.dx during execution, allowing you to use this
    class multiple times without resetting self.x and self.dx. I'm not sure
    how often you would need to do that, but the capability is there.
    More exactly, none of the class member variables are modified
    by this function, in distinct contrast to update(), which changes
    most of them.

    Parameters
    ----------

    data : list like
        contains the data to be filtered.

    save_predictions : boolean
        the predictions will be saved and returned if this is true

    saver : bayesian_filters.common.Saver, optional
        bayesian_filters.common.Saver object. If provided, saver.save() will be
        called after every epoch


    Returns
    -------

    results : np.array shape (n+1, 2), where n=len(data)
        contains the results of the filter, where
        results[i,0] is x , and
        results[i,1] is dx (derivative of x)
        First entry is the initial values of x and dx as set by __init__.

    predictions : np.array shape(n), optional
        the predictions for each step in the filter. Only returned if
        save_predictions == True
    """

    x = self.x
    dx = self.dx
    n = len(data)

    results = np.zeros((n + 1, 2))
    results[0, 0] = x
    results[0, 1] = dx

    if save_predictions:
        predictions = np.zeros(n)

    # optimization to avoid n computations of h / dt
    h_dt = self.h / self.dt

    for i, z in enumerate(data):
        # prediction step
        x_est = x + (dx * self.dt)

        # update step
        residual = z - x_est
        dx = dx + h_dt * residual  # i.e. dx = dx + h * residual / dt
        x = x_est + self.g * residual

        results[i + 1, 0] = x
        results[i + 1, 1] = dx
        if save_predictions:
            predictions[i] = x_est

        if saver is not None:
            saver.save()

    if save_predictions:
        return results, predictions

    return results
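
A minimal batch_filter sketch; the measurement values here are made up for illustration. Note that results has one more row than the data because row 0 holds the initial state, and that the filter object itself is left unchanged, as the docstring promises.

from bayesian_filters.gh import GHFilter

f = GHFilter(x=0.0, dx=0.0, dt=1.0, g=0.8, h=0.2)

zs = [1.0, 2.1, 2.9, 4.2, 5.1]
results, predictions = f.batch_filter(zs, save_predictions=True)

print(results.shape)      # (6, 2): row 0 is the initial (x, dx)
print(predictions.shape)  # (5,): one prediction per measurement

# batch_filter does not modify the member variables
print(f.x, f.dx)          # still 0.0 0.0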

update(z, g=None, h=None)

performs the g-h filter predict and update step on the measurement z. Modifies the member variables listed below, and returns the state of x and dx as a tuple as a convenience.

Modified Members

x : filtered state variable

dx : derivative (velocity) of x

residual : difference between the measurement and the prediction for x

x_prediction : predicted value of x before incorporating the measurement z.

dx_prediction : predicted value of the derivative of x before incorporating the measurement z.

Parameters:

z : any (required)
    the measurement

g : scalar, optional (default None)
    Override the fixed self.g value for this update

h : scalar, optional (default None)
    Override the fixed self.h value for this update

Returns:

x : filter output for x
dx : filter output for dx (derivative of x)

Source code in bayesian_filters/gh/gh_filter.py
def update(self, z, g=None, h=None):
    """
    performs the g-h filter predict and update step on the
    measurement z. Modifies the member variables listed below,
    and returns the state of x and dx as a tuple as a convenience.

    **Modified Members**

    x
        filtered state variable

    dx
        derivative (velocity) of x

    residual
        difference between the measurement and the prediction for x

    x_prediction
        predicted value of x before incorporating the measurement z.

    dx_prediction
        predicted value of the derivative of x before incorporating the
        measurement z.

    Parameters
    ----------

    z : any
        the measurement
    g : scalar (optional)
        Override the fixed self.g value for this update
    h : scalar (optional)
        Override the fixed self.h value for this update

    Returns
    -------

    x filter output for x
    dx filter output for dx (derivative of x)
    """

    if g is None:
        g = self.g
    if h is None:
        h = self.h

    # prediction step
    self.dx_prediction = self.dx
    self.x_prediction = self.x + (self.dx * self.dt)

    # update step
    self.y = z - self.x_prediction
    self.dx = self.dx_prediction + h * self.y / self.dt
    self.x = self.x_prediction + g * self.y

    return (self.x, self.dx)
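
To make the method's arithmetic concrete, this sketch unrolls one call to update() by hand and confirms the (0.8, 0.2) result from the class doctest. Nothing here is new API; the comments simply repeat the prediction and update equations from the method body.

from bayesian_filters.gh import GHFilter

f = GHFilter(x=0.0, dx=0.0, dt=1.0, g=0.8, h=0.2)

# prediction step: x_pred = x + dx*dt = 0.0,  dx_pred = dx = 0.0
# update step:     y  = z - x_pred       = 1.0
#                  dx = dx_pred + h*y/dt = 0.0 + 0.2*1.0/1.0 = 0.2
#                  x  = x_pred + g*y     = 0.0 + 0.8*1.0     = 0.8
x, dx = f.update(z=1.0)
assert (x, dx) == (0.8, 0.2)

print(f.y, f.x_prediction)  # residual 1.0, prediction 0.0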