Simulai optimizer

Optimization Interfaces#

Optimizer#

Source code in simulai/optimization/_optimization.py
class Optimizer:
    def __init__(
        self,
        optimizer: str = None,
        early_stopping: bool = False,
        summary_writer: bool = False,
        shuffle: bool = True,
        lr_decay_scheduler_params: dict = None,
        params: dict = None,
        early_stopping_params: dict = None,
        checkpoint_params: dict = None,
    ) -> None:
        """

        Args:
            optimizer (str): A name for a PyTorch optimizer.
            early_stopping (bool): Whether to use early stopping.
            summary_writer (bool): Whether to write a TensorBoard run file.
            shuffle (bool): Whether to shuffle the dataset.
            lr_decay_scheduler_params (dict): The parameters used for defining
                a learning-rate decay scheme.
            params (dict): Extra parameters which provide information for task-specific
                problems (e.g. physics-informed neural networks).
            early_stopping_params (dict): Parameters required by the early-stopping scheme.
            checkpoint_params (dict): Parameters for configuring the checkpointing scheme.

        """

        if "n_samples" in list(params.keys()):
            self.n_samples = params.pop("n_samples")
        else:
            self.n_samples = None

        self.optimizer = optimizer
        self.params = params

        self.early_stopping = early_stopping
        self.early_stopping_params = early_stopping_params
        self.checkpoint_params = checkpoint_params

        self.summary_writer = summary_writer

        self.shuffle = shuffle

        self.lr_decay_scheduler_params = lr_decay_scheduler_params
        self.lr_decay_scheduler = None

        self.optim_module_names = [
            "torch.optim",
            "simulai.optimization._builtin_pytorch",
        ]

        self.input_data_name = "input_data"
        self.optim_modules = [
            importlib.import_module(module) for module in self.optim_module_names
        ]
        self.optim_class = self._get_optimizer(optimizer=optimizer)
        self.get_data = self._get_vector_data

        self.losses_module = importlib.import_module("simulai.optimization")

        # Using early_stopping or not
        if self.early_stopping is True:
            self.stop_handler = self._early_stopping_handler

        else:
            self.stop_handler = self._bypass_stop_handler

        # Using summary writing (necessary for tensorboard), or not
        if self.summary_writer is True:
            try:
                from torch.utils.tensorboard import SummaryWriter
            except:
                raise Exception(
                    "It is necessary to have tensorboard installed to use summary writing."
                )
            self.writer = SummaryWriter()
            self.summary_writer = self._summary_writer
        else:
            self.summary_writer = self._bypass_summary_writer

        # Determining the kind of sampling to be executed
        if self.shuffle:
            self.sampler = self._exec_shuffling

        else:
            self.sampler = self._no_shuffling

        # Using lr decay or not
        if self.lr_decay_scheduler_params is not None:
            self.lr_decay_handler = self._lr_decay_handler

        else:
            self.lr_decay_handler = self._bypass_lr_decay_handler

        # Using checkpoint or not
        if self.checkpoint_params is not None:
            if "checkpoint_frequency" in self.checkpoint_params.keys():
                self.checkpoint_frequency = self.checkpoint_params.pop(
                    "checkpoint_frequency"
                )
            else:
                raise Exception(
                    "Checkpoint frequency not defined. Please give a value for it."
                )

            self.checkpoint_handler = self._checkpoint_handler

        else:
            self.checkpoint_params = dict()
            self.checkpoint_handler = self._bypass_checkpoint_handler

        # When checkpoints are used, it is possible to overwrite them or
        # create multiple checkpoints in different states
        overwrite_savepoint = lambda epoch: ""
        not_overwrite_savepoint = lambda epoch: f"_ckp_epoch_{epoch}"

        # Rules for overwriting checkpoints or not
        if "overwrite" in self.checkpoint_params.keys():
            overwrite = self.checkpoint_params.pop("overwrite")

            if overwrite == True:
                self.overwrite_rule = overwrite_savepoint
            else:
                self.overwrite_rule = not_overwrite_savepoint
        else:
            self.overwrite_rule = overwrite_savepoint

        self.validation_score = np.inf
        self.awaited_steps = 0
        self.accuracy_str = ""
        self.decay_frequency = None
        self.loss_states = None
        self.is_physics_informed = False

    def _verify_GPU_memory_availability(self, device: str = None):
        total = torch.cuda.get_device_properties(device).total_memory
        reserved = torch.cuda.memory_reserved(device)
        allocated = torch.cuda.memory_allocated(device)

        return total - reserved - allocated

    def _try_to_transfer_to_GPU(
        self, data: Union[dict, torch.Tensor], device: str = None
    ) -> None:
        available_GPU_memory = self._verify_GPU_memory_availability(device=device)

        if isinstance(data, dict):
            data_size = sum([t.element_size() * t.nelement() for t in data.values()])

            if data_size < available_GPU_memory:
                data_ = {k: t.to(device) for k, t in data.items()}
                print("Data transferred to GPU.")
                return data_
            else:
                print("It was not possible to move data to GPU: insufficient memory.")
                print(f"{available_GPU_memory} < {data_size}, in bytes")
                return data

        elif isinstance(data, torch.Tensor):
            data_size = data.element_size() * data.nelement()

            if data_size < available_GPU_memory:
                data_ = data.to(device)
                print("Data transferred to GPU.")
                return data_
            else:
                print("It was not possible to move data to GPU: insufficient memory.")
                print(f"{available_GPU_memory} < {data_size}, in bytes")
                return data
        else:
            return data

    def _seek_by_extra_trainable_parameters(
        self, residual: SymbolicOperator = None
    ) -> Union[list, None]:
        if hasattr(residual, "constants"):
            extra_parameters = [
                c
                for c in residual.trainable_parameters.values()
                if isinstance(c, Parameter)
            ]
            if extra_parameters:
                print("There are extra trainable parameters.")
            return extra_parameters
        else:
            return None

    def _get_lr_decay(self) -> Union[callable, None]:
        if self.lr_decay_scheduler_params is not None:
            name = self.lr_decay_scheduler_params.pop("name")
            try:
                self.decay_frequency = self.lr_decay_scheduler_params.pop("decay_frequency")
            except:
                pass
            lr_class = getattr(torch.optim.lr_scheduler, name)

            return lr_class

        else:
            return None

    def _exec_shuffling(self, size: int = None) -> torch.Tensor:
        return torch.randperm(size)

    def _summary_writer(self, loss_states: dict = None, epoch: int = None) -> None:
        for k, v in loss_states.items():
            loss = v[epoch]
            self.writer.add_scalar(k, loss, epoch)

    # It handles early-stopping for the optimization loop
    def _early_stopping_handler(self, val_loss_function: callable = None) -> None:
        loss = val_loss_function()
        self.accuracy_str = "acc: {}".format(loss)

        if loss < self.validation_score:
            self.validation_score = loss
            self.awaited_steps = 0
            return False

        elif (loss > self.validation_score) and (
            self.awaited_steps <= self.early_stopping_params["patience"]
        ):
            self.validation_score = loss
            self.awaited_steps += 1
            return False

        else:
            print("Early-stopping was actioned.")
            return True

    def _lr_decay_handler(self, epoch: int = None):
        if (epoch % self.decay_frequency == 0) and (epoch > 0):
            self.lr_decay_scheduler.step()

    def _checkpoint_handler(
        self,
        save_dir: str = None,
        name: str = None,
        model: NetworkTemplate = None,
        template: callable = None,
        compact: bool = False,
        epoch: int = None,
    ) -> None:
        if epoch % self.checkpoint_frequency == 0:
            tag = self.overwrite_rule(epoch)
            saver = SPFile(compact=compact)
            saver.write(
                save_dir=save_dir, name=name + tag, model=model, template=template
            )

    def _no_shuffling(self, size: int = None) -> torch.Tensor:
        return torch.arange(size)

    def _bypass_summary_writer(self, **kwargs) -> None:
        pass

    # Doing nothing to early-stopping
    def _bypass_stop_handler(self, **kwargs):
        return False

    # Doing nothing with lr
    def _bypass_lr_decay_handler(self, **kwargs):
        pass

    # Doing nothing to checkpoint
    def _bypass_checkpoint_handler(self, **kwargs):
        pass

    # When data is a NumPy array
    def _get_vector_data(
        self,
        dataset: Union[np.ndarray, torch.Tensor] = None,
        indices: np.ndarray = None,
    ) -> torch.Tensor:
        if dataset is None:
            return None
        elif isinstance(dataset, Dataset):
            return dataset()[indices]
        else:
            return dataset[indices]

    # When data is stored in a HDF5 dataset
    def _get_ondisk_data(
        self, dataset: callable = None, indices: np.ndarray = None
    ) -> torch.Tensor:
        indices = np.sort(indices)

        ondisk_formats = {np.ndarray: self._convert_ondisk_data_array,
                         dict: self._convert_ondisk_data_dict}

        data = dataset(indices=indices)

        return ondisk_formats.get(type(data))(data=data)

    def _convert_ondisk_data_array(
        self, data: np.ndarray = None,
    ) -> torch.Tensor:
        return torch.from_numpy(data.astype(ARRAY_DTYPE))

    def _convert_ondisk_data_dict(
        self, data: dict = None,
    ) -> dict:
        return {key: torch.from_numpy(value.astype(ARRAY_DTYPE)) for key, value in data.items()}

    # Preparing the batches (converting format and moving to the correct device)
    # in a single batch optimization loop
    def _make_input_data(
        self, input_data: Union[dict, torch.Tensor], device="cpu"
    ) -> dict:
        if type(input_data) is dict:
            input_data_dict = {key: item.to(device) for key, item in input_data.items()}
        else:
            input_data_dict = {self.input_data_name: input_data.to(device)}

        return input_data_dict

    # Preparing the batches (converting format and moving to the correct device)
    def _batchwise_make_input_data(
        self,
        input_data: Union[dict, torch.Tensor],
        device="cpu",
        batch_indices: torch.Tensor = None,
    ) -> dict:
        if type(input_data) is dict:
            input_data_dict = {
                key: self.get_data(dataset=item, indices=batch_indices).to(device)
                for key, item in input_data.items()
            }

        # When 'input_data' is just a pointer to a lazy (on-disk) dataset
        elif callable(input_data):
            data = self.get_data(dataset=input_data, indices=batch_indices)

            if type(data) == torch.Tensor:
                input_data_dict = {self.input_data_name: data.to(device)}
            else:
                input_data_dict = {
                    key: item.to(device) for key, item in data.items()
                }

        # The rest of the possible cases
        else:
            input_data_dict = {
                self.input_data_name: self.get_data(
                    dataset=input_data, indices=batch_indices
                ).to(device)
            }

        return input_data_dict

    # Fetching the optimizer from the supported engines
    def _get_optimizer(self, optimizer: str = None) -> torch.nn.Module:
        try:
            for optim_module in self.optim_modules:
                mod_items = dir(optim_module)
                mod_items_lower = [item.lower() for item in mod_items]

                if optimizer in mod_items_lower:
                    print(f"Optimizer {optimizer} found in {optim_module.__name__}.")
                    optimizer_name = mod_items[mod_items_lower.index(optimizer)]

                    return getattr(optim_module, optimizer_name)

                else:
                    print(
                        f"Optimizer {optimizer} not found in {optim_module.__name__}."
                    )
        except:
            raise Exception(
                f"There is no correspondent to {optimizer} in any known optimization module."
            )

    # Fetching the loss function from the corresponding module
    def _get_loss(self, loss: str = None) -> callable:
        if type(loss) == str:
            name = loss.upper()
            return getattr(self.losses_module, name + "Loss")
        elif callable(loss):
            return loss
        else:
            return f"loss must be str or callable, but received {type(loss)}"

    # Single batch optimization loop
    def _optimization_loop(
        self,
        n_epochs: int = None,
        loss_function: callable = None,
        op: NetworkTemplate = None,
        loss_states: dict = None,
        validation_loss_function: callable = None,
    ) -> None:
        for epoch in range(n_epochs):
            self.optimizer_instance.zero_grad()
            self.optimizer_instance.step(loss_function)

            self.checkpoint_handler(model=op, epoch=epoch, **self.checkpoint_params)

            self.summary_writer(loss_states=loss_states, epoch=epoch)

            self.lr_decay_handler(epoch=epoch)

        self.loss_states = loss_states

    # Basic version of the mini-batch optimization loop
    # TODO It could be parallelized
    def _batchwise_optimization_loop(
        self,
        n_epochs: int = None,
        batch_size: int = None,
        loss: Union[str, type] = None,
        op: NetworkTemplate = None,
        input_data: torch.Tensor = None,
        target_data: torch.Tensor = None,
        validation_data: Tuple[torch.Tensor] = None,
        params: dict = None,
        device: str = "cpu",
    ) -> None:
        print("Executing batchwise optimization loop.")

        if isinstance(loss, str):
            loss_class = self._get_loss(loss=loss)
            loss_instance = loss_class(operator=op)
        else:
            assert isinstance(
                loss, type
            ), "The object provided is not a LossBasics object."
            loss_class = loss

            try:
                loss_instance = loss_class(operator=op)
            except:
                raise Exception(f"It was not possible to instantiate the class {loss}.")

        if validation_data is not None:
            validation_input_data, validation_target_data = validation_data
            validation_input_data = self._make_input_data(
                validation_input_data, device=device
            )
            validation_target_data = validation_target_data.to(device)

            val_loss_function = loss_instance(
                input_data=validation_input_data,
                target_data=validation_target_data,
                **params,
            )
        else:
            val_loss_function = None

        batches = np.array_split(
            np.arange(self.n_samples), int(self.n_samples / batch_size)
        )

        # Number of batchwise optimization epochs
        n_batch_epochs = len(batches)

        epoch = 0  # Outer loop iteration
        b_epoch = 0  # Total iteration
        stop_criterion = False

        # When using mini-batches, it is necessary to
        # determine the number of iterations for the outer optimization
        # loop
        if n_batch_epochs > n_epochs:
            n_epochs_global = 1
        else:
            n_epochs_global = int(math.ceil(n_epochs / n_batch_epochs))

        while epoch < n_epochs_global and stop_criterion == False:
            # For each batch-wise realization it is possible to determine a
            # new permutation for the samples
            samples_permutation = self.sampler(size=self.n_samples)

            for ibatch in batches:
                self.optimizer_instance.zero_grad()

                # Selecting a batch from the permutation to perform a
                # single optimization step
                indices = samples_permutation[ibatch]

                # The input batch usually requires more pre-processing and 
                # specifications
                input_batch = self._batchwise_make_input_data(
                    input_data, device=device, batch_indices=indices
                )

                target_batch = self.get_data(dataset=target_data, indices=indices)

                if target_batch is not None:
                    target_batch = target_batch.to(device)

                # Instantiating the loss function
                loss_function = loss_instance(
                    input_data=input_batch,
                    target_data=target_batch,
                    call_back=self.accuracy_str,
                    **params,
                )

                # A single optimization step
                self.optimizer_instance.step(loss_function)

                # Writing the training information to a Tensorboard file 
                # (if it is required)
                self.summary_writer(
                    loss_states=loss_instance.loss_states, epoch=b_epoch
                )

                # Checkpoint the model 
                self.checkpoint_handler(
                    model=op, epoch=b_epoch, **self.checkpoint_params
                )

                # Updating the learning rate
                self.lr_decay_handler(epoch=b_epoch)

                # Early-stopping when necessary
                stop_criterion = self.stop_handler(val_loss_function=val_loss_function)

                b_epoch += 1

            epoch += 1

        if hasattr(loss_instance, "loss_states"):
            if all(
                [isinstance(item, list) for item in loss_instance.loss_states.values()]
            ):
                self.loss_states = {
                    key: np.hstack(value)
                    for key, value in loss_instance.loss_states.items()
                }

            else:
                self.loss_states = loss_instance.loss_states

    # Main fit method
    @_convert_tensor_format
    def fit(
        self,
        op: NetworkTemplate = None,
        input_data: Union[dict, torch.Tensor, np.ndarray, callable] = None,
        target_data: Union[torch.Tensor, np.ndarray, callable] = None,
        validation_data: Tuple[Union[torch.Tensor, np.ndarray, callable]] = None,
        n_epochs: int = None,
        loss: str = "rmse",
        params: dict = None,
        batch_size: int = None,
        device: str = "cpu",
        distributed: bool = False,
        use_jit: bool = False,
    ) -> None:
        """

        Args:
            op (NetworkTemplate): The model which will be trained.
            input_data (Union[dict, torch.Tensor, np.ndarray, callable]): The dataset (or collection
                of datasets) used as input for the model.
            target_data (Union[torch.Tensor, np.ndarray, callable]): The target data for the problem.
            validation_data (Tuple[Union[torch.Tensor, np.ndarray, callable]]): The validation data used for the problem
                (if required).
            n_epochs (int): Number of epochs for the optimization process.
            loss (str): A string referring to a loss function defined in simulai/optimization/_losses.py.
            params (dict): Extra parameters required for task-specific problems (e.g. physics-informed neural networks).
            batch_size (int): The size of the batch used in each optimization epoch.
            device (str): The device on which the optimization will run, 'cpu' or 'gpu'.
            distributed (bool): Whether to use distributed (multi-node) training.
            use_jit (bool): Whether to use PyTorch JIT (just-in-time compilation).

        """

        # Verifying if the params dictionary contains Physics-informed
        # attributes
        extra_parameters = None
        if "residual" in params:
            self.is_physics_informed = True

            extra_parameters = self._seek_by_extra_trainable_parameters(
                residual=params["residual"]
            )

            if use_jit:
                try:
                    params["residual"] = torch.compile(params["residual"])
                except AttributeError:
                    pass
            else:
                pass

        _adjust_loss_function_to_model(
            model=op, loss=loss, physics_informed=self.is_physics_informed
        )

        # When using inputs with the format h5py.Dataset
        if callable(input_data) and callable(target_data):
            assert batch_size, (
                "When the input and target datasets are on disk, it is necessary "
                "to provide a value for batch_size."
            )

            self.get_data = self._get_ondisk_data
        else:
            pass

        # When target is None, it is expected a residual (Physics-Informed) training
        if target_data is None:
            assert "residual" in params, (
                "If target_data are not provided, residual must be != None "
                "in order to generate it."
            )

            assert callable(params["residual"]), (
                f"residual must be callable,"
                f" but received {type(params['residual'])}."
            )
        else:
            pass

        if "causality_preserving" in params.keys():
            assert self.shuffle == False, (
                "If the causality preserving algorithm is being used,"
                " no shuffling must be allowed when creating the mini-batches."
            )

        # When early-stopping is used, it is necessary to provide a validation dataset
        if self.early_stopping is True:
            assert validation_data is not None, (
                "If early-stopping is being used, it is necessary to provide a "
                "validation dataset via validation_data."
            )
        else:
            pass

        # Configuring the device to be used during the fitting process
        device_label = device
        if device == "gpu":
            if not torch.cuda.is_available():
                print("Warning: There is no GPU available, using CPU instead.")
                device = "cpu"
                device_label = "cpu"
            else:
                try:
                    device = "cuda:" + os.environ["LOCAL_RANK"]
                except KeyError:
                    device = "cuda"
                device_label = "gpu"
                print("Using GPU.")
        elif device == "cpu":
            print("Using CPU.")
        elif not device:
            device = "cpu"
            print("Received None, but using cpu instead.")
        else:
            raise Exception(
                f"The device must be cpu or gpu, the device {device} is not supported."
            )

        if not "device" in params:
            params["device"] = device

        # In a multi-device execution, the optimizer must be properly instantiated to execute distributed tasks.
        if distributed == True:
            from torch.distributed.optim import DistributedOptimizer
            from torch.distributed.rpc import RRef

            optimizer_params = list()
            for param in op.parameters():
                optimizer_params.append(RRef(param))

            if extra_parameters is not None:
                optimizer_params += extra_parameters

            self.optimizer_instance = DistributedOptimizer(
                self.optim_class, optimizer_params, **self.params
            )

        else:
            # Guaranteeing the correct operator placement when using a single device
            op = op.to(device)

            # Trying to use the PyTorch JIT compilation
            if use_jit:
                try:
                    op = torch.compile(op)
                except AttributeError:
                    pass

            if extra_parameters is not None:
                optimizer_params = list(op.parameters()) + extra_parameters
                self.optimizer_instance = self.optim_class(
                    optimizer_params, **self.params
                )
            else:
                self.optimizer_instance = self.optim_class(
                    op.parameters(), **self.params
                )

        # Configuring LR decay, when necessary
        lr_scheduler_class = self._get_lr_decay()

        if lr_scheduler_class is not None:
            print(f"Using LR decay {lr_scheduler_class}.")
            self.lr_decay_scheduler = lr_scheduler_class(
                self.optimizer_instance, **self.lr_decay_scheduler_params
            )
        else:
            pass

        # If GPU is being used, try to completely allocate the dataset there.
        if device_label == "gpu":
            input_data = self._try_to_transfer_to_GPU(data=input_data, device=device)
            target_data = self._try_to_transfer_to_GPU(data=target_data, device=device)

        else:
            pass

        # Determining the kind of execution to be performed, batch-wise or not
        if batch_size is not None:
            # Determining the number of samples for each case
            # dictionary
            if type(input_data) is dict:
                key = list(input_data.keys())[0]
                self.n_samples = input_data[key].size()[0]

            # When using h5py.Group, the number of samples must be informed in the instantiation
            elif callable(input_data):
                assert self.n_samples is not None, (
                    "If the dataset is on disk, it is necessary "
                    "to inform n_samples using the dictionary params."
                )

            # other cases: torch.Tensor, np.ndarray
            else:
                self.n_samples = input_data.size()[0]

            self._batchwise_optimization_loop(
                n_epochs=n_epochs,
                batch_size=batch_size,
                loss=loss,
                op=op,
                input_data=input_data,
                target_data=target_data,
                validation_data=validation_data,
                params=params,
                device=device,
            )

        else:
            # In this case, the entire datasets are placed in the same device, CPU or GPU
            # The datasets are initially located on CPU
            input_data = self._make_input_data(input_data, device=device)

            # Target data is optional for some cases
            if target_data is not None:
                target_data = target_data.to(device)

            loss_class = self._get_loss(loss=loss)
            loss_instance = loss_class(operator=op)

            # Instantiating the loss function
            loss_function = loss_instance(
                input_data=input_data, target_data=target_data, **params
            )

            # Instantiating the validation loss function, if necessary
            if self.early_stopping is True:
                validation_input_data, validation_target_data = validation_data
                validation_loss_function = loss_instance(
                    input_data=validation_input_data,
                    target_data=validation_target_data,
                    **params,
                )
            else:
                validation_loss_function = None

            # Executing the optimization loop
            self._optimization_loop(
                n_epochs=n_epochs,
                loss_function=loss_function,
                op=op,
                loss_states=loss_instance.loss_states,
                validation_loss_function=validation_loss_function,
            )

__init__(optimizer=None, early_stopping=False, summary_writer=False, shuffle=True, lr_decay_scheduler_params=None, params=None, early_stopping_params=None, checkpoint_params=None) #

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `optimizer` | `str` | A name for a PyTorch optimizer. | `None` |
| `early_stopping` | `bool` | Whether to use early stopping. | `False` |
| `summary_writer` | `bool` | Whether to write a TensorBoard run file. | `False` |
| `shuffle` | `bool` | Whether to shuffle the dataset. | `True` |
| `lr_decay_scheduler_params` | `dict` | The parameters used for defining a learning-rate decay scheme. | `None` |
| `params` | `dict` | Extra parameters which provide information for task-specific problems (e.g. physics-informed neural networks). | `None` |
| `early_stopping_params` | `dict` | Parameters required by the early-stopping scheme. | `None` |
| `checkpoint_params` | `dict` | Parameters for configuring the checkpointing scheme. | `None` |
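
A minimal usage sketch (not taken from the library's own examples): it assumes `Optimizer` is importable from `simulai.optimization`, and the optimizer name and hyperparameter values below are illustrative placeholders.

```python
from simulai.optimization import Optimizer

# "adam" is matched case-insensitively against the entries of torch.optim,
# so it resolves to torch.optim.Adam; the remaining entries of `params`
# are forwarded to that optimizer class when `fit` is called.
optimizer = Optimizer(
    optimizer="adam",
    params={"lr": 1e-3},
    lr_decay_scheduler_params={
        "name": "ExponentialLR",   # looked up in torch.optim.lr_scheduler
        "gamma": 0.9,
        "decay_frequency": 1_000,  # how often _lr_decay_handler steps the scheduler
    },
    shuffle=True,
)
```

The same pattern applies to `checkpoint_params`: the keys consumed by `Optimizer` itself (`checkpoint_frequency`, `overwrite`) are popped, and the remaining entries are forwarded to the internal checkpoint handler.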
fit(op=None, input_data=None, target_data=None, validation_data=None, n_epochs=None, loss='rmse', params=None, batch_size=None, device='cpu', distributed=False, use_jit=False) #

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `op` | `NetworkTemplate` | The model which will be trained. | `None` |
| `input_data` | `Union[dict, Tensor, ndarray, callable]` | The dataset (or collection of datasets) used as input for the model. | `None` |
| `target_data` | `Union[Tensor, ndarray, callable]` | The target data for the problem. | `None` |
| `validation_data` | `Tuple[Union[Tensor, ndarray, callable]]` | The validation data used for the problem (if required). | `None` |
| `n_epochs` | `int` | Number of epochs for the optimization process. | `None` |
| `loss` | `str` | A string referring to a loss function defined in simulai/optimization/_losses.py. | `'rmse'` |
| `params` | `dict` | Extra parameters required for task-specific problems (e.g. physics-informed neural networks). | `None` |
| `batch_size` | `int` | The size of the batch used in each optimization epoch. | `None` |
| `device` | `str` | The device on which the optimization will run, 'cpu' or 'gpu'. | `'cpu'` |
| `distributed` | `bool` | Whether to use distributed (multi-node) training. | `False` |
| `use_jit` | `bool` | Whether to use PyTorch JIT (just-in-time compilation). | `False` |
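
As a usage sketch continuing the constructor example above (hedged, not taken from the library's documentation), a mini-batch training run on in-memory data could look like this; `model`, `X`, and `Y` are hypothetical placeholders for a `NetworkTemplate` instance and two NumPy arrays of shape `(n_samples, n_features)`.

```python
optimizer.fit(
    op=model,
    input_data=X,
    target_data=Y,
    n_epochs=2_000,
    loss="rmse",      # resolved to the RMSELoss class in simulai.optimization
    params={},        # extra keyword arguments forwarded to the loss instance
    batch_size=64,    # a not-None batch_size selects the batch-wise loop
    device="gpu",     # falls back to CPU (with a warning) if no GPU is available
)
```

For physics-informed training, `target_data` may be omitted, in which case `params` must carry a callable `residual` entry (a `SymbolicOperator`), as the assertions in the source above enforce.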
ScipyInterface#

Source code in simulai/optimization/_optimization.py
class ScipyInterface:
    def __init__(
        self,
        fun: NetworkTemplate = None,
        optimizer: str = None,
        optimizer_config: dict = dict(),
        loss: callable = None,
        loss_config: dict = None,
        device: str = "cpu",
        jac: str = None,
    ) -> None:
        """An interface for using SciPy-defined optimization algorithms.

        Args:
            fun (NetworkTemplate): A model (neural network) to be trained.
            optimizer (str): The name of an optimizer available in SciPy.
            optimizer_config (dict): A configuration dictionary for the chosen optimizer.
            loss (callable): A loss function implemented in the form of a Python function or class.
            loss_config (dict): A configuration dictionary for the loss function.
            device (str): The device on which the optimization will be executed ('cpu' or 'gpu').
            jac (str): If necessary, a method for evaluating the Jacobian, as available in SciPy.

        Raises:
            Exception: If an unrecognized device is passed as 'device'.

        """
        # Configuring the device to be used during the fitting process
        device_label = device
        if device == "gpu":
            if not torch.cuda.is_available():
                print("Warning: There is no GPU available, using CPU instead.")
                device = "cpu"
                device_label = "cpu"
            else:
                try:
                    device = "cuda:" + os.environ["LOCAL_RANK"]
                except KeyError:
                    device = "cuda"
                device_label = "gpu"
                print("Using GPU.")
        elif device == "cpu":
            print("Using CPU.")
        else:
            raise Exception(f"The device must be cpu or gpu, but received: {device}")

        self.device = device
        self.engine = "scipy.optimize"
        self.engine_module = importlib.import_module(self.engine)
        self.minimization_method = "minimize"

        self.optimizer = getattr(self.engine_module, self.minimization_method)

        self.optimizer_config = optimizer_config or dict()
        self.optimizer_config["method"] = optimizer

        self.fun = fun
        self.loss = loss
        self.loss_config = loss_config or dict()

        self.operators_names = list(self.fun.state_dict().keys())

        self.operators_shapes = OrderedDict(
            {k: list(v.shape) for k, v in self.fun.state_dict().items()}
        )

        self.state_0 = self.fun.state_dict()

        intervals = np.cumsum(
            [0] + [np.prod(shape) for shape in self.operators_shapes.values()]
        )

        self.operators_intervals = [
            intervals[i : i + 2].tolist() for i in range(len(intervals) - 1)
        ]

        if jac:
            self.optimizer_config["jac"] = jac
            self.objective = self._fun_num
        else:
            self.optimizer_config["jac"] = True
            self.objective = self._fun

        # Determining default type
        if torch.get_default_dtype() == torch.float32:
            self.default_dtype = np.float32
        else:
            self.default_dtype = np.float64

    def _stack_and_convert_parameters(
        self, parameters: List[Union[torch.Tensor, np.ndarray]]
    ) -> np.ndarray:
        """
        It produces a stack of all the model parameters.

        Args:
            parameters (List[Union[torch.Tensor, np.ndarray]]): A list containing all the 
                model parameters in their original shapes.
        Returns:
           np.ndarray: A stack (single vertical array) of all the model parameters. 

        """
        return np.hstack(
            [
                param.detach().numpy().astype(np.float64).flatten()
                for param in parameters.values()
            ]
        )

    def _update_and_set_parameters(self, parameters: np.ndarray) -> None:
        """
        It updates the parameters with the new values estimated by the optimizer.
        Args:
            parameters (np.ndarray): The stack of all the model parameters.

        """
        operators = [
            torch.from_numpy(
                parameters[slice(*interval)].reshape(shape).astype(self.default_dtype)
            ).to(self.device)
            for interval, shape in zip(
                self.operators_intervals, self.operators_shapes.values()
            )
        ]

        for opi, parameter in enumerate(self.fun.parameters()):
            parameter.data.copy_(operators[opi])

    def _exec_kwargs_forward(self, input_data: dict = None):

        """It executes the forward pass for the model when it receives more than one input.
        Args:
            input_data (dict): Data to be passed to the model.

        """

        return self.fun.forward(**input_data)

    def _exec_forward(self, input_data: Union[np.ndarray, torch.Tensor] = None):

        """It executes the forward pass for the model.
        Args:
            input_data (Union[np.ndarray, torch.Tensor]): Data to be passed to the model. 

        """

        return self.fun.forward(input_data=input_data)

    def _fun_num(self, parameters: np.ndarray) -> Tuple[float]:
        """

        Args:
            parameters (np.ndarray): The stacked parameters defined for the model. 
        Returns:
            Tuple[float]: The loss(es) defined for the optimization process.              
        """

        self._update_and_set_parameters(parameters)

        closure = self.loss(self.input_data, self.target_data, **self.loss_config)
        loss = closure()

        return loss.detach().cpu().numpy().astype(np.float64)

    def _fun(self, parameters: np.ndarray) -> Tuple[float, np.ndarray]:
        """
        Args:
            parameters (np.ndarray): The stack of all the trainable parameters for the model.

        Returns:
            Tuple[float, np.ndarray]: A tuple containing the value for the loss function and 
            the array of gradients for the model parameters. 
        """

        # Setting the new values for the model parameters
        self._update_and_set_parameters(parameters)

        closure = self.loss(self.input_data, self.target_data, **self.loss_config)
        loss = closure()

        grads = [v.grad.detach().cpu().numpy() for v in self.fun.parameters()]

        gradients = np.hstack(
            [
                v.flatten()
                for v, shape in zip(grads, list(self.operators_shapes.values()))
            ]
        )

        return loss.detach().cpu().numpy().astype(np.float64), gradients.astype(
            np.float64
        )

    def fit(
        self,
        input_data: Union[dict, torch.Tensor, np.ndarray] = None,
        target_data: Union[torch.Tensor, np.ndarray] = None,
    ) -> None:
        """

        Args:
            input_data (Union[dict, torch.Tensor, np.ndarray]): The dataset (or collection of datasets)
                used as input for the model.
            target_data (Union[torch.Tensor, np.ndarray]): The target data used for training the model.

        """
        parameters_0 = self._stack_and_convert_parameters(self.state_0)

        print(
            f"\nStarting ScipyInterface with method: {self.optimizer_config['method']}\n"
        )

        if isinstance(input_data, dict):
            self.exec_forward = self._exec_kwargs_forward
        else:
            self.exec_forward = self._exec_forward

        self.input_data = input_data

        self.target_data = target_data

        self.closure = self.loss(self.input_data, self.target_data, **self.loss_config)

        solution = self.optimizer(self.objective, parameters_0, **self.optimizer_config)

        self._update_and_set_parameters(solution.x)

__init__(fun=None, optimizer=None, optimizer_config=dict(), loss=None, loss_config=None, device='cpu', jac=None) #

An interface for using SciPy-defined optimization algorithms.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fun` | `NetworkTemplate` | A model (neural network) to be trained. | `None` |
| `optimizer` | `str` | The name of an optimizer available in SciPy. | `None` |
| `optimizer_config` | `dict` | A configuration dictionary for the chosen optimizer. | `dict()` |
| `loss` | `callable` | A loss function implemented in the form of a Python function or class. | `None` |
| `loss_config` | `dict` | A configuration dictionary for the loss function. | `None` |
| `device` | `str` | The device on which the optimization will be executed ('cpu' or 'gpu'). | `'cpu'` |
| `jac` | `str` | If necessary, a method for evaluating the Jacobian, as available in SciPy. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If an unrecognized device is passed as 'device'. |
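
A hedged construction sketch: `net` and `my_loss_builder` are hypothetical placeholders, and the import path assumes `ScipyInterface` is exported from `simulai.optimization`. The loss argument must follow the calling convention used internally, `loss(input_data, target_data, **loss_config)`, returning a closure whose call evaluates the loss and populates the gradients of the model parameters.

```python
from simulai.optimization import ScipyInterface

interface = ScipyInterface(
    fun=net,                       # a NetworkTemplate instance to be trained
    optimizer="L-BFGS-B",          # any method accepted by scipy.optimize.minimize
    optimizer_config={"options": {"maxiter": 500}},
    loss=my_loss_builder,          # builder returning a gradient-producing closure
    loss_config={},
    device="cpu",
)
```

When `jac` is left as `None`, the interface sets `jac=True` and supplies both the loss value and the flattened gradient vector to SciPy via `_fun`; passing a SciPy-supported `jac` string switches to `_fun_num`, which returns only the loss value.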

fit(input_data=None, target_data=None) #

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `Union[dict, Tensor, ndarray]` | The dataset (or collection of datasets) used as input for the model. | `None` |
| `target_data` | `Union[Tensor, ndarray]` | The target data used for training the model. | `None` |
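
Calling `fit` then performs a single `scipy.optimize.minimize` run over the stacked model parameters and writes the solution back into the wrapped network; `X` and `Y` below are the same hypothetical arrays used in the earlier sketches.

```python
# After minimize() finishes, the optimized parameter vector (solution.x)
# is reshaped and copied back into the network by _update_and_set_parameters.
interface.fit(input_data=X, target_data=Y)
```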