Simulai batching

Batching operations

BatchwiseSampler

Source code in simulai/batching.py

from typing import List, Tuple

import h5py
import numpy as np
import torch


class BatchwiseSampler:
    def __init__(
        self,
        dataset: h5py.Group = None,
        input_variables: List[str] = None,
        target_variables: List[str] = None,
        input_normalizer: callable = None,
        target_normalizer: callable = None,
        channels_first: bool = None,
    ) -> None:
        """Batchwise sampler for loading samples from disk and apply normalization if needed.

        Args:
            dataset (h5py.Group, optional): Dataset object containing the samples (Default value = None)
            input_variables (List[str], optional): List of input variables to be loaded (Default value = None)
            target_variables (List[str], optional): List of target variables to be loaded (Default value = None)
            input_normalizer (callable, optional): Function to be applied on the input variables (Default value = None)
            target_normalizer (callable, optional): Function to be applied on the target variables (Default value = None)
            channels_first (bool, optional): Whether to place the channels dimension first when stacking the variables. (Default value = None)

        """

        # This import avoids circular importing
        from simulai.metrics import MinMaxEvaluation

        self.dataset = dataset
        self.input_variables = input_variables
        self.target_variables = target_variables

        self.input_normalizer = input_normalizer
        self.target_normalizer = target_normalizer

        self.channels_first = channels_first

        if self.channels_first:
            self.adjust_dimension = self._transpose_first_channel
        else:
            self.adjust_dimension = self._simple_stack

        self.minmax_eval = MinMaxEvaluation()

        # Defining if normalization will be used or not
        if self.input_normalizer is not None:
            self.exec_input_normalization = self._input_normalization
        else:
            self.exec_input_normalization = self._normalization_bypass

        if self.target_normalizer is not None:
            self.exec_target_normalization = self._target_normalization
        else:
            self.exec_target_normalization = self._normalization_bypass

    # Evaluating the global minimum and maximum over all the
    # datasets in self.dataset
    def minmax(
        self, batch_size: int = None, data_interval: list = None
    ) -> Tuple[float, float]:
        """Evaluate the minimum and maximum values of all the target variables in the dataset.

        Args:
            batch_size (int, optional): Number of samples to use in the evaluation (Default value = None)
            data_interval (list, optional): List of 2 integers representing the starting and ending indexes of the interval in which the values will be evaluated. (Default value = None)

        Returns:
            A tuple containing the global minimum and maximum values of the target variables.

        """
        min_list = []
        max_list = []

        for k in self.target_variables:
            v_min, v_max = self.minmax_eval(
                dataset=self.dataset[k],
                batch_size=batch_size,
                data_interval=data_interval,
            )
            min_list.append(v_min)
            max_list.append(v_max)

        return np.min(min_list), np.max(max_list)

    def input_shape(self) -> list:
        """Get the input shape of the dataset. The shape will be adjusted to put the channels dimension first
         if 'channels_first' is True.

        Returns:
            A list of integers representing the shape of the stacked input variables.

        """
        if self.channels_first:
            shape_ = self.dataset[self.input_variables[0]].shape
            shape = (shape_[0],) + (len(self.input_variables),) + shape_[1:]
        else:
            shape = self.dataset[self.input_variables[0]].shape + (
                len(self.input_variables),
            )

        return list(shape)

    def _normalization_bypass(self, data: np.ndarray = None) -> np.ndarray:
        """Bypass the normalization.

        Args:
            data (np.ndarray, optional): The data to be bypassed. (Default value = None)

        Returns:
            The same data, unchanged.

        """
        return data

    def _target_normalization(self, data: np.ndarray = None) -> np.ndarray:
        """Normalize the target data using the provided normalizer.

        Args:
            data (np.ndarray, optional): The target data to be normalized. (Default value = None)

        Returns:
            The normalized target data.

        """
        return self.target_normalizer(data=data)

    def _input_normalization(self, data: np.ndarray = None) -> np.ndarray:
        """Normalize the input data using the provided normalizer.

        Args:
            data (np.ndarray, optional): The input data to be normalized. (Default value = None)

        Returns:
            The normalized input data.

        """
        return self.input_normalizer(data=data)

    def _transpose_first_channel(self, variables_list: list = None) -> torch.Tensor:
        """Transpose the first channel of the variables list.

        Args:
            variables_list (list, optional):  (Default value = None)

        """
        batch = np.stack(variables_list, axis=-1)

        dims = list(range(len(batch.shape)))
        dims_t = [0] + [dims[-1]] + dims[1:-1]

        batch = batch.transpose(*dims_t)

        return torch.from_numpy(batch.astype("float32"))

    def _simple_stack(self, variables_list: list = None) -> torch.Tensor:
        """Stack the variables list along the last axis.

        Args:
            variables_list (list, optional): The list of variables to be stacked. (Default value = None)

        Returns:
            A torch tensor of stacked variables.

        """
        batch = np.stack(variables_list, axis=-1)  # np.stack expects axis=, not dim=

        return torch.from_numpy(batch.astype("float32"))

    def input_data(self, indices: np.ndarray = None) -> torch.Tensor:
        """Retrieve the input data for the given indices, apply normalization and adjust the dimension

        Args:
            indices (np.ndarray, optional): The indices of samples for which the input data should be retrieved (Default value = None)

        Returns:
            A torch tensor of input data.

        """
        # h5py requires fancy-indexing indices in increasing order
        indices = np.sort(indices)

        variables_arr = [self.dataset[i][indices] for i in self.input_variables]

        return self.exec_input_normalization(
            self.adjust_dimension(variables_list=variables_arr)
        )

    def target_data(self, indices: np.ndarray = None) -> torch.Tensor:
        """Retrieve the target data for the given indices, apply normalization and adjust the dimension

        Args:
            indices (np.ndarray, optional): The indices of samples for which the target data should be retrieved (Default value = None)

        Returns:
            A torch tensor of target data.

        """

        # h5py requires fancy-indexing indices in increasing order
        indices = np.sort(indices)

        # The conversion to torch and the float32 cast both happen inside
        # adjust_dimension, so the raw arrays are fetched directly
        variables_arr = [self.dataset[i][indices] for i in self.target_variables]

        return self.exec_target_normalization(
            self.adjust_dimension(variables_list=variables_arr)
        )
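
A minimal usage sketch, assuming an HDF5 file data.h5 that stores one dataset per variable with samples along the first axis (the file name and the variable names "U" and "V" are illustrative, not part of the API):

import h5py
import numpy as np

from simulai.batching import BatchwiseSampler

fp = h5py.File("data.h5", "r")  # an h5py.File can be used wherever an h5py.Group is expected

sampler = BatchwiseSampler(
    dataset=fp,
    input_variables=["U", "V"],  # hypothetical dataset names
    target_variables=["U"],
    channels_first=True,
)

# Three samples; with channels_first=True the batch has shape
# (3, 2, *spatial_dims)
batch = sampler.input_data(indices=np.array([0, 3, 5]))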

__init__(dataset=None, input_variables=None, target_variables=None, input_normalizer=None, target_normalizer=None, channels_first=None)

Batchwise sampler that loads samples from disk and applies normalization when needed.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| dataset | h5py.Group | Dataset object containing the samples. | None |
| input_variables | List[str] | List of input variables to be loaded. | None |
| target_variables | List[str] | List of target variables to be loaded. | None |
| input_normalizer | callable | Function to be applied to the input variables. | None |
| target_normalizer | callable | Function to be applied to the target variables. | None |
| channels_first | bool | Whether to place the channels dimension first when stacking the variables. | None |
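
Both normalizers are plain callables invoked as normalizer(data=...), so any function accepting that keyword argument can be plugged in. A minimal sketch of a min-max normalizer factory (the function name and the bounds are illustrative, not part of simulai):

def make_minmax_normalizer(v_min: float, v_max: float):
    # Returns a callable matching the normalizer(data=...) convention
    # used by BatchwiseSampler
    def normalizer(data=None):
        return (data - v_min) / (v_max - v_min)

    return normalizer

sampler = BatchwiseSampler(
    dataset=fp,
    input_variables=["U", "V"],
    target_variables=["U"],
    input_normalizer=make_minmax_normalizer(0.0, 1.0),  # assumed bounds
    channels_first=True,
)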

input_data(indices=None)

Retrieve the input data for the given indices, apply normalization, and adjust the dimensions.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| indices | np.ndarray | The indices of the samples for which the input data should be retrieved. | None |

Returns:

| Type | Description |
|------|-------------|
| torch.Tensor | A torch tensor of input data. |
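
Indices do not have to be pre-sorted: they are sorted internally, because h5py fancy indexing requires indices in increasing order. A sketch of drawing a random mini-batch (the dataset size of 10,000 is an assumption):

idx = np.random.choice(10_000, size=32, replace=False)

x = sampler.input_data(indices=idx)   # stacked and normalized inputs
y = sampler.target_data(indices=idx)  # stacked and normalized targets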


input_shape()

Get the input shape of the dataset. The shape will be adjusted to put the channels dimension first if 'channels_first' is True.

Returns:

| Type | Description |
|------|-------------|
| list | A list of integers representing the shape of the stacked input variables. |
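
Spelling out the shape arithmetic for a hypothetical case of two input variables, each stored as a dataset of shape (1000, 64, 64):

shape_ = (1000, 64, 64)  # per-variable dataset shape (assumed)
n_vars = 2               # len(self.input_variables)

channels_first_shape = [shape_[0], n_vars, *shape_[1:]]  # [1000, 2, 64, 64]
channels_last_shape = [*shape_, n_vars]                  # [1000, 64, 64, 2]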


minmax(batch_size=None, data_interval=None)

Evaluate the minimum and maximum values of all the target variables in the dataset.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| batch_size | int | Number of samples to use per chunk in the evaluation. | None |
| data_interval | list | List of 2 integers representing the starting and ending indexes of the interval over which the values will be evaluated. | None |

Returns:

| Type | Description |
|------|-------------|
| Tuple[float, float] | A tuple containing the global minimum and maximum values of the target variables. |
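
These bounds pair naturally with the normalizer sketch shown earlier: evaluate them once, then pass the resulting callable at construction time (the chunk size and interval are illustrative):

v_min, v_max = sampler.minmax(batch_size=256, data_interval=[0, 1000])
target_norm = make_minmax_normalizer(v_min, v_max)  # hypothetical factory from above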


target_data(indices=None)

Retrieve the target data for the given indices, apply normalization, and adjust the dimensions.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| indices | np.ndarray | The indices of the samples for which the target data should be retrieved. | None |

Returns:

| Type | Description |
|------|-------------|
| torch.Tensor | A torch tensor of target data. |


batchdomain_constructor(data_interval=None, batch_size=None, batch_indices=None)

Create a list of indices of the input data in the form of batches, using either an interval or a list of indices.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data_interval | list | A list of two integers representing the start and end of the data interval. | None |
| batch_size | int | The desired size of the batches. | None |
| batch_indices | list | A list of indices to be divided into batches. | None |

Returns:

| Type | Description |
|------|-------------|
| list | A list of lists (or index-pair arrays) containing the indices of the input data in the form of batches. |

Source code in simulai/batching.py

from math import floor

import numpy as np


def batchdomain_constructor(
    data_interval: list = None, batch_size: int = None, batch_indices: list = None
) -> list:
    """Create a list of indices of the input data in the form of batches, using either an interval or a list of indices.

    Args:
        data_interval (list, optional): A list of two integers representing the start and end of the data interval. (Default value = None)
        batch_size (int, optional): The desired size of the batches (Default value = None)
        batch_indices (list, optional): A list of indices to be divided into batches. (Default value = None)

    Returns:
        A list of lists containing the indices of the input data in the form of batches.

    """

    if data_interval is not None:
        interval_size = data_interval[1] - data_interval[0]
        interval = data_interval
    elif batch_indices is not None:
        interval_size = len(batch_indices)
        interval = [batch_indices[0], batch_indices[-1]]
    else:
        raise Exception("Either data_interval or batch_indices must be provided.")

    if data_interval is not None:
        if interval_size < batch_size:
            batches_ = [interval[0], interval[1]]
            batches_ = np.array(batches_)
        else:
            # Divide data_interval into the maximum number of batches such that
            # each batch has at least batch_size samples and batch sizes differ
            # by at most 1

            n_batches = floor(interval_size / batch_size)
            residual = interval_size % batch_size
            batch_size_plus = floor(residual / n_batches)
            batch_size_plus_residual = residual % n_batches

            batch_size_up = batch_size + batch_size_plus

            batches_ = (
                [interval[0]]
                + [batch_size_up + 1] * batch_size_plus_residual
                + [batch_size_up] * (n_batches - batch_size_plus_residual)
            )
            batches_ = np.cumsum(batches_)

        batches = [batches_[i : i + 2] for i in range(batches_.shape[0] - 1)]
    else:
        if interval_size < batch_size:
            batches_ = batch_indices
            batches_ = np.array(batches_)
        else:
            # Split batch_indices into the maximum number of batches such that
            # each batch has at least batch_size indices

            n_batches = floor(interval_size / batch_size)
            batches_ = np.array_split(batch_indices, n_batches, axis=0)

        batches = [item.tolist() for item in batches_]

    return batches
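
A worked example of both modes (the numbers are arbitrary; the interval-mode output follows from the cumulative-sum construction above, giving batch sizes 34, 33 and 33):

# Interval mode: [start, stop) pairs covering [0, 100)
batchdomain_constructor(data_interval=[0, 100], batch_size=32)
# -> [array([ 0, 34]), array([34, 67]), array([67, 100])]

# Index mode: explicit indices split into near-equal groups
batchdomain_constructor(batch_indices=[5, 9, 12, 20, 33], batch_size=2)
# -> [[5, 9, 12], [20, 33]]

Interval-mode batches combine directly with BatchwiseSampler, reusing the sampler sketch from earlier:

for start, stop in batchdomain_constructor(data_interval=[0, 10_000], batch_size=64):
    idx = np.arange(start, stop)
    x = sampler.input_data(indices=idx)
    y = sampler.target_data(indices=idx)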