Grid Search for the Optimal Algorithm Parameter#

Note

These examples are adapted from the corresponding examples in tpcp, but use gait algorithms! They are updated less often than the official tpcp examples, so it makes sense to cross-check the official versions.

import pandas as pd

To perform a GridSearch (or any other form of parameter optimization in gaitmap), we first need three things: a Dataset, a Pipeline, and a score function.

1. The Dataset#

Datasets wrap multiple gait recordings into an easy-to-use interface that can be passed around between the higher-level gaitmap functions. Learn more about this here. If you are lucky, you do not need to create the dataset yourself, because someone has already created a gaitmap dataset for the data you want to use.

Here we are going to create a simple “dummy” dataset that just uses the left- and right-foot recordings of the example data to simulate a dataset of one participant with two recordings. In addition to the raw IMU data, the dataset also exposes the reference stride borders that we will use to judge the performance of our algorithm. (Note: usually you would not split the left and the right foot into separate recordings, as gaitmap can handle multiple sensor recordings at once.)

For our GridSearch, we need an instance of this dataset.

from tpcp import Dataset

from gaitmap.example_data import get_healthy_example_imu_data, get_healthy_example_stride_borders
from gaitmap.utils.datatype_helper import SingleSensorStrideList


class MyDataset(Dataset):
    @property
    def sampling_rate_hz(self) -> float:
        return 204.8

    @property
    def data(self):
        self.assert_is_single(None, "data")
        return get_healthy_example_imu_data()[self.index.iloc[0]["foot"] + "_sensor"]

    @property
    def segmented_stride_list_(self) -> pd.DataFrame:
        self.assert_is_single(None, "segmented_stride_list_")
        return get_healthy_example_stride_borders()[self.index.iloc[0]["foot"] + "_sensor"].set_index("s_id")

    def create_index(self) -> pd.DataFrame:
        return pd.DataFrame({"participant": ["test", "test"], "foot": ["left", "right"]})


dataset = MyDataset()
dataset

MyDataset [2 groups/rows]

participant foot
0 test left
1 test right
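
Datapoints can be accessed by integer index, by iterating the dataset, or by selecting subsets based on the index columns. The snippet below is a small sketch of these access patterns (it is not part of the original example; integer indexing and get_subset are part of the tpcp Dataset API):

# Access a single datapoint by position or by index-column value.
first_datapoint = dataset[0]  # participant "test", left foot
left_foot = dataset.get_subset(foot="left")  # same datapoint, selected by column

# Iterating a dataset yields one single-row "datapoint" at a time.
for datapoint in dataset:
    print(datapoint.groups[0], datapoint.sampling_rate_hz)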


2. The Pipeline#

The pipeline defines which algorithms we want to run on our data and which parameters of the pipeline should remain adjustable (e.g. to be optimized in the GridSearch).

The pipeline usually needs three things:

  1. It needs to be a subclass of Pipeline.

  2. It needs to have a run method that runs all the algorithmic steps and stores the results as class attributes. The run method should expect only a single datapoint (in our case a single recording of one sensor) as input.

  3. An __init__ that defines all parameters that should be adjustable. Note that the names in the signature of the __init__ method must match the corresponding attribute names (e.g. max_cost -> self.max_cost).

Here we simply transform the data into the correct coordinate system depending on the foot and then apply BarthDtw to identify the start and end of all strides in the recording. The max_cost parameter of this algorithm is exposed and will be optimized as part of the GridSearch.

For the final GridSearch, we need an instance of the pipeline object.

from tpcp import Pipeline

from gaitmap.stride_segmentation import BarthDtw
from gaitmap.utils.coordinate_conversion import convert_left_foot_to_fbf, convert_right_foot_to_fbf


class MyPipeline(Pipeline):
    max_cost: float

    segmented_stride_list_: SingleSensorStrideList

    def __init__(self, max_cost: float = 3) -> None:
        self.max_cost = max_cost

    def run(self, datapoint: MyDataset):
        converter = {"left": convert_left_foot_to_fbf, "right": convert_right_foot_to_fbf}
        # `datapoint.groups[0]` gives us the identifier of the datapoint (e.g. `("test", "left")`),
        # and `datapoint.groups[0][1]` is the foot.
        data = converter[datapoint.groups[0][1]](datapoint.data)

        dtw = BarthDtw(max_cost=self.max_cost)
        dtw.segment(data, datapoint.sampling_rate_hz)

        self.segmented_stride_list_ = dtw.stride_list_
        return self


pipe = MyPipeline()
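
Before handing the pipeline to the GridSearch, we can sanity-check it by running it manually on a single datapoint. This short sketch is not part of the original example, but only uses the safe_run method discussed in the next section:

# Run the pipeline once with a fixed `max_cost` on the first datapoint.
result = MyPipeline(max_cost=3).safe_run(dataset[0])
print(result.segmented_stride_list_.head())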

3. The Scorer#

In the context of a GridSearch, we want to calculate the performance of our algorithm and rank the different parameter candidates accordingly. This is what the score function is for. It gets a pipeline object (without results!) and a datapoint (i.e. a single recording) as input and should return some sort of performance metric. A higher value is always considered better. If you want to calculate multiple performance measures, you can also return a dictionary of such values. In any case, the performance for a specific parameter combination in the GridSearch is calculated as the mean over all datapoints. (Note: if you want to change this behavior, you can create custom subclasses of GaitmapScorer.)

A typical score function first calls safe_run (which calls run internally) on the pipeline and then compares the output with some reference. This reference should be supplied as part of the dataset.

Instead of using a function as scorer (as shown here), you can also implement a method called score on your pipeline. Then simply pass None (the default) as the scoring parameter to the GridSearch (and other optimizers). However, a function is usually more flexible. A sketch of the method-based variant is shown after the score function below.

In this case we compare the calculated stride lists with the reference and identify which strides were correctly found. Based on these matches, we calculate the precision, the recall, and the f1-score using some helper functions.

from gaitmap.evaluation_utils import evaluate_segmented_stride_list, precision_recall_f1_score


def score(pipeline: MyPipeline, datapoint: MyDataset):
    # We use the `safe_run` wrapper instead of just `run`. This is always a good idea.
    pipeline.safe_run(datapoint)
    matches_df = evaluate_segmented_stride_list(
        ground_truth=datapoint.segmented_stride_list_, segmented_stride_list=pipeline.segmented_stride_list_
    )
    assert isinstance(matches_df, pd.DataFrame)
    return precision_recall_f1_score(matches_df)
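
As mentioned above, the same logic could also be implemented as a score method directly on the pipeline. The following is a minimal sketch of this variant (the class name is made up for illustration; with scoring=None, tpcp would pick up this method instead of a standalone function):

class MyScoredPipeline(MyPipeline):
    def score(self, datapoint: MyDataset):
        # Same logic as the standalone score function, but bound to the pipeline.
        self.safe_run(datapoint)
        matches_df = evaluate_segmented_stride_list(
            ground_truth=datapoint.segmented_stride_list_,
            segmented_stride_list=self.segmented_stride_list_,
        )
        return precision_recall_f1_score(matches_df)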

4. The Parameters#

The last step before running the GridSearch is to select the parameter combinations we want to test. For this, we can directly use sklearn’s ParameterGrid.

In this example, we will just test two values for the max_cost threshold.

from sklearn.model_selection import ParameterGrid

parameters = ParameterGrid({"max_cost": [3, 5]})
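
A ParameterGrid behaves like a sequence of plain parameter dictionaries, one per candidate combination, which is exactly what the GridSearch iterates over:

print(list(parameters))
# [{'max_cost': 3}, {'max_cost': 5}]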

5. Running the GridSearch#

Now we have all the pieces to run the GridSearch. After initializing it, we can use optimize to run the GridSearch on our dataset.

Note

If the score function returns a dictionary of scores, return_optimized must be set to the name of the score that should be used to decide on the best parameter set.

from tpcp.optimize import GridSearch

gs = GridSearch(pipe, parameters, scoring=score, return_optimized="f1_score")
gs = gs.optimize(MyDataset())

The main results are stored in gs_results_. It shows the mean performance per parameter combination, the rank of each parameter combination, and the performance on each individual datapoint (in our case, a single recording of one sensor).

                  0                                            1
precision         1.000000                                     0.982759
rank_precision    1                                            2
recall            0.254762                                     0.966667
rank_recall       2                                            1
f1_score          0.393293                                     0.974576
rank_f1_score     2                                            1
single_precision  [1.0, 1.0]                                   [1.0, 0.9655172413793104]
single_recall     [0.14285714285714285, 0.36666666666666664]  [1.0, 0.9333333333333333]
single_f1_score   [0.25, 0.5365853658536585]                   [1.0, 0.9491525423728815]
data_labels       [(test, left), (test, right)]                [(test, left), (test, right)]
score_time        0.305605                                     0.286964
param_max_cost    3                                            5
params            {'max_cost': 3}                              {'max_cost': 5}
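
Because gs_results_ is a plain dictionary of lists, it can be loaded into a pandas DataFrame for easier sorting and inspection (a common convenience pattern, not something tpcp requires):

results_df = pd.DataFrame(gs.gs_results_)
print(results_df.sort_values("rank_f1_score")[["param_max_cost", "f1_score"]])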


Further, the optimized_pipeline_ attribute holds an instance of the pipeline initialized with the best parameter combination.

print("Best Para Combi:", gs.best_params_)
print("Paras of optimized Pipeline:", gs.optimized_pipeline_.get_params())
Best Para Combi: {'max_cost': 5}
Paras of optimized Pipeline: {'max_cost': 5}

To run the optimized pipeline, we can directly use the run/safe_run method on the GridSearch object. This makes it possible to use the GridSearch as a drop-in replacement for your pipeline object with minimal code changes.

If you try to call run/safe_run (or score, for that matter) before the optimization, an error is raised.

segmented_stride_list = gs.safe_run(dataset[0]).segmented_stride_list_
segmented_stride_list
start end
s_id
0 364 584
1 584 802
2 802 1023
3 1023 1242
4 1242 1458
5 1458 1672
6 1672 1887
7 1887 2104
8 2104 2327
9 2327 2546
10 2546 2773
11 2773 2998
12 2998 3231
13 3231 3453
14 3934 4163
15 4163 4382
16 4382 4603
17 4603 4822
18 4822 5043
19 5043 5267
20 5267 5489
21 5489 5713
22 5713 5936
23 5936 6167
24 6167 6395
25 6395 6628
26 6628 6858
27 6858 7091

