Skip to content

dualhistagg

DualHistAgg

Bases: Aggregator

Aggregator: Dual History Aggregation (DualHistAgg). Authors: Enrique et al. Year: 2024.

Source code in nebula/core/aggregation/dualhistagg.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class DualHistAgg(Aggregator):
    """
    Aggregator: Dual History Aggregation (DualHistAgg)
    Authors: Enrique et al.
    Year: 2024

    Produces two aggregated models per round:
      * ``accum`` — layers combined with softmax-normalized similarity weights.
      * ``accum_similarity`` — layers combined with the raw similarity scores.
    """

    def __init__(self, config=None, **kwargs):
        super().__init__(config, **kwargs)

    def softmax(self, x):
        """Numerically stable softmax over a 1-D numpy array.

        Returns an empty array for empty input instead of raising.
        """
        # Safeguard against empty input array
        if x.size == 0:
            return np.array([])
        # Shift by the max before exponentiating to avoid float overflow.
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum(axis=0)  # ensure division is done correctly

    def run_aggregation(self, models, reference_model=None):
        """Aggregate a mapping of ``{sender: (state_dict, num_samples)}``.

        Args:
            models: mapping of sender -> (state_dict, num_samples).
            reference_model: optional state_dict; when given, each model's
                cosine similarity to it weights the aggregation. When None,
                every similarity defaults to 1.

        Returns:
            Tuple ``(accum, accum_similarity)`` of aggregated state_dicts,
            ``(None, None)`` when ``models`` is empty, or the single model
            twice when only one model was received.
        """
        if len(models) == 0:
            logging.error("Trying to aggregate models when there are no models")
            return None, None

        models = list(models.values())
        num_models = len(models)
        logging.info(f"Number of models: {num_models}")

        if num_models == 1:
            logging.info("Only one model, returning it")
            return models[0][0], models[0][0]

        # Total Samples
        total_samples = float(sum(w for _, w in models))

        # BUG FIX: the original used ``accum_similarity = accum.copy()`` — a
        # *shallow* dict copy, so both dicts referenced the same tensor
        # objects. The in-place ``+=`` and ``/=`` below then applied BOTH
        # weighting schemes (and both normalizations) to one shared
        # accumulator, making the two returned models identical and wrong.
        # Build two independent zero-filled accumulators instead.
        template = models[0][0]  # use first model for layer names/shapes
        accum = {layer: torch.zeros_like(param).float() for layer, param in template.items()}
        accum_similarity = {layer: torch.zeros_like(param).float() for layer, param in template.items()}

        similarities = (
            [cosine_metric(model, reference_model) for model, _ in models] if reference_model else [1] * num_models
        )

        logging.info(f"Similarities: {similarities}")
        weights = self.softmax(np.array(similarities))
        logging.info(f"Weights: {weights}")

        # Aggregation process: one pass accumulates both weighted sums.
        for (model, _), weight, sim_weight in zip(models, weights, similarities, strict=False):
            for layer in accum:
                accum[layer] += model[layer].float() * float(weight)
                accum_similarity[layer] += model[layer].float() * float(sim_weight)

        # Normalize aggregated models.
        # NOTE(review): the softmax weights already sum to 1, so dividing the
        # softmax-weighted sum by total_samples additionally scales the result
        # by 1/total_samples — confirm this matches the algorithm's intent.
        for layer in accum:
            accum[layer] /= total_samples
            accum_similarity[layer] /= total_samples

        return accum, accum_similarity