Kalman Filter

KalmanFilter

Implements a linear Kalman filter. For now, the best documentation is my free book, Kalman and Bayesian Filters in Python [2].

The test files in this directory also give you a basic idea of use, albeit without much description.

In brief, you will first construct this object, specifying the size of the state vector with dim_x and the size of the measurement vector with dim_z. These are mostly used to perform size checks when you assign values to the various matrices. For example, if you specified dim_z=2 and then try to assign a 3x3 matrix to R (the measurement noise matrix), you will get an assert exception because R should be 2x2. (If for whatever reason you need to alter the size of things midstream, just use the underscore version of the matrices to assign directly: your_filter._R = a_3x3_matrix.)

After construction the filter will have default matrices created for you, but you must specify the values for each. It's usually easiest to just overwrite them rather than assign to each element yourself. This will be clearer in the example below. All are of type numpy.array.

These are the matrices (instance variables) which you must specify. All are of type numpy.array (do NOT use numpy.matrix). If dimensional analysis allows you to get away with a 1x1 matrix, you may also use a scalar. All elements must have a type of float.

Instance Variables

You will have to assign reasonable values to all of these before running the filter. All must have dtype of float.

x : ndarray (dim_x, 1), default = [0,0,0...0] filter state estimate

P : ndarray (dim_x, dim_x), default eye(dim_x) covariance matrix

Q : ndarray (dim_x, dim_x), default eye(dim_x) Process uncertainty/noise

R : ndarray (dim_z, dim_z), default eye(dim_z) measurement uncertainty/noise

H : ndarray (dim_z, dim_x) measurement function

F : ndarray (dim_x, dim_x) state transition matrix

B : ndarray (dim_x, dim_u), default 0 control transition matrix

Optional Instance Variables

alpha : float

Assign a value > 1.0 to turn this into a fading memory filter.
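To see what that does, here is a minimal NumPy sketch of the fading-memory covariance predict step, P = alpha^2 * F P F' + Q (the values of F, Q, and P here are made up for illustration; this is a sketch of the equation, not the library's code):

```python
import numpy as np

def predict_cov(P, F, Q, alpha=1.0):
    # Fading-memory covariance predict: P = alpha^2 * F P F' + Q.
    # alpha > 1 inflates P, so older measurements are discounted.
    return (alpha ** 2) * F @ P @ F.T + Q

F = np.array([[1., 1.],
              [0., 1.]])
Q = np.eye(2) * 0.01
P0 = np.eye(2)

P_normal = predict_cov(P0, F, Q, alpha=1.0)
P_fading = predict_cov(P0, F, Q, alpha=1.02)
```

With alpha slightly above 1.0 the predicted covariance is slightly larger, so each new measurement carries more weight in the update.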

Read-only Instance Variables

K : ndarray Kalman gain that was used in the most recent update() call.

y : ndarray Residual calculated in the most recent update() call. I.e., the difference between the measurement and the current estimated state projected into measurement space (z - Hx).

S : ndarray System uncertainty projected into measurement space. I.e., HPH' + R. Probably not very useful, but it is here if you want it.

likelihood : float Likelihood of the last measurement update.

log_likelihood : float Log-likelihood of the last measurement update.
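These quantities follow from modeling the residual y as a zero-mean Gaussian with covariance S. A small NumPy sketch with hypothetical numbers (my own reconstruction of the standard Gaussian log-density, not the library's internals), including the floor at sys.float_info.min that the API reference describes:

```python
import sys
import numpy as np

# Hypothetical residual and innovation covariance for a 1-D measurement
y = np.array([[0.5]])     # z - Hx
S = np.array([[2.0]])     # HPH' + R

dim_z = S.shape[0]
SI = np.linalg.inv(S)
# Gaussian log-density of y under N(0, S)
log_likelihood = -0.5 * (dim_z * np.log(2.0 * np.pi)
                         + np.log(np.linalg.det(S))
                         + float(y.T @ SI @ y))

# exp() of a very negative log-likelihood underflows to 0.0, so a floor
# of sys.float_info.min keeps downstream multiplications from zeroing out
likelihood = max(np.exp(log_likelihood), sys.float_info.min)
```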

Example

Here is a filter that tracks position and velocity using a sensor that only reads position.

First construct the object with the required dimensionality.

.. code::

    from filterpy.kalman import KalmanFilter
    f = KalmanFilter(dim_x=2, dim_z=1)

Assign the initial value for the state (position and velocity). You can do this with a two dimensional array like so:

.. code::

    f.x = np.array([[2.],    # position
                    [0.]])   # velocity

or just use a one dimensional array, which I prefer doing.

.. code::

    f.x = np.array([2., 0.])

Define the state transition matrix:

.. code::

    f.F = np.array([[1., 1.],
                    [0., 1.]])

Define the measurement function:

.. code::

    f.H = np.array([[1., 0.]])

Define the covariance matrix. Here I take advantage of the fact that P already contains np.eye(dim_x), and just multiply by the uncertainty:

.. code::

    f.P *= 1000.

I could have written:

.. code::

    f.P = np.array([[1000.,    0.],
                    [   0., 1000.]])

You decide which is more readable and understandable.

Now assign the measurement noise. Here the dimension is 1x1, so I can use a scalar:

.. code::

    f.R = 5

I could have done this instead:

.. code::

    f.R = np.array([[5.]])

Note that this must be a 2 dimensional array, as must all the matrices.

Finally, I will assign the process noise. Here I will take advantage of another FilterPy library function:

.. code::

    from filterpy.common import Q_discrete_white_noise
    f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13)
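If you'd rather see what that helper produces, the dim=2 discrete white noise matrix has a simple closed form. This sketch is my own reconstruction of the standard textbook result (a constant-velocity model driven by piecewise-constant white acceleration), not the library's source:

```python
import numpy as np

def q_discrete_white_noise_2(dt, var):
    # Textbook process noise for a [position, velocity] state:
    # Q = var * [[dt^4/4, dt^3/2], [dt^3/2, dt^2]]
    return var * np.array([[0.25 * dt**4, 0.5 * dt**3],
                           [0.5  * dt**3,       dt**2]])

Q = q_discrete_white_noise_2(dt=0.1, var=0.13)
```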

Now just perform the standard predict/update loop:

.. code::

    while some_condition_is_true:
        z = get_sensor_reading()
        f.predict()
        f.update(z)

        do_something_with_estimate(f.x)
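To see what predict() and update() actually compute, here is one full cycle written out in plain NumPy using the matrices from this example. This is a sketch of the standard Kalman equations, not the library's internals, and the measurement value 2.1 is made up:

```python
import numpy as np

x = np.array([[2.], [0.]])             # position, velocity
P = np.eye(2) * 1000.                  # large initial uncertainty
F = np.array([[1., 1.], [0., 1.]])
H = np.array([[1., 0.]])
R = np.array([[5.]])
Q = np.eye(2) * 0.001                  # placeholder process noise

# predict: x = Fx, P = FPF' + Q
x = F @ x
P = F @ P @ F.T + Q

# update with a hypothetical position reading
z = np.array([[2.1]])
y = z - H @ x                          # residual
S = H @ P @ H.T + R                    # innovation covariance, HPH' + R
K = P @ H.T @ np.linalg.inv(S)         # Kalman gain
x = x + K @ y
P = (np.eye(2) - K @ H) @ P
```

Because P starts large relative to R, the posterior position lands almost exactly on the measurement.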

Procedural Form

This module also contains stand alone functions to perform Kalman filtering. Use these if you are not a fan of objects.

Example

.. code::

    while True:
        z, R = read_sensor()
        x, P = predict(x, P, F, Q)
        x, P = update(x, P, z, R, H)
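Stand-alone predict/update functions with the signatures above can be sketched in plain NumPy like this (a reconstruction of the standard equations; the library's own functions additionally handle scalars and optional arguments):

```python
import numpy as np

def predict(x, P, F, Q):
    # x = Fx, P = FPF' + Q
    return F @ x, F @ P @ F.T + Q

def update(x, P, z, R, H):
    # y = z - Hx; S = HPH' + R; K = PH' inv(S)
    y = z - H @ x
    S = H @ P @ H.T + R
    K = P @ H.T @ np.linalg.inv(S)
    x = x + K @ y
    P = (np.eye(P.shape[0]) - K @ H) @ P
    return x, P

# one cycle with the matrices from the earlier example
x, P = np.array([[2.], [0.]]), np.eye(2) * 1000.
F = np.array([[1., 1.], [0., 1.]])
H = np.array([[1., 0.]])
x, P = predict(x, P, F, np.eye(2) * 0.001)
x, P = update(x, P, np.array([[2.1]]), np.array([[5.]]), H)
```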

References

.. [2] Labbe, Roger. "Kalman and Bayesian Filters in Python".

github repo: https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

read online: http://nbviewer.ipython.org/github/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/table_of_contents.ipynb


API Reference

KalmanFilter

Bases: object

Implements a Kalman filter. You are responsible for setting the various state variables to reasonable values; the defaults will not give you a functional filter.

For now the best documentation is my free book Kalman and Bayesian Filters in Python [2]_. The test files in this directory also give you a basic idea of use, albeit without much description.

In brief, you will first construct this object, specifying the size of the state vector with dim_x and the size of the measurement vector with dim_z. These are mostly used to perform size checks when you assign values to the various matrices. For example, if you specified dim_z=2 and then try to assign a 3x3 matrix to R (the measurement noise matrix), you will get an assert exception because R should be 2x2. (If for whatever reason you need to alter the size of things midstream, just use the underscore version of the matrices to assign directly: your_filter._R = a_3x3_matrix.)

After construction the filter will have default matrices created for you, but you must specify the values for each. It’s usually easiest to just overwrite them rather than assign to each element yourself. This will be clearer in the example below. All are of type numpy.array.

Examples:

Here is a filter that tracks position and velocity using a sensor that only reads position.

First construct the object with the required dimensionality. Here the state (dim_x) has 2 coefficients (position and velocity), and the measurement (dim_z) has one. In FilterPy x is the state, z is the measurement.

.. code::

    from bayesian_filters.kalman import KalmanFilter
    f = KalmanFilter(dim_x=2, dim_z=1)

Assign the initial value for the state (position and velocity). You can do this with a two dimensional array like so:

.. code::

    f.x = np.array([[2.],    # position
                    [0.]])   # velocity

or just use a one dimensional array, which I prefer doing.

.. code::

    f.x = np.array([2., 0.])

Define the state transition matrix:

.. code::

    f.F = np.array([[1.,1.],
                    [0.,1.]])

Define the measurement function. Here we need to convert a position-velocity vector into just a position vector, so we use:

.. code::

    f.H = np.array([[1., 0.]])

Define the state's covariance matrix P.

.. code::

    f.P = np.array([[1000.,    0.],
                    [   0., 1000.]])

Now assign the measurement noise. Here the dimension is 1x1, so I can use a scalar:

.. code::

    f.R = 5

I could have done this instead:

.. code::

    f.R = np.array([[5.]])

Note that this must be a 2 dimensional array.

Finally, I will assign the process noise. Here I will take advantage of another FilterPy library function:

.. code::

    from bayesian_filters.common import Q_discrete_white_noise
    f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13)

Now just perform the standard predict/update loop:

.. code::

    while some_condition_is_true:
        z = get_sensor_reading()
        f.predict()
        f.update(z)

        do_something_with_estimate(f.x)

Procedural Form

This module also contains stand alone functions to perform Kalman filtering. Use these if you are not a fan of objects.

Example

.. code::

    while True:
        z, R = read_sensor()
        x, P = predict(x, P, F, Q)
        x, P = update(x, P, z, R, H)

See my book Kalman and Bayesian Filters in Python [2]_.

You will have to set the following attributes after constructing this object for the filter to perform properly. Please note that there are various checks in place to ensure that you have made everything the 'correct' size. However, it is possible to provide incorrectly sized arrays such that the linear algebra can not perform an operation. It can also fail silently - you can end up with matrices of a size that allows the linear algebra to work, but are the wrong shape for the problem you are trying to solve.

Parameters:

dim_x : int (required)

Number of state variables for the Kalman filter. For example, if you are tracking the position and velocity of an object in two dimensions, dim_x would be 4. This is used to set the default size of P, Q, and u.

dim_z : int (required)

Number of measurement inputs. For example, if the sensor provides you with position in (x,y), dim_z would be 2.

dim_u : int, optional (default 0)

Size of the control input, if it is being used. The default value of 0 indicates it is not used.

Attributes:

x array(dim_x, 1)

Current state estimate. Any call to update() or predict() updates this variable.

P array(dim_x, dim_x)

Current state covariance matrix. Any call to update() or predict() updates this variable.

x_prior array(dim_x, 1)

Prior (predicted) state estimate. The _prior and _post attributes are for convenience; they store the prior and posterior of the current epoch. Read Only.

P_prior array(dim_x, dim_x)

Prior (predicted) state covariance matrix. Read Only.

x_post array(dim_x, 1)

Posterior (updated) state estimate. Read Only.

P_post array(dim_x, dim_x)

Posterior (updated) state covariance matrix. Read Only.

z array

Last measurement used in update(). Read only.

R array(dim_z, dim_z)

Measurement noise covariance matrix. Also known as the observation covariance.

Q array(dim_x, dim_x)

Process noise covariance matrix. Also known as the transition covariance.

F array()

State Transition matrix. Also known as A in some formulation.

H array(dim_z, dim_x)

Measurement function. Also known as the observation matrix, or as C.

y array

Residual of the update step. Read only.

K array(dim_x, dim_z)

Kalman gain of the update step. Read only.

S array

System uncertainty (P projected to measurement space). Read only.

SI array

Inverse system uncertainty. Read only.

log_likelihood float

log-likelihood of the last measurement. Read only.

likelihood float

likelihood of last measurement. Read only.

Computed from the log-likelihood. The log-likelihood can be very small, meaning a large negative value such as -28000. Taking the exp() of that results in 0.0, which can break typical algorithms which multiply by this value, so by default we always return a number >= sys.float_info.min.

mahalanobis float

mahalanobis distance of the innovation. Read only.

inv function, default numpy.linalg.inv

If you prefer another inverse function, such as the Moore-Penrose pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

This is only used to invert self.S. If you know it is diagonal, you might choose to set it to bayesian_filters.common.inv_diagonal, which is several times faster than numpy.linalg.inv for diagonal matrices.

alpha float

Fading memory setting. 1.0 gives the normal Kalman filter, and values slightly larger than 1.0 (such as 1.02) give a fading memory effect - previous measurements have less influence on the filter's estimates. This formulation of the Fading memory filter (there are many) is due to Dan Simon [1]_.

References

.. [1] Dan Simon. "Optimal State Estimation." John Wiley & Sons. p. 208-212. (2006)

.. [2] Roger Labbe. "Kalman and Bayesian Filters in Python" https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

Source code in bayesian_filters/kalman/kalman_filter.py
class KalmanFilter(object):
    r"""Implements a Kalman filter. You are responsible for setting the
    various state variables to reasonable values; the defaults  will
    not give you a functional filter.

    For now the best documentation is my free book Kalman and Bayesian
    Filters in Python [2]_. The test files in this directory also give you a
    basic idea of use, albeit without much description.

    In brief, you will first construct this object, specifying the size of
    the state vector with dim_x and the size of the measurement vector that
    you will be using with dim_z. These are mostly used to perform size checks
    when you assign values to the various matrices. For example, if you
    specified dim_z=2 and then try to assign a 3x3 matrix to R (the
    measurement noise matrix), you will get an assert exception because R
    should be 2x2. (If for whatever reason you need to alter the size of
    things midstream just use the underscore version of the matrices to
    assign directly: your_filter._R = a_3x3_matrix.)

    After construction the filter will have default matrices created for you,
    but you must specify the values for each. It’s usually easiest to just
    overwrite them rather than assign to each element yourself. This will be
    clearer in the example below. All are of type numpy.array.


    Examples
    --------

    Here is a filter that tracks position and velocity using a sensor that only
    reads position.

    First construct the object with the required dimensionality. Here the state
    (`dim_x`) has 2 coefficients (position and velocity), and the measurement
    (`dim_z`) has one. In FilterPy `x` is the state, `z` is the measurement.

    .. code::

        from bayesian_filters.kalman import KalmanFilter
        f = KalmanFilter (dim_x=2, dim_z=1)


    Assign the initial value for the state (position and velocity). You can do this
    with a two dimensional array like so:

        .. code::

            f.x = np.array([[2.],    # position
                            [0.]])   # velocity

    or just use a one dimensional array, which I prefer doing.

    .. code::

        f.x = np.array([2., 0.])


    Define the state transition matrix:

        .. code::

            f.F = np.array([[1.,1.],
                            [0.,1.]])

    Define the measurement function. Here we need to convert a position-velocity
    vector into just a position vector, so we use:

        .. code::

        f.H = np.array([[1., 0.]])

    Define the state's covariance matrix P.

    .. code::

        f.P = np.array([[1000.,    0.],
                        [   0., 1000.] ])

    Now assign the measurement noise. Here the dimension is 1x1, so I can
    use a scalar

    .. code::

        f.R = 5

    I could have done this instead:

    .. code::

        f.R = np.array([[5.]])

    Note that this must be a 2 dimensional array.

    Finally, I will assign the process noise. Here I will take advantage of
    another FilterPy library function:

    .. code::

        from bayesian_filters.common import Q_discrete_white_noise
        f.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.13)


    Now just perform the standard predict/update loop:

    .. code::

        while some_condition_is_true:
            z = get_sensor_reading()
            f.predict()
            f.update(z)

            do_something_with_estimate (f.x)


    **Procedural Form**

    This module also contains stand alone functions to perform Kalman filtering.
    Use these if you are not a fan of objects.

    **Example**

    .. code::

        while True:
            z, R = read_sensor()
            x, P = predict(x, P, F, Q)
            x, P = update(x, P, z, R, H)

    See my book Kalman and Bayesian Filters in Python [2]_.


    You will have to set the following attributes after constructing this
    object for the filter to perform properly. Please note that there are
    various checks in place to ensure that you have made everything the
    'correct' size. However, it is possible to provide incorrectly sized
    arrays such that the linear algebra can not perform an operation.
    It can also fail silently - you can end up with matrices of a size that
    allows the linear algebra to work, but are the wrong shape for the problem
    you are trying to solve.

    Parameters
    ----------
    dim_x : int
        Number of state variables for the Kalman filter. For example, if
        you are tracking the position and velocity of an object in two
        dimensions, dim_x would be 4.
        This is used to set the default size of P, Q, and u

    dim_z : int
        Number of measurement inputs. For example, if the sensor
        provides you with position in (x,y), dim_z would be 2.

    dim_u : int (optional)
        size of the control input, if it is being used.
        Default value of 0 indicates it is not used.


    Attributes
    ----------
    x : numpy.array(dim_x, 1)
        Current state estimate. Any call to update() or predict() updates
        this variable.

    P : numpy.array(dim_x, dim_x)
        Current state covariance matrix. Any call to update() or predict()
        updates this variable.

    x_prior : numpy.array(dim_x, 1)
        Prior (predicted) state estimate. The *_prior and *_post attributes
        are for convenience; they store the  prior and posterior of the
        current epoch. Read Only.

    P_prior : numpy.array(dim_x, dim_x)
        Prior (predicted) state covariance matrix. Read Only.

    x_post : numpy.array(dim_x, 1)
        Posterior (updated) state estimate. Read Only.

    P_post : numpy.array(dim_x, dim_x)
        Posterior (updated) state covariance matrix. Read Only.

    z : numpy.array
        Last measurement used in update(). Read only.

    R : numpy.array(dim_z, dim_z)
        Measurement noise covariance matrix. Also known as the
        observation covariance.

    Q : numpy.array(dim_x, dim_x)
        Process noise covariance matrix. Also known as the transition
        covariance.

    F : numpy.array()
        State Transition matrix. Also known as `A` in some formulation.

    H : numpy.array(dim_z, dim_x)
        Measurement function. Also known as the observation matrix, or as `C`.

    y : numpy.array
        Residual of the update step. Read only.

    K : numpy.array(dim_x, dim_z)
        Kalman gain of the update step. Read only.

    S :  numpy.array
        System uncertainty (P projected to measurement space). Read only.

    SI :  numpy.array
        Inverse system uncertainty. Read only.

    log_likelihood : float
        log-likelihood of the last measurement. Read only.

    likelihood : float
        likelihood of last measurement. Read only.

        Computed from the log-likelihood. The log-likelihood can be very
        small,  meaning a large negative value such as -28000. Taking the
        exp() of that results in 0.0, which can break typical algorithms
        which multiply by this value, so by default we always return a
        number >= sys.float_info.min.

    mahalanobis : float
        mahalanobis distance of the innovation. Read only.

    inv : function, default numpy.linalg.inv
        If you prefer another inverse function, such as the Moore-Penrose
        pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

        This is only used to invert self.S. If you know it is diagonal, you
        might choose to set it to bayesian_filters.common.inv_diagonal, which is
        several times faster than numpy.linalg.inv for diagonal matrices.

    alpha : float
        Fading memory setting. 1.0 gives the normal Kalman filter, and
        values slightly larger than 1.0 (such as 1.02) give a fading
        memory effect - previous measurements have less influence on the
        filter's estimates. This formulation of the Fading memory filter
        (there are many) is due to Dan Simon [1]_.

    References
    ----------

    .. [1] Dan Simon. "Optimal State Estimation." John Wiley & Sons.
       p. 208-212. (2006)

    .. [2] Roger Labbe. "Kalman and Bayesian Filters in Python"
       https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

    """

    def __init__(self, dim_x, dim_z, dim_u=0):
        if dim_x < 1:
            raise ValueError("dim_x must be 1 or greater")
        if dim_z < 1:
            raise ValueError("dim_z must be 1 or greater")
        if dim_u < 0:
            raise ValueError("dim_u must be 0 or greater")

        self.dim_x = dim_x
        self.dim_z = dim_z
        self.dim_u = dim_u

        self.x = zeros((dim_x, 1))  # state
        self.P = eye(dim_x)  # uncertainty covariance
        self.Q = eye(dim_x)  # process uncertainty
        self.B = None  # control transition matrix
        self.F = eye(dim_x)  # state transition matrix
        self.H = zeros((dim_z, dim_x))  # measurement function
        self.R = eye(dim_z)  # measurement uncertainty
        self._alpha_sq = 1.0  # fading memory control
        self.M = np.zeros((dim_x, dim_z))  # process-measurement cross correlation
        self.z = np.array([[None] * self.dim_z]).T

        # gain and residual are computed during the innovation step. We
        # save them so that in case you want to inspect them for various
        # purposes
        self.K = np.zeros((dim_x, dim_z))  # kalman gain
        self.y = zeros((dim_z, 1))
        self.S = np.zeros((dim_z, dim_z))  # system uncertainty
        self.SI = np.zeros((dim_z, dim_z))  # inverse system uncertainty

        # identity matrix. Do not alter this.
        self._I = np.eye(dim_x)

        # these will always be a copy of x,P after predict() is called
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

        # these will always be a copy of x,P after update() is called
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

        # Only computed if requested via property
        self._log_likelihood = log(sys.float_info.min)
        self._likelihood = sys.float_info.min
        self._mahalanobis = None

        self.inv = np.linalg.inv

    def predict(self, u=None, B=None, F=None, Q=None):
        """
        Predict next state (prior) using the Kalman filter state propagation
        equations.

        Parameters
        ----------

        u : np.array, default 0
            Optional control vector.

        B : np.array(dim_x, dim_u), or None
            Optional control transition matrix; a value of None
            will cause the filter to use `self.B`.

        F : np.array(dim_x, dim_x), or None
            Optional state transition matrix; a value of None
            will cause the filter to use `self.F`.

        Q : np.array(dim_x, dim_x), scalar, or None
            Optional process noise matrix; a value of None will cause the
            filter to use `self.Q`.
        """

        if B is None:
            B = self.B
        if F is None:
            F = self.F
        if Q is None:
            Q = self.Q
        elif isscalar(Q):
            Q = eye(self.dim_x) * Q

        # x = Fx + Bu
        if B is not None and u is not None:
            self.x = dot(F, self.x) + dot(B, u)
        else:
            self.x = dot(F, self.x)

        # P = FPF' + Q
        self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q

        # save prior
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

    def update(self, z, R=None, H=None):
        """
        Add a new measurement (z) to the Kalman filter.

        If z is None, nothing is computed. However, x_post and P_post are
        updated with the prior (x_prior, P_prior), and self.z is set to None.

        Parameters
        ----------
        z : (dim_z, 1): array_like
            measurement for this update. z can be a scalar if dim_z is 1,
            otherwise it must be convertible to a column vector.

            If you pass in a value of H, z must be a column vector of
            the correct size.

        R : np.array, scalar, or None
            Optionally provide R to override the measurement noise for this
            one call, otherwise  self.R will be used.

        H : np.array, or None
            Optionally provide H to override the measurement function for this
            one call, otherwise self.H will be used.
        """

        # set to None to force recompute
        self._log_likelihood = None
        self._likelihood = None
        self._mahalanobis = None

        if z is None:
            self.z = np.array([[None] * self.dim_z]).T
            self.x_post = self.x.copy()
            self.P_post = self.P.copy()
            self.y = zeros((self.dim_z, 1))
            return

        if R is None:
            R = self.R
        elif isscalar(R):
            R = eye(self.dim_z) * R

        if H is None:
            z = reshape_z(z, self.dim_z, self.x.ndim)
            H = self.H

        # y = z - Hx
        # error (residual) between measurement and prediction
        self.y = z - dot(H, self.x)

        # common subexpression for speed
        PHT = dot(self.P, H.T)

        # S = HPH' + R
        # project system uncertainty into measurement space
        self.S = dot(H, PHT) + R
        self.SI = self.inv(self.S)
        # K = PH'inv(S)
        # map system uncertainty into kalman gain
        self.K = dot(PHT, self.SI)

        # x = x + Ky
        # predict new x with residual scaled by the kalman gain
        self.x = self.x + dot(self.K, self.y)

        # P = (I-KH)P(I-KH)' + KRK'
        # This is more numerically stable
        # and works for non-optimal K vs the equation
        # P = (I-KH)P usually seen in the literature.

        I_KH = self._I - dot(self.K, H)
        self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T)

        # save measurement and posterior state
        self.z = deepcopy(z)
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

    def predict_steadystate(self, u=0, B=None):
        """
        Predict state (prior) using the Kalman filter state propagation
        equations. Only x is updated, P is left unchanged. See
        update_steadystate() for a longer explanation of when to use this
        method.

        Parameters
        ----------

        u : np.array
            Optional control vector. If non-zero, it is multiplied by B
            to create the control input into the system.

        B : np.array(dim_x, dim_u), or None
            Optional control transition matrix; a value of None
            will cause the filter to use `self.B`.
        """

        if B is None:
            B = self.B

        # x = Fx + Bu
        if B is not None:
            self.x = dot(self.F, self.x) + dot(B, u)
        else:
            self.x = dot(self.F, self.x)

        # save prior
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

    def update_steadystate(self, z):
        """
        Add a new measurement (z) to the Kalman filter without recomputing
        the Kalman gain K, the state covariance P, or the system
        uncertainty S.

        You can use this for LTI systems since the Kalman gain and covariance
        converge to a fixed value. Precompute these and assign them explicitly,
        or run the Kalman filter using the normal predict()/update() cycle
        until they converge.

        The main advantage of this call is speed. We do significantly less
        computation, notably avoiding a costly matrix inversion.

        Use in conjunction with predict_steadystate(), otherwise P will grow
        without bound.

        Parameters
        ----------
        z : (dim_z, 1): array_like
            measurement for this update. z can be a scalar if dim_z is 1,
            otherwise it must be convertible to a column vector.


        Examples
        --------
        >>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
        >>> # let filter converge on representative data, then save K and P
        >>> for i in range(100):
        >>>     cv.predict()
        >>>     cv.update([i, i, i])
        >>> saved_K = np.copy(cv.K)
        >>> saved_P = np.copy(cv.P)

        later on:

        >>> cv = kinematic_kf(dim=3, order=2) # 3D const velocity filter
        >>> cv.K = np.copy(saved_K)
        >>> cv.P = np.copy(saved_P)
        >>> for i in range(100):
        >>>     cv.predict_steadystate()
        >>>     cv.update_steadystate([i, i, i])
        """

        # set to None to force recompute
        self._log_likelihood = None
        self._likelihood = None
        self._mahalanobis = None

        if z is None:
            self.z = np.array([[None] * self.dim_z]).T
            self.x_post = self.x.copy()
            self.P_post = self.P.copy()
            self.y = zeros((self.dim_z, 1))
            return

        z = reshape_z(z, self.dim_z, self.x.ndim)

        # y = z - Hx
        # error (residual) between measurement and prediction
        self.y = z - dot(self.H, self.x)

        # x = x + Ky
        # predict new x with residual scaled by the kalman gain
        self.x = self.x + dot(self.K, self.y)

        self.z = deepcopy(z)
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()


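As a hedged sketch of the steady-state idea in plain NumPy (independent of this class's API): iterate the covariance recursion until the gain settles, then filter with that fixed gain and no per-step inversion:

```python
import numpy as np

# constant-velocity model with scalar position measurements
dt = 1.0
F = np.array([[1.0, dt], [0.0, 1.0]])
H = np.array([[1.0, 0.0]])
Q = np.eye(2) * 0.01
R = np.array([[1.0]])

# let the covariance recursion converge to the steady-state gain
P = np.eye(2)
K = np.zeros((2, 1))
for _ in range(500):
    P = F @ P @ F.T + Q                     # predict covariance
    S = H @ P @ H.T + R
    K = P @ H.T @ np.linalg.inv(S)          # gain
    I_KH = np.eye(2) - K @ H
    P = I_KH @ P @ I_KH.T + K @ R @ K.T     # update covariance

# fixed-gain filtering: no matrix inversion per step
x = np.zeros((2, 1))
for z in [0.0, 1.1, 1.9, 3.2]:
    x = F @ x                               # analogue of predict_steadystate()
    x = x + K @ (np.array([[z]]) - H @ x)   # analogue of update_steadystate()
```

The converged K plays the role of the saved gain in the example above; the per-step work is two matrix-vector products.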
    def update_correlated(self, z, R=None, H=None):
        """Add a new measurement (z) to the Kalman filter assuming that
        process noise and measurement noise are correlated as defined in
        the `self.M` matrix.

        A partial derivation can be found in [1].

        If z is None, nothing is changed.

        Parameters
        ----------
        z : (dim_z, 1): array_like
            measurement for this update. z can be a scalar if dim_z is 1,
            otherwise it must be convertible to a column vector.

        R : np.array, scalar, or None
            Optionally provide R to override the measurement noise for this
            one call, otherwise self.R will be used.

        H : np.array, or None
            Optionally provide H to override the measurement function for this
            one call, otherwise self.H will be used.

        References
        ----------

        .. [1] Bulut, Y. (2011). Applied Kalman filter theory (Doctoral dissertation, Northeastern University).
               http://people.duke.edu/~hpgavin/SystemID/References/Balut-KalmanFilter-PhD-NEU-2011.pdf
        """

        # set to None to force recompute
        self._log_likelihood = None
        self._likelihood = None
        self._mahalanobis = None

        if z is None:
            self.z = np.array([[None] * self.dim_z]).T
            self.x_post = self.x.copy()
            self.P_post = self.P.copy()
            self.y = zeros((self.dim_z, 1))
            return

        if R is None:
            R = self.R
        elif isscalar(R):
            R = eye(self.dim_z) * R

        # rename for readability and a tiny extra bit of speed
        if H is None:
            z = reshape_z(z, self.dim_z, self.x.ndim)
            H = self.H

        # handle special case: if z is in form [[z]] but x is not a column
        # vector dimensions will not match
        if self.x.ndim == 1 and shape(z) == (1, 1):
            z = z[0]

        if shape(z) == ():  # is it scalar, e.g. z=3 or z=np.array(3)
            z = np.asarray([z])

        # y = z - Hx
        # error (residual) between measurement and prediction
        self.y = z - dot(H, self.x)

        # common subexpression for speed
        PHT = dot(self.P, H.T)

        # project system uncertainty into measurement space
        self.S = dot(H, PHT) + dot(H, self.M) + dot(self.M.T, H.T) + R
        self.SI = self.inv(self.S)

        # K = PH'inv(S)
        # map system uncertainty into kalman gain
        self.K = dot(PHT + self.M, self.SI)

        # x = x + Ky
        # predict new x with residual scaled by the kalman gain
        self.x = self.x + dot(self.K, self.y)
        self.P = self.P - dot(self.K, dot(H, self.P) + self.M.T)

        self.z = deepcopy(z)
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

    def update_sequential(self, start, z_i, R_i=None, H_i=None):
        """
        Add a single input measurement (z_i) to the Kalman filter.
        In sequential processing, inputs are processed one at a time.

        Parameters
        ----------
        start : integer
            Index of the first measurement input updated by this call.

        z_i : np.array or scalar
            Measurement of inputs for this partial update.

        R_i : np.array, scalar, or None
            Optionally provide R_i to override the measurement noise of
            inputs for this one call, otherwise a slice of self.R will
            be used.

        H_i : np.array, or None
            Optionally provide H[i] to override the partial measurement
            function for this one call, otherwise a slice of self.H will
            be used.
        """

        if isscalar(z_i):
            length = 1
        else:
            length = len(z_i)
        z_i = np.reshape(z_i, [length, 1])
        stop = start + length

        if R_i is None:
            R_i = self.R[start:stop, start:stop]
        elif isscalar(R_i):
            R_i = eye(length) * R_i

        if H_i is None:
            H_i = self.H[start:stop]

        H_i = np.reshape(H_i, [length, self.dim_x])

        # y_i = z_i - H_i @ x
        # error (residual) between measurement and prediction
        y_i = z_i - dot(H_i, self.x)
        self.y[start:stop] = y_i

        # common subexpression for speed
        PHT = dot(self.P, H_i.T)

        # project system uncertainty into the measurement subspace
        S_i = dot(H_i, PHT) + R_i

        if length == 1:
            K_i = PHT * (1.0 / S_i)
        else:
            K_i = dot(PHT, linalg.inv(S_i))

        self.K[:, start:stop] = K_i
        I_KH = self._I - np.dot(K_i, H_i)

        # x = x + K_i @ y_i
        # update state estimation with residual scaled by the kalman gain
        self.x += dot(K_i, y_i)

        # compute the posterior covariance
        self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(K_i, R_i), K_i.T)

        # save measurement component #i and the posterior state
        self.z[start:stop] = z_i
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()
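When R is diagonal, processing the measurement one element at a time yields exactly the same posterior as a single joint update. A self-contained NumPy check of that equivalence (joseph_update is a local helper written for this sketch, not part of this class):

```python
import numpy as np

# prior and a 2-element measurement with diagonal (uncorrelated) noise
x0 = np.array([[0.0], [0.0]])
P0 = np.array([[2.0, 0.3], [0.3, 1.0]])
H = np.eye(2)
R = np.diag([0.5, 0.2])
z = np.array([[1.0], [-0.4]])

def joseph_update(x, P, z, H, R):
    """One Kalman update in Joseph form."""
    S = H @ P @ H.T + R
    K = P @ H.T @ np.linalg.inv(S)
    x = x + K @ (z - H @ x)
    I_KH = np.eye(len(x)) - K @ H
    return x, I_KH @ P @ I_KH.T + K @ R @ K.T

# joint update using the full measurement vector
x_joint, P_joint = joseph_update(x0, P0, z, H, R)

# sequential processing: one scalar element at a time
x_seq, P_seq = x0, P0
for i in range(2):
    x_seq, P_seq = joseph_update(
        x_seq, P_seq, z[i:i + 1], H[i:i + 1], R[i:i + 1, i:i + 1])
```

Each scalar step inverts a 1x1 "matrix", which is why sequential processing is attractive when dim_z is large.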

    def batch_filter(
        self,
        zs,
        Fs=None,
        Qs=None,
        Hs=None,
        Rs=None,
        Bs=None,
        us=None,
        update_first=False,
        saver=None,
    ):
        """Batch processes a sequences of measurements.

         Parameters
         ----------

         zs : list-like
             list of measurements at each time step `self.dt`. Missing
             measurements must be represented by `None`.

         Fs : None, list-like, default=None
             optional value or list of values to use for the state transition
             matrix F.

             If Fs is None then self.F is used for all epochs.

             Otherwise it must contain a list-like list of F's, one for
             each epoch.  This allows you to have varying F per epoch.

         Qs : None, np.array or list-like, default=None
             optional value or list of values to use for the process error
             covariance Q.

             If Qs is None then self.Q is used for all epochs.

             Otherwise it must contain a list-like list of Q's, one for
             each epoch.  This allows you to have varying Q per epoch.

         Hs : None, np.array or list-like, default=None
             optional list of values to use for the measurement matrix H.

             If Hs is None then self.H is used for all epochs.

             If Hs contains a single matrix, then it is used as H for all
             epochs.

             Otherwise it must contain a list-like list of H's, one for
             each epoch.  This allows you to have varying H per epoch.

         Rs : None, np.array or list-like, default=None
             optional list of values to use for the measurement error
             covariance R.

             If Rs is None then self.R is used for all epochs.

             Otherwise it must contain a list-like list of R's, one for
             each epoch.  This allows you to have varying R per epoch.

         Bs : None, np.array or list-like, default=None
             optional list of values to use for the control transition matrix B.

             If Bs is None then self.B is used for all epochs.

             Otherwise it must contain a list-like list of B's, one for
             each epoch.  This allows you to have varying B per epoch.

         us : None, np.array or list-like, default=None
             optional list of values to use for the control input vector;

             If us is None then None is used for all epochs (equivalent to 0,
             or no control input).

             Otherwise it must contain a list-like list of u's, one for
             each epoch.

         update_first : bool, optional, default=False
             controls whether the order of operations is update followed by
             predict, or predict followed by update. Default is predict->update.

         saver : bayesian_filters.common.Saver, optional
             bayesian_filters.common.Saver object. If provided, saver.save() will be
             called after every epoch

         Returns
         -------

         means : np.array((n,dim_x,1))
             array of the state for each time step after the update. Each entry
             is an np.array. In other words `means[k,:]` is the state at step
             `k`.

         covariance : np.array((n,dim_x,dim_x))
             array of the covariances for each time step after the update.
             In other words `covariance[k,:,:]` is the covariance at step `k`.

         means_predictions : np.array((n,dim_x,1))
             array of the state for each time step after the predictions. Each
             entry is an np.array. In other words `means[k,:]` is the state at
             step `k`.

         covariance_predictions : np.array((n,dim_x,dim_x))
             array of the covariances for each time step after the prediction.
             In other words `covariance[k,:,:]` is the covariance at step `k`.

         Examples
         --------

         .. code-block:: Python

             # this example demonstrates tracking a measurement where the time
             # between measurement varies, as stored in dts. This requires
             # that F be recomputed for each epoch. The output is then smoothed
             # with an RTS smoother.

             zs = [t + random.randn()*4 for t in range(40)]
             Fs = [np.array([[1., dt], [0, 1.]]) for dt in dts]

             (mu, cov, _, _) = kf.batch_filter(zs, Fs=Fs)
             (xs, Ps, Ks, Pps) = kf.rts_smoother(mu, cov, Fs=Fs)
        """

        # pylint: disable=too-many-statements
        n = np.size(zs, 0)
        if Fs is None:
            Fs = [self.F] * n
        if Qs is None:
            Qs = [self.Q] * n
        if Hs is None:
            Hs = [self.H] * n
        if Rs is None:
            Rs = [self.R] * n
        if Bs is None:
            Bs = [self.B] * n
        if us is None:
            us = [0] * n

        # mean estimates from Kalman Filter
        if self.x.ndim == 1:
            means = zeros((n, self.dim_x))
            means_p = zeros((n, self.dim_x))
        else:
            means = zeros((n, self.dim_x, 1))
            means_p = zeros((n, self.dim_x, 1))

        # state covariances from Kalman Filter
        covariances = zeros((n, self.dim_x, self.dim_x))
        covariances_p = zeros((n, self.dim_x, self.dim_x))

        if update_first:
            for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
                self.update(z, R=R, H=H)
                means[i, :] = self.x
                covariances[i, :, :] = self.P

                self.predict(u=u, B=B, F=F, Q=Q)
                means_p[i, :] = self.x
                covariances_p[i, :, :] = self.P

                if saver is not None:
                    saver.save()
        else:
            for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
                self.predict(u=u, B=B, F=F, Q=Q)
                means_p[i, :] = self.x
                covariances_p[i, :, :] = self.P

                self.update(z, R=R, H=H)
                means[i, :] = self.x
                covariances[i, :, :] = self.P

                if saver is not None:
                    saver.save()

        return (means, covariances, means_p, covariances_p)

    def rts_smoother(self, Xs, Ps, Fs=None, Qs=None, inv=np.linalg.inv):
        """
        Runs the Rauch-Tung-Striebel Kalman smoother on a set of
        means and covariances computed by a Kalman filter. The usual input
        would come from the output of `KalmanFilter.batch_filter()`.

        Parameters
        ----------

        Xs : numpy.array
            array of the means (state variable x) of the output of a Kalman
            filter.

        Ps : numpy.array
            array of the covariances of the output of a kalman filter.

        Fs : list-like collection of numpy.array, optional
            State transition matrix of the Kalman filter at each time step.
            Optional, if not provided the filter's self.F will be used

        Qs : list-like collection of numpy.array, optional
            Process noise of the Kalman filter at each time step. Optional,
            if not provided the filter's self.Q will be used

        inv : function, default numpy.linalg.inv
            If you prefer another inverse function, such as the Moore-Penrose
            pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv


        Returns
        -------

        x : numpy.ndarray
            smoothed means

        P : numpy.ndarray
            smoothed state covariances

        K : numpy.ndarray
            smoother gain at each step

        Pp : numpy.ndarray
            Predicted state covariances

        Examples
        --------

        .. code-block:: Python

            zs = [t + random.randn()*4 for t in range(40)]

            (mu, cov, _, _) = kf.batch_filter(zs)
            (x, P, K, Pp) = kf.rts_smoother(mu, cov)

        """

        if len(Xs) != len(Ps):
            raise ValueError("length of Xs and Ps must be the same")

        n = Xs.shape[0]
        dim_x = Xs.shape[1]

        if Fs is None:
            Fs = [self.F] * n
        if Qs is None:
            Qs = [self.Q] * n

        # smoother gain
        K = zeros((n, dim_x, dim_x))

        x, P, Pp = Xs.copy(), Ps.copy(), Ps.copy()
        for k in range(n - 2, -1, -1):
            Pp[k] = dot(dot(Fs[k + 1], P[k]), Fs[k + 1].T) + Qs[k + 1]

            # pylint: disable=bad-whitespace
            K[k] = dot(dot(P[k], Fs[k + 1].T), inv(Pp[k]))
            x[k] += dot(K[k], x[k + 1] - dot(Fs[k + 1], x[k]))
            P[k] += dot(dot(K[k], P[k + 1] - Pp[k]), K[k].T)

        return (x, P, K, Pp)
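The backward pass can be sketched for a scalar system in plain NumPy (illustrative values; the useful invariant is that smoothing never increases the filtered variance):

```python
import numpy as np

# scalar system: run a forward filter, then the RTS backward pass
F, Q, H, R = 1.0, 0.1, 1.0, 1.0
zs = [1.0, 1.2, 0.8, 1.1, 0.9]

xs, Ps = [], []                 # filtered means and variances
x, P = 0.0, 10.0
for z in zs:
    x, P = F * x, F * P * F + Q                   # predict
    K = P * H / (H * P * H + R)
    x, P = x + K * (z - H * x), (1 - K * H) * P   # update
    xs.append(x)
    Ps.append(P)

# backward pass, mirroring the loop in rts_smoother()
xs_s, Ps_s = xs[:], Ps[:]
for k in range(len(zs) - 2, -1, -1):
    Pp = F * Ps[k] * F + Q                  # predicted variance from step k
    C = Ps[k] * F / Pp                      # smoother gain
    xs_s[k] += C * (xs_s[k + 1] - F * xs[k])
    Ps_s[k] += C * (Ps_s[k + 1] - Pp) * C
```

The final step is left untouched (no future data to smooth with), and every earlier variance can only shrink or stay the same.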

    def get_prediction(self, u=None, B=None, F=None, Q=None):
        """
        Predict next state (prior) using the Kalman filter state propagation
        equations and returns it without modifying the object.

        Parameters
        ----------

        u : np.array, default 0
            Optional control vector.

        B : np.array(dim_x, dim_u), or None
            Optional control transition matrix; a value of None
            will cause the filter to use `self.B`.

        F : np.array(dim_x, dim_x), or None
            Optional state transition matrix; a value of None
            will cause the filter to use `self.F`.

        Q : np.array(dim_x, dim_x), scalar, or None
            Optional process noise matrix; a value of None will cause the
            filter to use `self.Q`.

        Returns
        -------

        (x, P) : tuple
            State vector and covariance array of the prediction.
        """

        if B is None:
            B = self.B
        if F is None:
            F = self.F
        if Q is None:
            Q = self.Q
        elif isscalar(Q):
            Q = eye(self.dim_x) * Q

        # x = Fx + Bu
        if B is not None and u is not None:
            x = dot(F, self.x) + dot(B, u)
        else:
            x = dot(F, self.x)

        # P = FPF' + Q
        P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q

        return x, P

    def get_update(self, z=None):
        """
        Computes the new estimate based on measurement `z` and returns it
        without altering the state of the filter. If z is None, the current
        (self.x, self.P) are returned unchanged.

        Parameters
        ----------

        z : (dim_z, 1): array_like
            measurement for this update. z can be a scalar if dim_z is 1,
            otherwise it must be convertible to a column vector.

        Returns
        -------

        (x, P) : tuple
            State vector and covariance array of the update.
        """

        if z is None:
            return self.x, self.P
        z = reshape_z(z, self.dim_z, self.x.ndim)

        R = self.R
        H = self.H
        P = self.P
        x = self.x

        # error (residual) between measurement and prediction
        y = z - dot(H, x)

        # common subexpression for speed
        PHT = dot(P, H.T)

        # project system uncertainty into measurement space
        S = dot(H, PHT) + R

        # map system uncertainty into kalman gain
        K = dot(PHT, self.inv(S))

        # predict new x with residual scaled by the kalman gain
        x = x + dot(K, y)

        # P = (I-KH)P(I-KH)' + KRK'
        I_KH = self._I - dot(K, H)
        P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T)

        return x, P

    def residual_of(self, z):
        """
        Returns the residual for the given measurement (z). Does not alter
        the state of the filter.
        """
        z = reshape_z(z, self.dim_z, self.x.ndim)
        return z - dot(self.H, self.x_prior)

    def measurement_of_state(self, x):
        """
        Helper function that converts a state into a measurement.

        Parameters
        ----------

        x : np.array
            kalman state vector

        Returns
        -------

        z : np.array
            measurement corresponding to the given state, i.e. dot(H, x)
        """

        return dot(self.H, x)

    @property
    def log_likelihood(self):
        """
        log-likelihood of the last measurement.
        """
        if self._log_likelihood is None:
            self._log_likelihood = logpdf(x=self.y, cov=self.S)
        return self._log_likelihood

    @property
    def likelihood(self):
        """
        Computed from the log-likelihood. The log-likelihood can be very
        small,  meaning a large negative value such as -28000. Taking the
        exp() of that results in 0.0, which can break typical algorithms
        which multiply by this value, so by default we always return a
        number >= sys.float_info.min.
        """
        if self._likelihood is None:
            self._likelihood = exp(self.log_likelihood)
            if self._likelihood == 0:
                self._likelihood = sys.float_info.min
        return self._likelihood

    @property
    def mahalanobis(self):
        """ "
        Mahalanobis distance of measurement. E.g. 3 means measurement
        was 3 standard deviations away from the predicted value.

        Returns
        -------
        mahalanobis : float
        """
        if self._mahalanobis is None:
            self._mahalanobis = sqrt(dot(dot(self.y.T, self.SI), self.y).item())
        return self._mahalanobis
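A hedged NumPy sketch of how this distance is computed and used for gating (the 3-sigma threshold is an illustrative choice, not part of this class):

```python
import numpy as np

# residual y and innovation covariance S as left behind by an update step
y = np.array([[2.5], [0.0]])
S = np.array([[1.0, 0.0], [0.0, 4.0]])
SI = np.linalg.inv(S)

# Mahalanobis distance of the residual
d = np.sqrt((y.T @ SI @ y).item())

# simple 3-sigma gate: discard measurements that are too unlikely
accept = d < 3.0
```

Here the residual is 2.5 standard deviations out in the first component, so it passes the gate.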

    @property
    def alpha(self):
        """
        Fading memory setting. 1.0 gives the normal Kalman filter, and
        values slightly larger than 1.0 (such as 1.02) give a fading
        memory effect - previous measurements have less influence on the
        filter's estimates. This formulation of the Fading memory filter
        (there are many) is due to Dan Simon [1]_.
        """
        return self._alpha_sq**0.5
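The fading memory factor enters only the covariance prediction, P = alpha^2 * FPF' + Q. A small NumPy sketch (illustrative values) showing that alpha slightly above 1 inflates the prior, which in turn weights new measurements more heavily:

```python
import numpy as np

F = np.array([[1.0, 1.0], [0.0, 1.0]])
Q = np.eye(2) * 0.01
P = np.eye(2)

alpha = 1.02
P_standard = F @ P @ F.T + Q              # alpha = 1: normal Kalman filter
P_fading = alpha**2 * (F @ P @ F.T) + Q   # inflated prior, discounts old data
```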

    def log_likelihood_of(self, z):
        """
        log likelihood of the measurement `z`. This should only be called
        after a call to update(). Calling after predict() will yield an
        incorrect result."""

        if z is None:
            return log(sys.float_info.min)
        return logpdf(z, dot(self.H, self.x), self.S)

    @alpha.setter
    def alpha(self, value):
        if not np.isscalar(value) or value < 1:
            raise ValueError("alpha must be a float greater than 1")

        self._alpha_sq = value**2

    def __repr__(self):
        return "\n".join(
            [
                "KalmanFilter object",
                pretty_str("dim_x", self.dim_x),
                pretty_str("dim_z", self.dim_z),
                pretty_str("dim_u", self.dim_u),
                pretty_str("x", self.x),
                pretty_str("P", self.P),
                pretty_str("x_prior", self.x_prior),
                pretty_str("P_prior", self.P_prior),
                pretty_str("x_post", self.x_post),
                pretty_str("P_post", self.P_post),
                pretty_str("F", self.F),
                pretty_str("Q", self.Q),
                pretty_str("R", self.R),
                pretty_str("H", self.H),
                pretty_str("K", self.K),
                pretty_str("y", self.y),
                pretty_str("S", self.S),
                pretty_str("SI", self.SI),
                pretty_str("M", self.M),
                pretty_str("B", self.B),
                pretty_str("z", self.z),
                pretty_str("log-likelihood", self.log_likelihood),
                pretty_str("likelihood", self.likelihood),
                pretty_str("mahalanobis", self.mahalanobis),
                pretty_str("alpha", self.alpha),
                pretty_str("inv", self.inv),
            ]
        )

    def test_matrix_dimensions(self, z=None, H=None, R=None, F=None, Q=None):
        """
        Performs a series of asserts to check that the size of everything
        is what it should be. This can help you debug problems in your design.

        If you pass in H, R, F, Q those will be used instead of this object's
        value for those matrices.

        Testing `z` (the measurement) is problematic. x is a vector, and
        can be implemented as either a 1D array or as an nx1 column vector.
        Thus Hx can be of different shapes. Then, if Hx is a single value,
        it can be either a 1D array or a 2D vector. If either is true, z
        can reasonably be a scalar (either 3 or np.array(3) are scalars
        under this definition), a 1D one-element array, or a 2D one-element
        array. You are allowed to pass in any combination that works.
        """

        if H is None:
            H = self.H
        if R is None:
            R = self.R
        if F is None:
            F = self.F
        if Q is None:
            Q = self.Q
        x = self.x
        P = self.P

        assert x.ndim == 1 or x.ndim == 2, "x must have one or two dimensions, but has {}".format(x.ndim)

        if x.ndim == 1:
            assert x.shape[0] == self.dim_x, "Shape of x must be ({},{}), but is {}".format(self.dim_x, 1, x.shape)
        else:
            assert x.shape == (self.dim_x, 1), "Shape of x must be ({},{}), but is {}".format(self.dim_x, 1, x.shape)

        assert P.shape == (self.dim_x, self.dim_x), "Shape of P must be ({},{}), but is {}".format(
            self.dim_x, self.dim_x, P.shape
        )

        assert Q.shape == (self.dim_x, self.dim_x), "Shape of Q must be ({},{}), but is {}".format(
            self.dim_x, self.dim_x, Q.shape
        )

        assert F.shape == (self.dim_x, self.dim_x), "Shape of F must be ({},{}), but is {}".format(
            self.dim_x, self.dim_x, F.shape
        )

        assert np.ndim(H) == 2, "Shape of H must be (dim_z, {}), but is {}".format(P.shape[0], shape(H))

        assert H.shape[1] == P.shape[0], "Shape of H must be (dim_z, {}), but is {}".format(P.shape[0], H.shape)

        # shape of R must be the same as HPH'
        hph_shape = (H.shape[0], H.shape[0])
        r_shape = shape(R)

        if H.shape[0] == 1:
            # r can be scalar, 1D, or 2D in this case
            assert r_shape in [(), (1,), (1, 1)], "R must be scalar or one element array, but is shaped {}".format(
                r_shape
            )
        else:
            assert r_shape == hph_shape, "shape of R should be {} but it is {}".format(hph_shape, r_shape)

        if z is not None:
            z_shape = shape(z)
        else:
            z_shape = (self.dim_z, 1)

        # H@x must have shape of z
        Hx = dot(H, x)

        if z_shape == ():  # scalar or np.array(scalar)
            assert Hx.ndim == 1 or shape(Hx) == (1, 1), "shape of z should be {}, not {} for the given H".format(
                shape(Hx), z_shape
            )

        elif shape(Hx) == (1,):
            assert z_shape[0] == 1, "Shape of z must be {} for the given H".format(shape(Hx))

        else:
            assert z_shape == shape(Hx) or (len(z_shape) == 1 and shape(Hx) == (z_shape[0], 1)), (
                "shape of z should be {}, not {} for the given H".format(shape(Hx), z_shape)
            )

        if np.ndim(Hx) > 1 and shape(Hx) != (1, 1):
            assert shape(Hx) == z_shape, "shape of z should be {} for the given H, but it is {}".format(
                shape(Hx), z_shape
            )

alpha property writable

Fading memory setting. 1.0 gives the normal Kalman filter, and values slightly larger than 1.0 (such as 1.02) give a fading memory effect - previous measurements have less influence on the filter's estimates. This formulation of the Fading memory filter (there are many) is due to Dan Simon [1]_.

likelihood property

Computed from the log-likelihood. The log-likelihood can be very small, meaning a large negative value such as -28000. Taking the exp() of that results in 0.0, which can break typical algorithms which multiply by this value, so by default we always return a number >= sys.float_info.min.

log_likelihood property

log-likelihood of the last measurement.

mahalanobis property

" Mahalanobis distance of measurement. E.g. 3 means measurement was 3 standard deviations away from the predicted value.

Returns:

Name Type Description
mahalanobis float

batch_filter(zs, Fs=None, Qs=None, Hs=None, Rs=None, Bs=None, us=None, update_first=False, saver=None)

Batch processes a sequences of measurements.

Parameters

zs : list-like list of measurements at each time step self.dt. Missing measurements must be represented by None.

Fs : None, list-like, default=None optional value or list of values to use for the state transition matrix F.

 If Fs is None then self.F is used for all epochs.

 Otherwise it must contain a list-like list of F's, one for
 each epoch.  This allows you to have varying F per epoch.

Qs : None, np.array or list-like, default=None optional value or list of values to use for the process error covariance Q.

 If Qs is None then self.Q is used for all epochs.

 Otherwise it must contain a list-like list of Q's, one for
 each epoch.  This allows you to have varying Q per epoch.

Hs : None, np.array or list-like, default=None optional list of values to use for the measurement matrix H.

 If Hs is None then self.H is used for all epochs.

 If Hs contains a single matrix, then it is used as H for all
 epochs.

 Otherwise it must contain a list-like list of H's, one for
 each epoch.  This allows you to have varying H per epoch.

Rs : None, np.array or list-like, default=None optional list of values to use for the measurement error covariance R.

 If Rs is None then self.R is used for all epochs.

 Otherwise it must contain a list-like list of R's, one for
 each epoch.  This allows you to have varying R per epoch.

Bs : None, np.array or list-like, default=None optional list of values to use for the control transition matrix B.

 If Bs is None then self.B is used for all epochs.

 Otherwise it must contain a list-like list of B's, one for
 each epoch.  This allows you to have varying B per epoch.

us : None, np.array or list-like, default=None optional list of values to use for the control input vector;

 If us is None then None is used for all epochs (equivalent to 0,
 or no control input).

 Otherwise it must contain a list-like list of u's, one for
 each epoch.

update_first : bool, optional, default=False controls whether the order of operations is update followed by predict, or predict followed by update. Default is predict->update.

saver : bayesian_filters.common.Saver, optional bayesian_filters.common.Saver object. If provided, saver.save() will be called after every epoch

Returns

means : np.array((n,dim_x,1)) array of the state for each time step after the update. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance : np.array((n,dim_x,dim_x)) array of the covariances for each time step after the update. In other words covariance[k,:,:] is the covariance at step k.

means_predictions : np.array((n,dim_x,1)) array of the state for each time step after the predictions. Each entry is an np.array. In other words means[k,:] is the state at step k.

covariance_predictions : np.array((n,dim_x,dim_x)) array of the covariances for each time step after the prediction. In other words covariance[k,:,:] is the covariance at step k.

Examples

.. code-block:: Python

 # this example demonstrates tracking a measurement where the time
 # between measurements varies, as stored in dts. This requires
 # that F be recomputed for each epoch. The output is then smoothed
 # with an RTS smoother.

 zs = [t + random.randn()*4 for t in range(40)]
 Fs = [np.array([[1., dt], [0, 1]]) for dt in dts]

 (mu, cov, _, _) = kf.batch_filter(zs, Fs=Fs)
 (xs, Ps, Ks, Pps) = kf.rts_smoother(mu, cov, Fs=Fs)
Source code in bayesian_filters/kalman/kalman_filter.py
def batch_filter(
    self,
    zs,
    Fs=None,
    Qs=None,
    Hs=None,
    Rs=None,
    Bs=None,
    us=None,
    update_first=False,
    saver=None,
):
    """Batch processes a sequence of measurements.

     Parameters
     ----------

     zs : list-like
         list of measurements at each time step `self.dt`. Missing
         measurements must be represented by `None`.

     Fs : None, list-like, default=None
         optional value or list of values to use for the state transition
         matrix F.

         If Fs is None then self.F is used for all epochs.

         Otherwise it must contain a list-like list of F's, one for
         each epoch.  This allows you to have varying F per epoch.

     Qs : None, np.array or list-like, default=None
         optional value or list of values to use for the process error
         covariance Q.

         If Qs is None then self.Q is used for all epochs.

         Otherwise it must contain a list-like list of Q's, one for
         each epoch.  This allows you to have varying Q per epoch.

     Hs : None, np.array or list-like, default=None
         optional list of values to use for the measurement matrix H.

         If Hs is None then self.H is used for all epochs.

         If Hs contains a single matrix, then it is used as H for all
         epochs.

         Otherwise it must contain a list-like list of H's, one for
         each epoch.  This allows you to have varying H per epoch.

     Rs : None, np.array or list-like, default=None
         optional list of values to use for the measurement error
         covariance R.

         If Rs is None then self.R is used for all epochs.

         Otherwise it must contain a list-like list of R's, one for
         each epoch.  This allows you to have varying R per epoch.

     Bs : None, np.array or list-like, default=None
         optional list of values to use for the control transition matrix B.

         If Bs is None then self.B is used for all epochs.

         Otherwise it must contain a list-like list of B's, one for
         each epoch.  This allows you to have varying B per epoch.

     us : None, np.array or list-like, default=None
         optional list of values to use for the control input vector.

         If us is None then None is used for all epochs (equivalent to 0,
         or no control input).

         Otherwise it must contain a list-like list of u's, one for
         each epoch.

     update_first : bool, optional, default=False
         controls whether the order of operations is update followed by
         predict, or predict followed by update. Default is predict->update.

     saver : bayesian_filters.common.Saver, optional
         bayesian_filters.common.Saver object. If provided, saver.save() will be
         called after every epoch

     Returns
     -------

     means : np.array((n,dim_x,1))
         array of the state for each time step after the update. Each entry
         is an np.array. In other words `means[k,:]` is the state at step
         `k`.

     covariance : np.array((n,dim_x,dim_x))
         array of the covariances for each time step after the update.
         In other words `covariance[k,:,:]` is the covariance at step `k`.

     means_predictions : np.array((n,dim_x,1))
         array of the state for each time step after the prediction. Each
         entry is an np.array. In other words `means_predictions[k,:]` is
         the predicted state at step `k`.

     covariance_predictions : np.array((n,dim_x,dim_x))
         array of the covariances for each time step after the prediction.
         In other words `covariance_predictions[k,:,:]` is the predicted
         covariance at step `k`.

     Examples
     --------

     .. code-block:: Python

        # this example demonstrates tracking a measurement where the time
        # between measurements varies, as stored in dts. This requires
        # that F be recomputed for each epoch. The output is then smoothed
        # with an RTS smoother.

        zs = [t + random.randn()*4 for t in range(40)]
        Fs = [np.array([[1., dt], [0, 1]]) for dt in dts]

         (mu, cov, _, _) = kf.batch_filter(zs, Fs=Fs)
         (xs, Ps, Ks, Pps) = kf.rts_smoother(mu, cov, Fs=Fs)
    """

    # pylint: disable=too-many-statements
    n = np.size(zs, 0)
    if Fs is None:
        Fs = [self.F] * n
    if Qs is None:
        Qs = [self.Q] * n
    if Hs is None:
        Hs = [self.H] * n
    if Rs is None:
        Rs = [self.R] * n
    if Bs is None:
        Bs = [self.B] * n
    if us is None:
        us = [0] * n

    # mean estimates from Kalman Filter
    if self.x.ndim == 1:
        means = zeros((n, self.dim_x))
        means_p = zeros((n, self.dim_x))
    else:
        means = zeros((n, self.dim_x, 1))
        means_p = zeros((n, self.dim_x, 1))

    # state covariances from Kalman Filter
    covariances = zeros((n, self.dim_x, self.dim_x))
    covariances_p = zeros((n, self.dim_x, self.dim_x))

    if update_first:
        for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
            self.update(z, R=R, H=H)
            means[i, :] = self.x
            covariances[i, :, :] = self.P

            self.predict(u=u, B=B, F=F, Q=Q)
            means_p[i, :] = self.x
            covariances_p[i, :, :] = self.P

            if saver is not None:
                saver.save()
    else:
        for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)):
            self.predict(u=u, B=B, F=F, Q=Q)
            means_p[i, :] = self.x
            covariances_p[i, :, :] = self.P

            self.update(z, R=R, H=H)
            means[i, :] = self.x
            covariances[i, :, :] = self.P

            if saver is not None:
                saver.save()

    return (means, covariances, means_p, covariances_p)
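The loop above is the standard predict -> update recursion applied across all epochs. The following is a self-contained NumPy sketch of that recursion on a made-up constant-velocity model; the helper function and matrices here are illustrative, not the library API:

```python
import numpy as np

def batch_filter_sketch(zs, x, P, F, Q, H, R):
    """Minimal predict -> update batch loop (illustrative, NumPy only)."""
    means, covs = [], []
    I = np.eye(len(x))
    for z in zs:
        # predict: x = Fx, P = FPF' + Q
        x = F @ x
        P = F @ P @ F.T + Q
        # update: standard Kalman equations
        y = z - H @ x                     # residual
        S = H @ P @ H.T + R               # innovation covariance
        K = P @ H.T @ np.linalg.inv(S)    # Kalman gain
        x = x + K @ y
        P = (I - K @ H) @ P
        means.append(x.copy())
        covs.append(P.copy())
    return np.array(means), np.array(covs)

# constant-velocity model tracking a noiseless ramp
dt = 1.0
F = np.array([[1.0, dt], [0.0, 1.0]])
Q = np.eye(2) * 1e-4
H = np.array([[1.0, 0.0]])
R = np.array([[1.0]])
zs = [np.array([[float(t)]]) for t in range(20)]
means, covs = batch_filter_sketch(zs, np.zeros((2, 1)), np.eye(2) * 10.0, F, Q, H, R)
```

On this exact-model data the estimates converge toward the true ramp, which is the same behavior `batch_filter` gives with `update_first=False`.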

get_prediction(u=None, B=None, F=None, Q=None)

Predict next state (prior) using the Kalman filter state propagation equations and returns it without modifying the object.

Parameters:

Name Type Description Default
u array

Optional control vector.

0
B np.array(dim_x, dim_u), or None

Optional control transition matrix; a value of None will cause the filter to use self.B.

None
F np.array(dim_x, dim_x), or None

Optional state transition matrix; a value of None will cause the filter to use self.F.

None
Q np.array(dim_x, dim_x), scalar, or None

Optional process noise matrix; a value of None will cause the filter to use self.Q.

None

Returns:

Type Description
(x, P) : tuple

State vector and covariance array of the prediction.

Source code in bayesian_filters/kalman/kalman_filter.py
def get_prediction(self, u=None, B=None, F=None, Q=None):
    """
    Predict next state (prior) using the Kalman filter state propagation
    equations and returns it without modifying the object.

    Parameters
    ----------

    u : np.array, default 0
        Optional control vector.

    B : np.array(dim_x, dim_u), or None
        Optional control transition matrix; a value of None
        will cause the filter to use `self.B`.

    F : np.array(dim_x, dim_x), or None
        Optional state transition matrix; a value of None
        will cause the filter to use `self.F`.

    Q : np.array(dim_x, dim_x), scalar, or None
        Optional process noise matrix; a value of None will cause the
        filter to use `self.Q`.

    Returns
    -------

    (x, P) : tuple
        State vector and covariance array of the prediction.
    """

    if B is None:
        B = self.B
    if F is None:
        F = self.F
    if Q is None:
        Q = self.Q
    elif isscalar(Q):
        Q = eye(self.dim_x) * Q

    # x = Fx + Bu
    if B is not None and u is not None:
        x = dot(F, self.x) + dot(B, u)
    else:
        x = dot(F, self.x)

    # P = FPF' + Q
    P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q

    return x, P
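The two propagation equations `get_prediction` evaluates can be checked by hand with NumPy. The matrices below are made up for illustration (and `alpha` is left at its default of 1):

```python
import numpy as np

# illustrative matrices; B and u exercise the control term
F = np.array([[1.0, 1.0], [0.0, 1.0]])   # state transition
B = np.array([[0.5], [1.0]])             # control transition
u = np.array([[2.0]])                    # control input
Q = np.eye(2) * 0.1
x = np.array([[1.0], [0.5]])
P = np.eye(2)

x_prior = F @ x + B @ u                  # x' = Fx + Bu
P_prior = F @ P @ F.T + Q                # P' = FPF' + Q  (alpha^2 = 1)
```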

get_update(z=None)

Computes the new estimate based on measurement z and returns it without altering the state of the filter.

Parameters:

Name Type Description Default
z (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

None

Returns:

Type Description
(x, P) : tuple

State vector and covariance array of the update.

Source code in bayesian_filters/kalman/kalman_filter.py
def get_update(self, z=None):
    """
    Computes the new estimate based on measurement `z` and returns it
    without altering the state of the filter.

    Parameters
    ----------

    z : (dim_z, 1): array_like
        measurement for this update. z can be a scalar if dim_z is 1,
        otherwise it must be convertible to a column vector.

    Returns
    -------

    (x, P) : tuple
        State vector and covariance array of the update.
    """

    if z is None:
        return self.x, self.P
    z = reshape_z(z, self.dim_z, self.x.ndim)

    R = self.R
    H = self.H
    P = self.P
    x = self.x

    # error (residual) between measurement and prediction
    y = z - dot(H, x)

    # common subexpression for speed
    PHT = dot(P, H.T)

    # project system uncertainty into measurement space
    S = dot(H, PHT) + R

    # map system uncertainty into kalman gain
    K = dot(PHT, self.inv(S))

    # predict new x with residual scaled by the kalman gain
    x = x + dot(K, y)

    # P = (I-KH)P(I-KH)' + KRK'
    I_KH = self._I - dot(K, H)
    P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T)

    return x, P
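`get_update` applies the standard update with the Joseph-form covariance. A pure-function NumPy sketch of the same arithmetic, on made-up matrices (not the library API):

```python
import numpy as np

def get_update_sketch(x, P, z, H, R):
    """Non-mutating Kalman update returning (x, P), Joseph-form covariance."""
    y = z - H @ x                          # residual
    S = H @ P @ H.T + R                    # innovation covariance
    K = P @ H.T @ np.linalg.inv(S)         # Kalman gain
    x_new = x + K @ y
    I_KH = np.eye(len(x)) - K @ H
    # Joseph form: numerically stable, valid even for a suboptimal K
    P_new = I_KH @ P @ I_KH.T + K @ R @ K.T
    return x_new, P_new

x = np.array([[0.0], [0.0]])
P = np.eye(2) * 4.0
H = np.array([[1.0, 0.0]])
R = np.array([[1.0]])
z = np.array([[2.0]])
x_new, P_new = get_update_sketch(x, P, z, H, R)
```

Here the measured component's variance drops from 4 to 4*1/(4+1) = 0.8, while the unobserved component is untouched.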

log_likelihood_of(z)

log likelihood of the measurement z. This should only be called after a call to update(). Calling after predict() will yield an incorrect result.

Source code in bayesian_filters/kalman/kalman_filter.py
def log_likelihood_of(self, z):
    """
    log likelihood of the measurement `z`. This should only be called
    after a call to update(). Calling after predict() will yield an
    incorrect result."""

    if z is None:
        return log(sys.float_info.min)
    return logpdf(z, dot(self.H, self.x), self.S)
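`logpdf` here evaluates the measurement under the Gaussian N(Hx, S). A self-contained NumPy version of that density (a hand-rolled stand-in for the library's `logpdf` helper, for illustration only):

```python
import numpy as np

def gaussian_logpdf(z, mean, cov):
    """log N(z; mean, cov) -- what logpdf(z, Hx, S) evaluates."""
    z = np.atleast_2d(z).reshape(-1, 1)
    mean = np.atleast_2d(mean).reshape(-1, 1)
    cov = np.atleast_2d(cov)
    k = len(z)
    diff = z - mean
    maha = (diff.T @ np.linalg.inv(cov) @ diff).item()   # squared Mahalanobis distance
    return -0.5 * (k * np.log(2 * np.pi) + np.log(np.linalg.det(cov)) + maha)

ll1 = gaussian_logpdf(0.0, 0.0, 1.0)                     # 1-D standard normal at its mean
ll2 = gaussian_logpdf(np.zeros(2), np.zeros(2), np.eye(2))
```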

measurement_of_state(x)

Helper function that converts a state into a measurement.

Parameters:

Name Type Description Default
x array

kalman state vector

required

Returns:

Name Type Description
z (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

Source code in bayesian_filters/kalman/kalman_filter.py
def measurement_of_state(self, x):
    """
    Helper function that converts a state into a measurement.

    Parameters
    ----------

    x : np.array
        kalman state vector

    Returns
    -------

    z : (dim_z, 1): array_like
        measurement for this update. z can be a scalar if dim_z is 1,
        otherwise it must be convertible to a column vector.
    """

    return dot(self.H, x)

predict(u=None, B=None, F=None, Q=None)

Predict next state (prior) using the Kalman filter state propagation equations.

Parameters:

Name Type Description Default
u array

Optional control vector.

0
B np.array(dim_x, dim_u), or None

Optional control transition matrix; a value of None will cause the filter to use self.B.

None
F np.array(dim_x, dim_x), or None

Optional state transition matrix; a value of None will cause the filter to use self.F.

None
Q np.array(dim_x, dim_x), scalar, or None

Optional process noise matrix; a value of None will cause the filter to use self.Q.

None
Source code in bayesian_filters/kalman/kalman_filter.py
def predict(self, u=None, B=None, F=None, Q=None):
    """
    Predict next state (prior) using the Kalman filter state propagation
    equations.

    Parameters
    ----------

    u : np.array, default 0
        Optional control vector.

    B : np.array(dim_x, dim_u), or None
        Optional control transition matrix; a value of None
        will cause the filter to use `self.B`.

    F : np.array(dim_x, dim_x), or None
        Optional state transition matrix; a value of None
        will cause the filter to use `self.F`.

    Q : np.array(dim_x, dim_x), scalar, or None
        Optional process noise matrix; a value of None will cause the
        filter to use `self.Q`.
    """

    if B is None:
        B = self.B
    if F is None:
        F = self.F
    if Q is None:
        Q = self.Q
    elif isscalar(Q):
        Q = eye(self.dim_x) * Q

    # x = Fx + Bu
    if B is not None and u is not None:
        self.x = dot(F, self.x) + dot(B, u)
    else:
        self.x = dot(F, self.x)

    # P = FPF' + Q
    self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q

    # save prior
    self.x_prior = self.x.copy()
    self.P_prior = self.P.copy()

predict_steadystate(u=0, B=None)

Predict state (prior) using the Kalman filter state propagation equations. Only x is updated, P is left unchanged. See update_steadystate() for a longer explanation of when to use this method.

Parameters:

Name Type Description Default
u array

Optional control vector. If non-zero, it is multiplied by B to create the control input into the system.

0
B np.array(dim_x, dim_u), or None

Optional control transition matrix; a value of None will cause the filter to use self.B.

None
Source code in bayesian_filters/kalman/kalman_filter.py
def predict_steadystate(self, u=0, B=None):
    """
    Predict state (prior) using the Kalman filter state propagation
    equations. Only x is updated, P is left unchanged. See
    update_steadystate() for a longer explanation of when to use this
    method.

    Parameters
    ----------

    u : np.array
        Optional control vector. If non-zero, it is multiplied by B
        to create the control input into the system.

    B : np.array(dim_x, dim_u), or None
        Optional control transition matrix; a value of None
        will cause the filter to use `self.B`.
    """

    if B is None:
        B = self.B

    # x = Fx + Bu
    if B is not None:
        self.x = dot(self.F, self.x) + dot(B, u)
    else:
        self.x = dot(self.F, self.x)

    # save prior
    self.x_prior = self.x.copy()
    self.P_prior = self.P.copy()

residual_of(z)

Returns the residual for the given measurement (z). Does not alter the state of the filter.

Source code in bayesian_filters/kalman/kalman_filter.py
def residual_of(self, z):
    """
    Returns the residual for the given measurement (z). Does not alter
    the state of the filter.
    """
    z = reshape_z(z, self.dim_z, self.x.ndim)
    return z - dot(self.H, self.x_prior)

rts_smoother(Xs, Ps, Fs=None, Qs=None, inv=np.linalg.inv)

Runs the Rauch-Tung-Striebel Kalman smoother on a set of means and covariances computed by a Kalman filter. The usual input would come from the output of KalmanFilter.batch_filter().

Parameters:

Name Type Description Default
Xs array

array of the means (state variable x) of the output of a Kalman filter.

required
Ps array

array of the covariances of the output of a kalman filter.

required
Fs list-like collection of numpy.array

State transition matrix of the Kalman filter at each time step. Optional, if not provided the filter's self.F will be used

None
Qs list-like collection of numpy.array

Process noise of the Kalman filter at each time step. Optional, if not provided the filter's self.Q will be used

None
inv function

If you prefer another inverse function, such as the Moore-Penrose pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv

numpy.linalg.inv

Returns:

Name Type Description
x ndarray

smoothed means

P ndarray

smoothed state covariances

K ndarray

smoother gain at each step

Pp ndarray

Predicted state covariances

Examples:

.. code-block:: Python

zs = [t + random.randn()*4 for t in range(40)]

(mu, cov, _, _) = kf.batch_filter(zs)
(x, P, K, Pp) = kf.rts_smoother(mu, cov)
Source code in bayesian_filters/kalman/kalman_filter.py
def rts_smoother(self, Xs, Ps, Fs=None, Qs=None, inv=np.linalg.inv):
    """
    Runs the Rauch-Tung-Striebel Kalman smoother on a set of
    means and covariances computed by a Kalman filter. The usual input
    would come from the output of `KalmanFilter.batch_filter()`.

    Parameters
    ----------

    Xs : numpy.array
        array of the means (state variable x) of the output of a Kalman
        filter.

    Ps : numpy.array
        array of the covariances of the output of a kalman filter.

    Fs : list-like collection of numpy.array, optional
        State transition matrix of the Kalman filter at each time step.
        Optional, if not provided the filter's self.F will be used

    Qs : list-like collection of numpy.array, optional
        Process noise of the Kalman filter at each time step. Optional,
        if not provided the filter's self.Q will be used

    inv : function, default numpy.linalg.inv
        If you prefer another inverse function, such as the Moore-Penrose
        pseudo inverse, set it to that instead: kf.inv = np.linalg.pinv


    Returns
    -------

    x : numpy.ndarray
        smoothed means

    P : numpy.ndarray
        smoothed state covariances

    K : numpy.ndarray
        smoother gain at each step

    Pp : numpy.ndarray
        Predicted state covariances

    Examples
    --------

    .. code-block:: Python

        zs = [t + random.randn()*4 for t in range(40)]

        (mu, cov, _, _) = kf.batch_filter(zs)
        (x, P, K, Pp) = kf.rts_smoother(mu, cov)

    """

    if len(Xs) != len(Ps):
        raise ValueError("length of Xs and Ps must be the same")

    n = Xs.shape[0]
    dim_x = Xs.shape[1]

    if Fs is None:
        Fs = [self.F] * n
    if Qs is None:
        Qs = [self.Q] * n

    # smoother gain
    K = zeros((n, dim_x, dim_x))

    x, P, Pp = Xs.copy(), Ps.copy(), Ps.copy()
    for k in range(n - 2, -1, -1):
        Pp[k] = dot(dot(Fs[k + 1], P[k]), Fs[k + 1].T) + Qs[k + 1]

        # pylint: disable=bad-whitespace
        K[k] = dot(dot(P[k], Fs[k + 1].T), inv(Pp[k]))
        x[k] += dot(K[k], x[k + 1] - dot(Fs[k + 1], x[k]))
        P[k] += dot(dot(K[k], P[k + 1] - Pp[k]), K[k].T)

    return (x, P, K, Pp)
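The backward recursion can be exercised end to end with a hand-rolled NumPy forward pass. Everything below is illustrative (constant F and Q, a made-up constant-velocity model), not the library API:

```python
import numpy as np

def rts_sketch(xs, Ps, F, Q):
    """RTS backward pass over filtered means/covariances (constant F, Q)."""
    x = [v.copy() for v in xs]
    P = [p.copy() for p in Ps]
    for k in range(len(xs) - 2, -1, -1):
        Pp = F @ P[k] @ F.T + Q                # covariance predicted from step k
        G = P[k] @ F.T @ np.linalg.inv(Pp)     # smoother gain
        x[k] = x[k] + G @ (x[k + 1] - F @ x[k])
        P[k] = P[k] + G @ (P[k + 1] - Pp) @ G.T
    return x, P

# forward pass: constant-velocity filter on a noiseless ramp
F = np.array([[1.0, 1.0], [0.0, 1.0]])
Q = np.eye(2) * 1e-3
H = np.array([[1.0, 0.0]])
R = np.array([[1.0]])
x = np.zeros((2, 1))
P = np.eye(2) * 10.0
xs, Ps = [], []
for t in range(15):
    x = F @ x
    P = F @ P @ F.T + Q                        # predict
    S = H @ P @ H.T + R
    K = P @ H.T @ np.linalg.inv(S)             # update
    x = x + K @ (np.array([[float(t)]]) - H @ x)
    P = (np.eye(2) - K @ H) @ P
    xs.append(x.copy())
    Ps.append(P.copy())

xs_s, Ps_s = rts_sketch(xs, Ps, F, Q)
```

As expected from smoothing theory, the smoothed covariances are never larger than the filtered ones, and early-step estimates are pulled toward the truth by future measurements.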

test_matrix_dimensions(z=None, H=None, R=None, F=None, Q=None)

Performs a series of asserts to check that the size of everything is what it should be. This can help you debug problems in your design.

If you pass in H, R, F, Q those will be used instead of this object's value for those matrices.

Testing z (the measurement) is problematic. x is a vector, and can be implemented as either a 1D array or as an nx1 column vector. Thus Hx can be of different shapes. Then, if Hx is a single value, it can be either a 1D array or 2D vector. If either is true, z can reasonably be a scalar (either '3' or np.array('3') are scalars under this definition), a 1D, 1 element array, or a 2D, 1 element array. You are allowed to pass in any combination that works.

Source code in bayesian_filters/kalman/kalman_filter.py
def test_matrix_dimensions(self, z=None, H=None, R=None, F=None, Q=None):
    """
    Performs a series of asserts to check that the size of everything
    is what it should be. This can help you debug problems in your design.

    If you pass in H, R, F, Q those will be used instead of this object's
    value for those matrices.

    Testing `z` (the measurement) is problematic. x is a vector, and can be
    implemented as either a 1D array or as an nx1 column vector. Thus Hx
    can be of different shapes. Then, if Hx is a single value, it can
    be either a 1D array or 2D vector. If either is true, z can reasonably
    be a scalar (either '3' or np.array('3') are scalars under this
    definition), a 1D, 1 element array, or a 2D, 1 element array. You are
    allowed to pass in any combination that works.
    """

    if H is None:
        H = self.H
    if R is None:
        R = self.R
    if F is None:
        F = self.F
    if Q is None:
        Q = self.Q
    x = self.x
    P = self.P

    assert x.ndim == 1 or x.ndim == 2, "x must have one or two dimensions, but has {}".format(x.ndim)

    if x.ndim == 1:
        assert x.shape[0] == self.dim_x, "Shape of x must be ({},{}), but is {}".format(self.dim_x, 1, x.shape)
    else:
        assert x.shape == (self.dim_x, 1), "Shape of x must be ({},{}), but is {}".format(self.dim_x, 1, x.shape)

    assert P.shape == (self.dim_x, self.dim_x), "Shape of P must be ({},{}), but is {}".format(
        self.dim_x, self.dim_x, P.shape
    )

    assert Q.shape == (self.dim_x, self.dim_x), "Shape of Q must be ({},{}), but is {}".format(
        self.dim_x, self.dim_x, Q.shape
    )

    assert F.shape == (self.dim_x, self.dim_x), "Shape of F must be ({},{}), but is {}".format(
        self.dim_x, self.dim_x, F.shape
    )

    assert np.ndim(H) == 2, "Shape of H must be (dim_z, {}), but is {}".format(P.shape[0], shape(H))

    assert H.shape[1] == P.shape[0], "Shape of H must be (dim_z, {}), but is {}".format(P.shape[0], H.shape)

    # shape of R must be the same as HPH'
    hph_shape = (H.shape[0], H.shape[0])
    r_shape = shape(R)

    if H.shape[0] == 1:
        # r can be scalar, 1D, or 2D in this case
        assert r_shape in [(), (1,), (1, 1)], "R must be scalar or one element array, but is shaped {}".format(
            r_shape
        )
    else:
        assert r_shape == hph_shape, "shape of R should be {} but it is {}".format(hph_shape, r_shape)

    if z is not None:
        z_shape = shape(z)
    else:
        z_shape = (self.dim_z, 1)

    # H@x must have shape of z
    Hx = dot(H, x)

    if z_shape == ():  # scalar or np.array(scalar)
        assert Hx.ndim == 1 or shape(Hx) == (1, 1), "shape of z should be {}, not {} for the given H".format(
            shape(Hx), z_shape
        )

    elif shape(Hx) == (1,):
        assert z_shape[0] == 1, "Shape of z must be {} for the given H".format(shape(Hx))

    else:
        assert z_shape == shape(Hx) or (len(z_shape) == 1 and shape(Hx) == (z_shape[0], 1)), (
            "shape of z should be {}, not {} for the given H".format(shape(Hx), z_shape)
        )

    if np.ndim(Hx) > 1 and shape(Hx) != (1, 1):
        assert shape(Hx) == z_shape, "shape of z should be {} for the given H, but it is {}".format(
            shape(Hx), z_shape
        )

update(z, R=None, H=None)

Add a new measurement (z) to the Kalman filter.

If z is None, nothing is computed. However, x_post and P_post are updated with the prior (x_prior, P_prior), and self.z is set to None.

Parameters:

Name Type Description Default
z (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

If you pass in a value of H, z must be a column vector of the correct size.

required
R np.array, scalar, or None

Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used.

None
H np.array, or None

Optionally provide H to override the measurement function for this one call, otherwise self.H will be used.

None
Source code in bayesian_filters/kalman/kalman_filter.py
def update(self, z, R=None, H=None):
    """
    Add a new measurement (z) to the Kalman filter.

    If z is None, nothing is computed. However, x_post and P_post are
    updated with the prior (x_prior, P_prior), and self.z is set to None.

    Parameters
    ----------
    z : (dim_z, 1): array_like
        measurement for this update. z can be a scalar if dim_z is 1,
        otherwise it must be convertible to a column vector.

        If you pass in a value of H, z must be a column vector of
        the correct size.

    R : np.array, scalar, or None
        Optionally provide R to override the measurement noise for this
        one call, otherwise  self.R will be used.

    H : np.array, or None
        Optionally provide H to override the measurement function for this
        one call, otherwise self.H will be used.
    """

    # set to None to force recompute
    self._log_likelihood = None
    self._likelihood = None
    self._mahalanobis = None

    if z is None:
        self.z = np.array([[None] * self.dim_z]).T
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()
        self.y = zeros((self.dim_z, 1))
        return

    if R is None:
        R = self.R
    elif isscalar(R):
        R = eye(self.dim_z) * R

    if H is None:
        z = reshape_z(z, self.dim_z, self.x.ndim)
        H = self.H

    # y = z - Hx
    # error (residual) between measurement and prediction
    self.y = z - dot(H, self.x)

    # common subexpression for speed
    PHT = dot(self.P, H.T)

    # S = HPH' + R
    # project system uncertainty into measurement space
    self.S = dot(H, PHT) + R
    self.SI = self.inv(self.S)
    # K = PH'inv(S)
    # map system uncertainty into kalman gain
    self.K = dot(PHT, self.SI)

    # x = x + Ky
    # predict new x with residual scaled by the kalman gain
    self.x = self.x + dot(self.K, self.y)

    # P = (I-KH)P(I-KH)' + KRK'
    # This is more numerically stable
    # and works for non-optimal K vs the equation
    # P = (I-KH)P usually seen in the literature.

    I_KH = self._I - dot(self.K, H)
    self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T)

    # save measurement and posterior state
    self.z = deepcopy(z)
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()

update_correlated(z, R=None, H=None)

Add a new measurement (z) to the Kalman filter assuming that process noise and measurement noise are correlated as defined in the self.M matrix.

A partial derivation can be found in [1]

If z is None, nothing is changed.

Parameters:

Name Type Description Default
z (dim_z, 1): array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

required
R np.array, scalar, or None

Optionally provide R to override the measurement noise for this one call, otherwise self.R will be used.

None
H np.array, or None

Optionally provide H to override the measurement function for this one call, otherwise self.H will be used.

None
References

.. [1] Bulut, Y. (2011). Applied Kalman filter theory (Doctoral dissertation, Northeastern University). http://people.duke.edu/~hpgavin/SystemID/References/Balut-KalmanFilter-PhD-NEU-2011.pdf

Source code in bayesian_filters/kalman/kalman_filter.py
def update_correlated(self, z, R=None, H=None):
    """Add a new measurement (z) to the Kalman filter assuming that
    process noise and measurement noise are correlated as defined in
    the `self.M` matrix.

    A partial derivation can be found in [1]

    If z is None, nothing is changed.

    Parameters
    ----------
    z : (dim_z, 1): array_like
        measurement for this update. z can be a scalar if dim_z is 1,
        otherwise it must be convertible to a column vector.

    R : np.array, scalar, or None
        Optionally provide R to override the measurement noise for this
        one call, otherwise  self.R will be used.

    H : np.array,  or None
        Optionally provide H to override the measurement function for this
        one call, otherwise  self.H will be used.

    References
    ----------

    .. [1] Bulut, Y. (2011). Applied Kalman filter theory (Doctoral dissertation, Northeastern University).
           http://people.duke.edu/~hpgavin/SystemID/References/Balut-KalmanFilter-PhD-NEU-2011.pdf
    """

    # set to None to force recompute
    self._log_likelihood = None
    self._likelihood = None
    self._mahalanobis = None

    if z is None:
        self.z = np.array([[None] * self.dim_z]).T
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()
        self.y = zeros((self.dim_z, 1))
        return

    if R is None:
        R = self.R
    elif isscalar(R):
        R = eye(self.dim_z) * R

    # rename for readability and a tiny extra bit of speed
    if H is None:
        z = reshape_z(z, self.dim_z, self.x.ndim)
        H = self.H

    # handle special case: if z is in form [[z]] but x is not a column
    # vector dimensions will not match
    if self.x.ndim == 1 and shape(z) == (1, 1):
        z = z[0]

    if shape(z) == ():  # is it scalar, e.g. z=3 or z=np.array(3)
        z = np.asarray([z])

    # y = z - Hx
    # error (residual) between measurement and prediction
    self.y = z - dot(H, self.x)

    # common subexpression for speed
    PHT = dot(self.P, H.T)

    # project system uncertainty into measurement space
    self.S = dot(H, PHT) + dot(H, self.M) + dot(self.M.T, H.T) + R
    self.SI = self.inv(self.S)

    # K = PH'inv(S)
    # map system uncertainty into kalman gain
    self.K = dot(PHT + self.M, self.SI)

    # x = x + Ky
    # predict new x with residual scaled by the kalman gain
    self.x = self.x + dot(self.K, self.y)
    self.P = self.P - dot(self.K, dot(H, self.P) + self.M.T)

    self.z = deepcopy(z)
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()
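Setting M = 0 in these equations recovers the standard update (S = HPH' + R, K = PH'inv(S)), which gives a quick sanity check. A NumPy sketch of the method's gain and covariance arithmetic, on made-up matrices (not the library API):

```python
import numpy as np

def update_correlated_sketch(x, P, z, H, R, M):
    """Update with correlated process/measurement noise; M = 0 gives the standard form."""
    S = H @ P @ H.T + H @ M + M.T @ H.T + R   # innovation covariance with cross terms
    K = (P @ H.T + M) @ np.linalg.inv(S)      # gain picks up the M term
    x = x + K @ (z - H @ x)
    P = P - K @ (H @ P + M.T)
    return x, P

x0 = np.array([[0.0], [0.0]])
P0 = np.eye(2) * 4.0
H = np.array([[1.0, 0.0]])
R = np.array([[1.0]])
z = np.array([[2.0]])
M0 = np.zeros((2, 1))                          # no correlation: standard Kalman update
xc, Pc = update_correlated_sketch(x0, P0, z, H, R, M0)
```

With M = 0 this reproduces the standard result: posterior mean 1.6 and posterior variance 0.8 for the measured component.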

update_sequential(start, z_i, R_i=None, H_i=None)

Add a single input measurement (z_i) to the Kalman filter. In sequential processing, inputs are processed one at a time.

Parameters:

Name Type Description Default
start integer

Index of the first measurement input updated by this call.

required
z_i array or scalar

Measurement of inputs for this partial update.

required
R_i np.array, scalar, or None

Optionally provide R_i to override the measurement noise of inputs for this one call, otherwise a slice of self.R will be used.

None
H_i np.array, or None

Optionally provide H[i] to override the partial measurement function for this one call, otherwise a slice of self.H will be used.

None
Source code in bayesian_filters/kalman/kalman_filter.py
def update_sequential(self, start, z_i, R_i=None, H_i=None):
    """
    Add a single input measurement (z_i) to the Kalman filter.
    In sequential processing, inputs are processed one at a time.

    Parameters
    ----------
    start : integer
        Index of the first measurement input updated by this call.

    z_i : np.array or scalar
        Measurement of inputs for this partial update.

    R_i : np.array, scalar, or None
        Optionally provide R_i to override the measurement noise of
        inputs for this one call, otherwise a slice of self.R will
        be used.

    H_i : np.array, or None
        Optionally provide H[i] to override the partial measurement
        function for this one call, otherwise a slice of self.H will
        be used.
    """

    if isscalar(z_i):
        length = 1
    else:
        length = len(z_i)
    z_i = np.reshape(z_i, [length, 1])
    stop = start + length

    if R_i is None:
        R_i = self.R[start:stop, start:stop]
    elif isscalar(R_i):
        R_i = eye(length) * R_i

    if H_i is None:
        H_i = self.H[start:stop]

    H_i = np.reshape(H_i, [length, self.dim_x])

    # y_i = z_i - H_i @ x
    # error (residual) between measurement and prediction
    y_i = z_i - dot(H_i, self.x)
    self.y[start:stop] = y_i

    # common subexpression for speed
    PHT = dot(self.P, H_i.T)

    # project system uncertainty into the measurement subspace
    S_i = dot(H_i, PHT) + R_i

    if length == 1:
        K_i = PHT * (1.0 / S_i)
    else:
        K_i = dot(PHT, linalg.inv(S_i))

    self.K[:, start:stop] = K_i
    I_KH = self._I - np.dot(K_i, H_i)

    # x = x + K_i @ y_i
    # update state estimation with residual scaled by the kalman gain
    self.x += dot(K_i, y_i)

    # compute the posterior covariance
    self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(K_i, R_i), K_i.T)

    # save measurement component #i and the posterior state
    self.z[start:stop] = z_i
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()

update_steadystate(z)

Add a new measurement (z) to the Kalman filter without recomputing the Kalman gain K, the state covariance P, or the system uncertainty S.

You can use this for LTI systems since the Kalman gain and covariance converge to fixed values. Precompute these and assign them explicitly, or run the Kalman filter using the normal predict()/update() cycle until they converge.

The main advantage of this call is speed. We do significantly less computation, notably avoiding a costly matrix inversion.

Use in conjunction with predict_steadystate(), otherwise P will grow without bound.

Parameters:

Name Type Description Default
z : (dim_z, 1) array_like

measurement for this update. z can be a scalar if dim_z is 1, otherwise it must be convertible to a column vector.

required

Examples:

>>> cv = kinematic_kf(dim=3, order=2) # 3D constant acceleration filter
>>> # let filter converge on representative data, then save K and P
>>> for i in range(100):
>>>     cv.predict()
>>>     cv.update([i, i, i])
>>> saved_K = np.copy(cv.K)
>>> saved_P = np.copy(cv.P)

later on:

>>> cv = kinematic_kf(dim=3, order=2) # 3D constant acceleration filter
>>> cv.K = np.copy(saved_K)
>>> cv.P = np.copy(saved_P)
>>> for i in range(100):
>>>     cv.predict_steadystate()
>>>     cv.update_steadystate([i, i, i])
Source code in bayesian_filters/kalman/kalman_filter.py
def update_steadystate(self, z):
    """
    Add a new measurement (z) to the Kalman filter without recomputing
    the Kalman gain K, the state covariance P, or the system
    uncertainty S.

    You can use this for LTI systems since the Kalman gain and covariance
    converge to fixed values. Precompute these and assign them explicitly,
    or run the Kalman filter using the normal predict()/update() cycle
    until they converge.

    The main advantage of this call is speed. We do significantly less
    computation, notably avoiding a costly matrix inversion.

    Use in conjunction with predict_steadystate(), otherwise P will grow
    without bound.

    Parameters
    ----------
    z : (dim_z, 1) array_like
        measurement for this update. z can be a scalar if dim_z is 1,
        otherwise it must be convertible to a column vector.


    Examples
    --------
    >>> cv = kinematic_kf(dim=3, order=2) # 3D constant acceleration filter
    >>> # let filter converge on representative data, then save K and P
    >>> for i in range(100):
    >>>     cv.predict()
    >>>     cv.update([i, i, i])
    >>> saved_K = np.copy(cv.K)
    >>> saved_P = np.copy(cv.P)

    later on:

    >>> cv = kinematic_kf(dim=3, order=2) # 3D constant acceleration filter
    >>> cv.K = np.copy(saved_K)
    >>> cv.P = np.copy(saved_P)
    >>> for i in range(100):
    >>>     cv.predict_steadystate()
    >>>     cv.update_steadystate([i, i, i])
    """

    # set to None to force recompute
    self._log_likelihood = None
    self._likelihood = None
    self._mahalanobis = None

    if z is None:
        self.z = np.array([[None] * self.dim_z]).T
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()
        self.y = zeros((self.dim_z, 1))
        return

    z = reshape_z(z, self.dim_z, self.x.ndim)

    # y = z - Hx
    # error (residual) between measurement and prediction
    self.y = z - dot(self.H, self.x)

    # x = x + Ky
    # predict new x with residual scaled by the kalman gain
    self.x = self.x + dot(self.K, self.y)

    self.z = deepcopy(z)
    self.x_post = self.x.copy()
    self.P_post = self.P.copy()

    # set to None to force recompute
    self._log_likelihood = None
    self._likelihood = None
    self._mahalanobis = None
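The converged gain mentioned above can also be precomputed offline rather than by running a full filter on representative data. A minimal NumPy sketch (an assumption, not part of this library's API; `steady_state_gain` is a hypothetical helper) iterates the covariance Riccati recursion until K stops changing:

```python
import numpy as np

def steady_state_gain(F, H, Q, R, tol=1e-12, max_iters=10_000):
    """Iterate predict/update on the covariance alone until the
    Kalman gain converges; returns (K, P_post)."""
    n = F.shape[0]
    P = np.eye(n)
    K_prev = np.zeros((n, H.shape[0]))
    for _ in range(max_iters):
        P = F @ P @ F.T + Q                     # predicted covariance
        S = H @ P @ H.T + R                     # innovation covariance
        K = P @ H.T @ np.linalg.inv(S)          # Kalman gain
        P = (np.eye(n) - K @ H) @ P             # posterior covariance
        if np.max(np.abs(K - K_prev)) < tol:
            break
        K_prev = K
    return K, P
```

Assigning the returned K and P to the filter and then calling predict_steadystate()/update_steadystate() avoids the matrix inversion on every step while using the same converged gain.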