Skip to content

Documentation for Eventmanager Module

EventManager

Source code in nebula/core/eventmanager.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class EventManager:
    _instance = None
    _lock = Locker("event_manager")  # To avoid race conditions in multithreaded environments

    def __new__(cls, *args, **kwargs):
        """Implementation of the Singleton pattern."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialize(*args, **kwargs)
        return cls._instance

    def _initialize(self, verbose=False):
        """Initializes the single instance (runs only once)."""
        if hasattr(self, "_initialized"):  # Prevents resetting
            return
        self._subscribers: dict[tuple[str, str], list] = {}
        self._message_events_lock = Locker("message_events_lock", async_lock=True)
        self._addons_events_subs: dict[type, list] = {}
        self._addons_event_lock = Locker("addons_event_lock", async_lock=True)
        self._node_events_subs: dict[type, list] = {}
        self._node_events_lock = Locker("node_events_lock", async_lock=True)
        self._verbose = verbose
        self._initialized = True  # Mark already initialized

    @staticmethod
    def get_instance(verbose=False):
        """Static method to get the unique instance."""
        if EventManager._instance is None:
            EventManager(verbose=verbose)
        return EventManager._instance

    async def subscribe(self, event_type: tuple[str, str], callback: callable):
        """Register a callback for a specific event type."""
        async with self._message_events_lock:
            if event_type not in self._subscribers:
                self._subscribers[event_type] = []
            self._subscribers[event_type].append(callback)
        logging.info(f"EventManager | Subscribed callback for event: {event_type}")

    async def publish(self, message_event: MessageEvent):
        """Trigger all callbacks registered for a specific event type."""
        if self._verbose:
            logging.info(f"Publishing MessageEvent: {message_event.message_type}")
        async with self._message_events_lock:
            event_type = message_event.message_type
            callbacks = self._subscribers.get(event_type, [])
        if not callbacks:
            logging.error(f"EventManager | No subscribers for event: {event_type}")
            return

        for callback in self._subscribers[event_type]:
            try:
                if self._verbose:
                    logging.info(
                        f"EventManager | Triggering callback for event: {event_type}, from source: {message_event.source}"
                    )
                if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                    await callback(message_event.source, message_event.message)
                else:
                    callback(message_event.source, message_event.message)
            except Exception as e:
                logging.exception(f"EventManager | Error in callback for event {event_type}: {e}")

    async def subscribe_addonevent(self, addonEventType: type[AddonEvent], callback: callable):
        """Register a callback for a specific type of AddonEvent."""
        async with self._addons_event_lock:
            if addonEventType not in self._addons_events_subs:
                self._addons_events_subs[addonEventType] = []
            self._addons_events_subs[addonEventType].append(callback)
        logging.info(f"EventManager | Subscribed callback for AddonEvent type: {addonEventType.__name__}")

    async def publish_addonevent(self, addonevent: AddonEvent):
        """Trigger all callbacks registered for a specific type of AddonEvent."""
        if self._verbose:
            logging.info(f"Publishing AddonEvent: {addonevent}")
        async with self._addons_event_lock:
            event_type = type(addonevent)
            callbacks = self._addons_events_subs.get(event_type, [])

        if not callbacks:
            logging.error(f"EventManager | No subscribers for AddonEvent type: {event_type.__name__}")
            return

        for callback in self._addons_events_subs[event_type]:
            try:
                if self._verbose:
                    logging.info(f"EventManager | Triggering callback for event type: {event_type.__name__}")
                if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                    await callback(addonevent)
                else:
                    callback(addonevent)
            except Exception as e:
                logging.exception(f"EventManager | Error in callback for AddonEvent {event_type.__name__}: {e}")

    async def subscribe_node_event(self, nodeEventType: type[NodeEvent], callback: callable):
        """Register a callback for a specific type of AddonEvent."""
        async with self._node_events_lock:
            if nodeEventType not in self._node_events_subs:
                self._node_events_subs[nodeEventType] = []
            self._node_events_subs[nodeEventType].append(callback)
        logging.info(f"EventManager | Subscribed callback for NodeEvent type: {nodeEventType.__name__}")

    async def publish_node_event(self, nodeevent: NodeEvent):
        """Trigger all callbacks registered for a specific type of AddonEvent."""
        if self._verbose:
            logging.info(f"Publishing NodeEvent: {nodeevent}")
        async with self._node_events_lock:
            event_type = type(nodeevent)
            callbacks = self._node_events_subs.get(event_type, [])  # Extraer la lista de callbacks

        if not callbacks:
            if self._verbose:
                logging.error(f"EventManager | No subscribers for NodeEvent type: {event_type.__name__}")
            return

        for callback in self._node_events_subs[event_type]:
            try:
                if self._verbose:
                    logging.info(f"EventManager | Triggering callback for event type: {event_type.__name__}")
                if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                    if await nodeevent.is_concurrent():
                        asyncio.create_task(callback(nodeevent))
                    else:
                        await callback(nodeevent)
                else:
                    callback(nodeevent)
            except Exception as e:
                logging.exception(f"EventManager | Error in callback for NodeEvent {event_type.__name__}: {e}")

    async def unsubscribe_event(self, event_type, callback):
        """Unsubscribe a callback from a given event type (MessageEvent, AddonEvent, or NodeEvent)."""
        if isinstance(event_type, tuple):  # MessageEvent
            async with self._message_events_lock:
                if event_type in self._subscribers and callback in self._subscribers[event_type]:
                    self._subscribers[event_type].remove(callback)
                    logging.info(f"EventManager | Unsubscribed callback for MessageEvent: {event_type}")
        elif issubclass(event_type, AddonEvent):  # AddonEvent
            async with self._addons_event_lock:
                if event_type in self._addons_events_subs and callback in self._addons_events_subs[event_type]:
                    self._addons_events_subs[event_type].remove(callback)
                    logging.info(f"EventManager | Unsubscribed callback for AddonEvent: {event_type.__name__}")
        elif issubclass(event_type, NodeEvent):  # NodeEvent
            async with self._node_events_lock:
                if event_type in self._node_events_subs and callback in self._node_events_subs[event_type]:
                    self._node_events_subs[event_type].remove(callback)
                    logging.info(f"EventManager | Unsubscribed callback for NodeEvent: {event_type.__name__}")

__new__(*args, **kwargs)

Implementation of the Singleton pattern.

Source code in nebula/core/eventmanager.py
14
15
16
17
18
19
20
def __new__(cls, *args, **kwargs):
    """Implementation of the Singleton pattern."""
    with cls._lock:
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialize(*args, **kwargs)
    return cls._instance

get_instance(verbose=False) staticmethod

Static method to get the unique instance.

Source code in nebula/core/eventmanager.py
35
36
37
38
39
40
@staticmethod
def get_instance(verbose=False):
    """Static method to get the unique instance."""
    if EventManager._instance is None:
        EventManager(verbose=verbose)
    return EventManager._instance

publish(message_event) async

Trigger all callbacks registered for a specific event type.

Source code in nebula/core/eventmanager.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def publish(self, message_event: MessageEvent):
    """Trigger all callbacks registered for a specific event type."""
    if self._verbose:
        logging.info(f"Publishing MessageEvent: {message_event.message_type}")
    async with self._message_events_lock:
        event_type = message_event.message_type
        callbacks = self._subscribers.get(event_type, [])
    if not callbacks:
        logging.error(f"EventManager | No subscribers for event: {event_type}")
        return

    for callback in self._subscribers[event_type]:
        try:
            if self._verbose:
                logging.info(
                    f"EventManager | Triggering callback for event: {event_type}, from source: {message_event.source}"
                )
            if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                await callback(message_event.source, message_event.message)
            else:
                callback(message_event.source, message_event.message)
        except Exception as e:
            logging.exception(f"EventManager | Error in callback for event {event_type}: {e}")

publish_addonevent(addonevent) async

Trigger all callbacks registered for a specific type of AddonEvent.

Source code in nebula/core/eventmanager.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def publish_addonevent(self, addonevent: AddonEvent):
    """Trigger all callbacks registered for a specific type of AddonEvent."""
    if self._verbose:
        logging.info(f"Publishing AddonEvent: {addonevent}")
    async with self._addons_event_lock:
        event_type = type(addonevent)
        callbacks = self._addons_events_subs.get(event_type, [])

    if not callbacks:
        logging.error(f"EventManager | No subscribers for AddonEvent type: {event_type.__name__}")
        return

    for callback in self._addons_events_subs[event_type]:
        try:
            if self._verbose:
                logging.info(f"EventManager | Triggering callback for event type: {event_type.__name__}")
            if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                await callback(addonevent)
            else:
                callback(addonevent)
        except Exception as e:
            logging.exception(f"EventManager | Error in callback for AddonEvent {event_type.__name__}: {e}")

publish_node_event(nodeevent) async

Trigger all callbacks registered for a specific type of AddonEvent.

Source code in nebula/core/eventmanager.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def publish_node_event(self, nodeevent: NodeEvent):
    """Trigger all callbacks registered for a specific type of AddonEvent."""
    if self._verbose:
        logging.info(f"Publishing NodeEvent: {nodeevent}")
    async with self._node_events_lock:
        event_type = type(nodeevent)
        callbacks = self._node_events_subs.get(event_type, [])  # Extraer la lista de callbacks

    if not callbacks:
        if self._verbose:
            logging.error(f"EventManager | No subscribers for NodeEvent type: {event_type.__name__}")
        return

    for callback in self._node_events_subs[event_type]:
        try:
            if self._verbose:
                logging.info(f"EventManager | Triggering callback for event type: {event_type.__name__}")
            if asyncio.iscoroutinefunction(callback) or inspect.iscoroutine(callback):
                if await nodeevent.is_concurrent():
                    asyncio.create_task(callback(nodeevent))
                else:
                    await callback(nodeevent)
            else:
                callback(nodeevent)
        except Exception as e:
            logging.exception(f"EventManager | Error in callback for NodeEvent {event_type.__name__}: {e}")

subscribe(event_type, callback) async

Register a callback for a specific event type.

Source code in nebula/core/eventmanager.py
42
43
44
45
46
47
48
async def subscribe(self, event_type: tuple[str, str], callback: callable):
    """Register a callback for a specific event type."""
    async with self._message_events_lock:
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(callback)
    logging.info(f"EventManager | Subscribed callback for event: {event_type}")

subscribe_addonevent(addonEventType, callback) async

Register a callback for a specific type of AddonEvent.

Source code in nebula/core/eventmanager.py
74
75
76
77
78
79
80
async def subscribe_addonevent(self, addonEventType: type[AddonEvent], callback: callable):
    """Register a callback for a specific type of AddonEvent."""
    async with self._addons_event_lock:
        if addonEventType not in self._addons_events_subs:
            self._addons_events_subs[addonEventType] = []
        self._addons_events_subs[addonEventType].append(callback)
    logging.info(f"EventManager | Subscribed callback for AddonEvent type: {addonEventType.__name__}")

subscribe_node_event(nodeEventType, callback) async

Register a callback for a specific type of AddonEvent.

Source code in nebula/core/eventmanager.py
105
106
107
108
109
110
111
async def subscribe_node_event(self, nodeEventType: type[NodeEvent], callback: callable):
    """Register a callback for a specific type of AddonEvent."""
    async with self._node_events_lock:
        if nodeEventType not in self._node_events_subs:
            self._node_events_subs[nodeEventType] = []
        self._node_events_subs[nodeEventType].append(callback)
    logging.info(f"EventManager | Subscribed callback for NodeEvent type: {nodeEventType.__name__}")

unsubscribe_event(event_type, callback) async

Unsubscribe a callback from a given event type (MessageEvent, AddonEvent, or NodeEvent).

Source code in nebula/core/eventmanager.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
async def unsubscribe_event(self, event_type, callback):
    """Unsubscribe a callback from a given event type (MessageEvent, AddonEvent, or NodeEvent)."""
    if isinstance(event_type, tuple):  # MessageEvent
        async with self._message_events_lock:
            if event_type in self._subscribers and callback in self._subscribers[event_type]:
                self._subscribers[event_type].remove(callback)
                logging.info(f"EventManager | Unsubscribed callback for MessageEvent: {event_type}")
    elif issubclass(event_type, AddonEvent):  # AddonEvent
        async with self._addons_event_lock:
            if event_type in self._addons_events_subs and callback in self._addons_events_subs[event_type]:
                self._addons_events_subs[event_type].remove(callback)
                logging.info(f"EventManager | Unsubscribed callback for AddonEvent: {event_type.__name__}")
    elif issubclass(event_type, NodeEvent):  # NodeEvent
        async with self._node_events_lock:
            if event_type in self._node_events_subs and callback in self._node_events_subs[event_type]:
                self._node_events_subs[event_type].remove(callback)
                logging.info(f"EventManager | Unsubscribed callback for NodeEvent: {event_type.__name__}")