Skip to content

Documentation for Nebulaevents Module

AggregationEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class AggregationEvent(NodeEvent):
    def __init__(self, updates: dict, expected_nodes: set, missing_nodes: set):
        """Event triggered when model aggregation is ready.

        Args:
            updates (dict): Dictionary containing model updates.
            expected_nodes (set): Set of nodes expected to participate in aggregation.
            missing_nodes (set): Set of nodes that did not send their update.
        """
        self._updates = updates
        self._expected_nodes = expected_nodes
        self._missing_nodes = missing_nodes

    def __str__(self):
        return "Aggregation Ready"

    async def get_event_data(self) -> tuple[dict, set, set]:
        """Retrieves the aggregation event data.

        Returns:
            tuple[dict, set, set]:
                - updates (dict): Model updates.
                - expected_nodes (set): Expected nodes.
                - missing_nodes (set): Missing nodes.
        """
        return (self._updates, self._expected_nodes, self._missing_nodes)

    async def is_concurrent(self) -> bool:
        return False

__init__(updates, expected_nodes, missing_nodes)

Event triggered when model aggregation is ready.

Parameters:

Name Type Description Default
updates dict

Dictionary containing model updates.

required
expected_nodes set

Set of nodes expected to participate in aggregation.

required
missing_nodes set

Set of nodes that did not send their update.

required
Source code in nebula/core/nebulaevents.py
58
59
60
61
62
63
64
65
66
67
68
def __init__(self, updates: dict, expected_nodes: set, missing_nodes: set):
    """Event triggered when model aggregation is ready.

    Args:
        updates (dict): Dictionary containing model updates.
        expected_nodes (set): Set of nodes expected to participate in aggregation.
        missing_nodes (set): Set of nodes that did not send their update.
    """
    self._updates = updates
    self._expected_nodes = expected_nodes
    self._missing_nodes = missing_nodes

get_event_data() async

Retrieves the aggregation event data.

Returns:

Type Description
tuple[dict, set, set]

tuple[dict, set, set]: - updates (dict): Model updates. - expected_nodes (set): Expected nodes. - missing_nodes (set): Missing nodes.

Source code in nebula/core/nebulaevents.py
73
74
75
76
77
78
79
80
81
82
async def get_event_data(self) -> tuple[dict, set, set]:
    """Retrieves the aggregation event data.

    Returns:
        tuple[dict, set, set]:
            - updates (dict): Model updates.
            - expected_nodes (set): Expected nodes.
            - missing_nodes (set): Missing nodes.
    """
    return (self._updates, self._expected_nodes, self._missing_nodes)

RoundStartEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class RoundStartEvent(NodeEvent):
    def __init__(self, round, start_time, expected_nodes):
        """Event triggered when round is going to start.

        Args:
            round (int): Round number.
            start_time (time): Current time when round is going to start.
            rejected_nodes (set): Set of nodes that were rejected in the previous round.
        """
        self._round_start_time = start_time
        self._round = round
        self._expected_nodes = expected_nodes

    def __str__(self):
        return "Round starting"

    async def get_event_data(self):
        return (self._round, self._round_start_time, self._expected_nodes)

    async def is_concurrent(self):
        return False

__init__(round, start_time, expected_nodes)

Event triggered when round is going to start.

Parameters:

Name Type Description Default
round int

Round number.

required
start_time time

Current time when round is going to start.

required
rejected_nodes set

Set of nodes that were rejected in the previous round.

required
Source code in nebula/core/nebulaevents.py
35
36
37
38
39
40
41
42
43
44
45
def __init__(self, round, start_time, expected_nodes):
    """Event triggered when round is going to start.

    Args:
        round (int): Round number.
        start_time (time): Current time when round is going to start.
        rejected_nodes (set): Set of nodes that were rejected in the previous round.
    """
    self._round_start_time = start_time
    self._round = round
    self._expected_nodes = expected_nodes

UpdateNeighborEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class UpdateNeighborEvent(NodeEvent):
    def __init__(self, node_addr, removed=False):
        """Event triggered when a neighboring node is updated.

        Args:
            node_addr (str): Address of the neighboring node.
            removed (bool, optional): Indicates whether the node was removed.
                                      Defaults to False.
        """
        self._node_addr = node_addr
        self._removed = removed

    def __str__(self):
        return f"Node addr: {self._node_addr}, removed: {self._removed}"

    async def get_event_data(self) -> tuple[str, bool]:
        """Retrieves the neighbor update event data.

        Returns:
            tuple[str, bool]:
                - node_addr (str): Address of the neighboring node.
                - removed (bool): Whether the node was removed.
        """
        return (self._node_addr, self._removed)

    async def is_concurrent(self) -> bool:
        return False

__init__(node_addr, removed=False)

Event triggered when a neighboring node is updated.

Parameters:

Name Type Description Default
node_addr str

Address of the neighboring node.

required
removed bool

Indicates whether the node was removed. Defaults to False.

False
Source code in nebula/core/nebulaevents.py
89
90
91
92
93
94
95
96
97
98
def __init__(self, node_addr, removed=False):
    """Event triggered when a neighboring node is updated.

    Args:
        node_addr (str): Address of the neighboring node.
        removed (bool, optional): Indicates whether the node was removed.
                                  Defaults to False.
    """
    self._node_addr = node_addr
    self._removed = removed

get_event_data() async

Retrieves the neighbor update event data.

Returns:

Type Description
tuple[str, bool]

tuple[str, bool]: - node_addr (str): Address of the neighboring node. - removed (bool): Whether the node was removed.

Source code in nebula/core/nebulaevents.py
103
104
105
106
107
108
109
110
111
async def get_event_data(self) -> tuple[str, bool]:
    """Retrieves the neighbor update event data.

    Returns:
        tuple[str, bool]:
            - node_addr (str): Address of the neighboring node.
            - removed (bool): Whether the node was removed.
    """
    return (self._node_addr, self._removed)

UpdateReceivedEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class UpdateReceivedEvent(NodeEvent):
    def __init__(self, decoded_model, weight, source, round, local=False):
        """
        Initializes an UpdateReceivedEvent.

        Args:
            decoded_model (Any): The received model update.
            weight (float): The weight associated with the received update.
            source (str): The identifier or address of the node that sent the update.
            round (int): The round number in which the update was received.
            local (bool): Local update
        """
        self._source = source
        self._round = round
        self._model = decoded_model
        self._weight = weight
        self._local = local

    def __str__(self):
        return f"Update received from source: {self._source}, round: {self._round}"

    async def get_event_data(self) -> tuple[str, bool]:
        """
        Retrieves the event data.

        Returns:
            tuple[Any, float, str, int, bool]: A tuple containing:
                - The received model update.
                - The weight associated with the update.
                - The source node identifier.
                - The round number of the update.
                - If the update is local
        """
        return (self._model, self._weight, self._source, self._round, self._local)

    async def is_concurrent(self) -> bool:
        return False

__init__(decoded_model, weight, source, round, local=False)

Initializes an UpdateReceivedEvent.

Parameters:

Name Type Description Default
decoded_model Any

The received model update.

required
weight float

The weight associated with the received update.

required
source str

The identifier or address of the node that sent the update.

required
round int

The round number in which the update was received.

required
local bool

Local update

False
Source code in nebula/core/nebulaevents.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def __init__(self, decoded_model, weight, source, round, local=False):
    """
    Initializes an UpdateReceivedEvent.

    Args:
        decoded_model (Any): The received model update.
        weight (float): The weight associated with the received update.
        source (str): The identifier or address of the node that sent the update.
        round (int): The round number in which the update was received.
        local (bool): Local update
    """
    self._source = source
    self._round = round
    self._model = decoded_model
    self._weight = weight
    self._local = local

get_event_data() async

Retrieves the event data.

Returns:

Type Description
tuple[str, bool]

tuple[Any, float, str, int, bool]: A tuple containing: - The received model update. - The weight associated with the update. - The source node identifier. - The round number of the update. - If the update is local

Source code in nebula/core/nebulaevents.py
138
139
140
141
142
143
144
145
146
147
148
149
150
async def get_event_data(self) -> tuple[str, bool]:
    """
    Retrieves the event data.

    Returns:
        tuple[Any, float, str, int, bool]: A tuple containing:
            - The received model update.
            - The weight associated with the update.
            - The source node identifier.
            - The round number of the update.
            - If the update is local
    """
    return (self._model, self._weight, self._source, self._round, self._local)