Documentation for Updatehandler Module

UpdateHandler

Bases: ABC

Abstract base class for managing update storage and retrieval in a federated learning setting.

This class defines the required methods for handling updates from multiple sources, ensuring they are properly stored, retrieved, and processed during the aggregation process.

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
class UpdateHandler(ABC):
    """
    Abstract base class for managing update storage and retrieval in a federated learning setting.

    This class defines the required methods for handling updates from multiple sources,
    ensuring they are properly stored, retrieved, and processed during the aggregation process.
    """

    @abstractmethod
    async def init(self, config: dict):
        raise NotImplementedError

    @abstractmethod
    async def round_expected_updates(self, federation_nodes: set):
        """
        Initializes the expected updates for the current round.

        This method sets up the expected sources (`federation_nodes`) that should provide updates
        in the current training round. It ensures that each source has an entry in the storage
        and resets any previous tracking of received updates.

        Args:
            federation_nodes (set): A set of node identifiers expected to provide updates.
        """
        raise NotImplementedError

    @abstractmethod
    async def storage_update(self, updt_received_event: UpdateReceivedEvent):
        """
        Stores an update from a source in the update storage.

        This method ensures that an update received from a source is properly stored in the buffer,
        avoiding duplicates and managing update history if necessary.

        Args:
            updt_received_event (UpdateReceivedEvent): The event describing the received update.
                As documented here, it conveys:
                model: The model associated with the update.
                weight: The weight assigned to the update (e.g., based on the amount of data used in training).
                source (str): The identifier of the node sending the update.
                round (int): The sender's local training round in which the update was produced.
                local (bool): Whether this is a local update.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_round_updates(self) -> dict[str, tuple[object, float]]:
        """
        Retrieves the latest updates from all received sources in the current round.

        This method collects updates from all sources that have sent updates,
        prioritizing the most recent update available in the buffer.

        Returns:
            dict: A dictionary where keys are source identifiers and values are tuples `(model, weight)`,
                  representing the latest updates received from each source.
        """
        raise NotImplementedError

    @abstractmethod
    async def notify_federation_update(self, updt_nei_event: UpdateNeighborEvent):
        """
        Notifies the system of a change in the federation regarding a specific source.

        If a source leaves the federation, it is removed from the list of expected updates.
        If a source is newly added, it is registered for future updates.

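        Args:
            updt_nei_event (UpdateNeighborEvent): The event describing the federation change.
                As documented here, it conveys:
                source (str): The identifier of the source node.
                remove (bool): Whether to remove the source from the federation. Defaults to `False`.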
        """
        raise NotImplementedError

    @abstractmethod
    async def get_round_missing_nodes(self) -> set[str]:
        """
        Identifies sources that have not yet provided updates in the current round.

        Returns:
            set: A set of source identifiers that are expected to send updates but have not yet been received.
        """
        raise NotImplementedError

    @abstractmethod
    async def notify_if_all_updates_received(self):
        """
        Notifies the system when all expected updates for the current round have been received.
        """
        raise NotImplementedError

    @abstractmethod
    async def stop_notifying_updates(self):
        """
        Stops notifications related to update reception.

        This method can be used to reset any notification mechanisms or stop tracking updates
        if the aggregation process is halted.
        """
        raise NotImplementedError
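
Because every method is abstract, the class can only be used through a subclass that implements all of them. The sketch below illustrates that contract with a trimmed stand-in: `HandlerContract` and `InMemoryHandler` are hypothetical names, not part of NEBULA, and only two of the methods are mirrored to keep the example short.

```python
import asyncio
from abc import ABC, abstractmethod


class HandlerContract(ABC):
    """Hypothetical, trimmed stand-in for UpdateHandler (two methods only)."""

    @abstractmethod
    async def round_expected_updates(self, federation_nodes: set):
        raise NotImplementedError

    @abstractmethod
    async def get_round_missing_nodes(self) -> set[str]:
        raise NotImplementedError


class InMemoryHandler(HandlerContract):
    """Hypothetical concrete handler that tracks sources in plain sets."""

    def __init__(self):
        self._expected: set[str] = set()
        self._received: set[str] = set()

    async def round_expected_updates(self, federation_nodes: set):
        # Start a new round: remember who is expected, forget who has answered.
        self._expected = set(federation_nodes)
        self._received.clear()

    async def get_round_missing_nodes(self) -> set[str]:
        return self._expected - self._received


def can_instantiate(cls) -> bool:
    """Return True if cls() succeeds, False if the ABC machinery rejects it."""
    try:
        cls()
    except TypeError:
        return False
    return True


async def demo() -> set[str]:
    handler = InMemoryHandler()
    await handler.round_expected_updates({"node-a", "node-b"})
    return await handler.get_round_missing_nodes()


print(can_instantiate(HandlerContract))  # False: abstract methods remain
print(can_instantiate(InMemoryHandler))  # True: every method is implemented
print(sorted(asyncio.run(demo())))
```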

get_round_missing_nodes() abstractmethod async

Identifies sources that have not yet provided updates in the current round.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| set | set[str] | A set of source identifiers that are expected to send updates but have not yet been received. |

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def get_round_missing_nodes(self) -> set[str]:
    """
    Identifies sources that have not yet provided updates in the current round.

    Returns:
        set: A set of source identifiers that are expected to send updates but have not yet been received.
    """
    raise NotImplementedError
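
One way a subclass might compute this set, assuming (hypothetically) that it keeps one storage slot per expected source and leaves the slot as `None` until an update arrives. The function name and the storage layout are illustrative, not taken from NEBULA.

```python
from typing import Optional


def missing_from_storage(storage: dict[str, Optional[object]]) -> set[str]:
    """Return the sources whose slot for the current round is still empty."""
    return {source for source, update in storage.items() if update is None}


# Hypothetical round state: node-a has reported, the other two have not.
round_storage = {"node-a": ("model-a", 10.0), "node-b": None, "node-c": None}
print(sorted(missing_from_storage(round_storage)))  # ['node-b', 'node-c']
```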

get_round_updates() abstractmethod async

Retrieves the latest updates from all received sources in the current round.

This method collects updates from all sources that have sent updates, prioritizing the most recent update available in the buffer.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| dict | dict[str, tuple[object, float]] | A dictionary where keys are source identifiers and values are tuples `(model, weight)`, representing the latest updates received from each source. |

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def get_round_updates(self) -> dict[str, tuple[object, float]]:
    """
    Retrieves the latest updates from all received sources in the current round.

    This method collects updates from all sources that have sent updates,
    prioritizing the most recent update available in the buffer.

    Returns:
        dict: A dictionary where keys are source identifiers and values are tuples `(model, weight)`,
              representing the latest updates received from each source.
    """
    raise NotImplementedError
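
A minimal sketch of "prioritizing the most recent update available in the buffer", assuming each source has its own `deque` of `(model, weight)` pairs with the newest entry appended last. The per-source buffer layout and the function name are assumptions made for illustration.

```python
from collections import deque


def latest_updates(buffers: dict[str, deque]) -> dict[str, tuple[object, float]]:
    """Pick the newest (model, weight) pair from each non-empty buffer."""
    # Sources with an empty buffer have not sent anything and are skipped.
    return {source: buf[-1] for source, buf in buffers.items() if buf}


buffers = {
    "node-a": deque([("model-a-v1", 5.0), ("model-a-v2", 7.0)]),
    "node-b": deque(),  # expected, but nothing received yet
}
print(latest_updates(buffers))  # {'node-a': ('model-a-v2', 7.0)}
```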

notify_federation_update(updt_nei_event) abstractmethod async

Notifies the system of a change in the federation regarding a specific source.

If a source leaves the federation, it is removed from the list of expected updates. If a source is newly added, it is registered for future updates.

Parameters:

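| Name | Type | Description | Default |
| --- | --- | --- | --- |
| updt_nei_event | UpdateNeighborEvent | The event describing the federation change. | required |

The docstring describes the data conveyed by the event:

| Name | Type | Description |
| --- | --- | --- |
| source | str | The identifier of the source node. |
| remove | bool | Whether to remove the source from the federation. Defaults to False. |
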
Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def notify_federation_update(self, updt_nei_event: UpdateNeighborEvent):
    """
    Notifies the system of a change in the federation regarding a specific source.

    If a source leaves the federation, it is removed from the list of expected updates.
    If a source is newly added, it is registered for future updates.

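    Args:
        updt_nei_event (UpdateNeighborEvent): The event describing the federation change.
            As documented here, it conveys:
            source (str): The identifier of the source node.
            remove (bool): Whether to remove the source from the federation. Defaults to `False`.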
    """
    raise NotImplementedError
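
The add-or-remove behaviour described above could look roughly like this. `NeighborChange` is a hypothetical stand-in for `UpdateNeighborEvent`, whose real interface is not shown on this page; only the `source` and `remove` values named in the docstring are modelled.

```python
from dataclasses import dataclass


@dataclass
class NeighborChange:
    """Hypothetical stand-in for UpdateNeighborEvent."""

    source: str
    remove: bool = False


def apply_change(expected: set[str], change: NeighborChange) -> set[str]:
    """Return a new expected-sources set reflecting the federation change."""
    updated = set(expected)
    if change.remove:
        # discard() is a no-op if the source was never registered.
        updated.discard(change.source)
    else:
        updated.add(change.source)
    return updated


expected = {"node-a"}
expected = apply_change(expected, NeighborChange("node-b"))
print(sorted(expected))  # ['node-a', 'node-b']
expected = apply_change(expected, NeighborChange("node-a", remove=True))
print(sorted(expected))  # ['node-b']
```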

notify_if_all_updates_received() abstractmethod async

Notifies the system when all expected updates for the current round have been received.

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def notify_if_all_updates_received(self):
    """
    Notifies the system when all expected updates for the current round have been received.
    """
    raise NotImplementedError
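
A sketch of one possible notification mechanism, assuming the handler signals completion through an `asyncio.Event` that an aggregator could await. `RoundTracker`, `record`, and the `all_received` attribute are hypothetical; the page does not specify how NEBULA delivers the notification.

```python
import asyncio


class RoundTracker:
    """Hypothetical tracker that sets an event once every source has reported."""

    def __init__(self, expected: set[str]):
        self.expected = set(expected)
        self.received: set[str] = set()
        self.all_received = asyncio.Event()

    async def record(self, source: str) -> None:
        self.received.add(source)
        await self.notify_if_all_updates_received()

    async def notify_if_all_updates_received(self) -> None:
        # Notify only when the received set covers every expected source.
        if self.expected <= self.received:
            self.all_received.set()


async def demo() -> tuple[bool, bool]:
    tracker = RoundTracker({"node-a", "node-b"})
    await tracker.record("node-a")
    after_first = tracker.all_received.is_set()
    await tracker.record("node-b")
    return after_first, tracker.all_received.is_set()


print(asyncio.run(demo()))  # (False, True)
```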

round_expected_updates(federation_nodes) abstractmethod async

Initializes the expected updates for the current round.

This method sets up the expected sources (federation_nodes) that should provide updates in the current training round. It ensures that each source has an entry in the storage and resets any previous tracking of received updates.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| federation_nodes | set | A set of node identifiers expected to provide updates. | required |

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def round_expected_updates(self, federation_nodes: set):
    """
    Initializes the expected updates for the current round.

    This method sets up the expected sources (`federation_nodes`) that should provide updates
    in the current training round. It ensures that each source has an entry in the storage
    and resets any previous tracking of received updates.

    Args:
        federation_nodes (set): A set of node identifiers expected to provide updates.
    """
    raise NotImplementedError
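
The two effects named above, creating a storage entry per source and resetting the received tracking, can be sketched as follows. `RoundState` is a hypothetical class, and keeping previously buffered updates across rounds is an assumption of this sketch rather than documented behaviour.

```python
class RoundState:
    """Hypothetical per-handler state: storage plus expected/received sets."""

    def __init__(self):
        self.storage: dict[str, list] = {}
        self.expected: set[str] = set()
        self.received: set[str] = set()

    def start_round(self, federation_nodes: set[str]) -> None:
        self.expected = set(federation_nodes)
        for node in self.expected:
            # Create an empty entry only if the source has none yet.
            self.storage.setdefault(node, [])
        # Nobody has reported in the new round.
        self.received.clear()


state = RoundState()
state.storage["node-a"] = ["update-from-previous-round"]
state.received.add("node-a")

state.start_round({"node-a", "node-b"})
print(sorted(state.storage))  # ['node-a', 'node-b']
print(state.received)         # set()
```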

stop_notifying_updates() abstractmethod async

Stops notifications related to update reception.

This method can be used to reset any notification mechanisms or stop tracking updates if the aggregation process is halted.

Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def stop_notifying_updates(self):
    """
    Stops notifications related to update reception.

    This method can be used to reset any notification mechanisms or stop tracking updates
    if the aggregation process is halted.
    """
    raise NotImplementedError
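
As a rough illustration, assuming notifications are delivered through a stored callback, stopping them can be as simple as dropping that callback. `Notifier` and its methods are hypothetical names; the actual mechanism depends on the concrete handler.

```python
from typing import Callable, Optional


class Notifier:
    """Hypothetical holder of a notification callback that can be disabled."""

    def __init__(self, callback: Callable[[], None]):
        self._callback: Optional[Callable[[], None]] = callback

    def notify(self) -> bool:
        """Invoke the callback if notifications are still active."""
        if self._callback is None:
            return False
        self._callback()
        return True

    def stop(self) -> None:
        # After this call, notify() becomes a no-op.
        self._callback = None


calls: list[str] = []
notifier = Notifier(lambda: calls.append("round complete"))
first = notifier.notify()   # delivered
notifier.stop()
second = notifier.notify()  # suppressed
print(first, second, calls)  # True False ['round complete']
```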

storage_update(updt_received_event) abstractmethod async

Stores an update from a source in the update storage.

This method ensures that an update received from a source is properly stored in the buffer, avoiding duplicates and managing update history if necessary.

Parameters:

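| Name | Type | Description | Default |
| --- | --- | --- | --- |
| updt_received_event | UpdateReceivedEvent | The event describing the received update. | required |

The docstring describes the data conveyed by the event:

| Name | Type | Description |
| --- | --- | --- |
| model | | The model associated with the update. |
| weight | | The weight assigned to the update (e.g., based on the amount of data used in training). |
| source | str | The identifier of the node sending the update. |
| round | int | The sender's local training round in which the update was produced. |
| local | bool | Whether this is a local update. |
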
Source code in nebula/core/aggregation/updatehandlers/updatehandler.py
@abstractmethod
async def storage_update(self, updt_received_event: UpdateReceivedEvent):
    """
    Stores an update from a source in the update storage.

    This method ensures that an update received from a source is properly stored in the buffer,
    avoiding duplicates and managing update history if necessary.

    Args:
        updt_received_event (UpdateReceivedEvent): The event describing the received update.
            As documented here, it conveys:
            model: The model associated with the update.
            weight: The weight assigned to the update (e.g., based on the amount of data used in training).
            source (str): The identifier of the node sending the update.
            round (int): The sender's local training round in which the update was produced.
            local (bool): Whether this is a local update.
    """
    raise NotImplementedError
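
A minimal sketch of duplicate avoidance and bounded history, under two assumptions that this page does not confirm: a duplicate is an update with the same `(source, round)` pair, and history is a fixed-length `deque` per source. `ReceivedUpdate` is a hypothetical stand-in for `UpdateReceivedEvent`, and `UpdateBuffer` is an illustrative name.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class ReceivedUpdate:
    """Hypothetical stand-in for UpdateReceivedEvent."""

    model: object
    weight: float
    source: str
    round: int
    local: bool = False


class UpdateBuffer:
    """Hypothetical storage keeping the last `history` updates per source."""

    def __init__(self, history: int = 2):
        self._history = history
        self._buffers: dict[str, deque] = {}
        self._seen: set[tuple[str, int]] = set()

    def store(self, update: ReceivedUpdate) -> bool:
        """Store the update; return False if it is a duplicate."""
        key = (update.source, update.round)
        if key in self._seen:
            return False
        self._seen.add(key)
        buf = self._buffers.setdefault(update.source, deque(maxlen=self._history))
        buf.append((update.model, update.weight))  # oldest entry drops when full
        return True

    def history(self, source: str) -> list:
        return list(self._buffers.get(source, ()))


buffer = UpdateBuffer(history=2)
results = [
    buffer.store(ReceivedUpdate("m1", 10.0, "node-a", round=1)),
    buffer.store(ReceivedUpdate("m1", 10.0, "node-a", round=1)),  # duplicate
    buffer.store(ReceivedUpdate("m2", 10.0, "node-a", round=2)),
    buffer.store(ReceivedUpdate("m3", 10.0, "node-a", round=3)),
]
print(results)                   # [True, False, True, True]
print(buffer.history("node-a"))  # [('m2', 10.0), ('m3', 10.0)]
```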