ringcentral.websocket.web_socket_client
1#!/usr/bin/env python 2# encoding: utf-8 3from observable import Observable 4import websockets 5from .web_socket_subscription import WebSocketSubscription 6from .events import WebSocketEvents 7import json 8import asyncio 9import uuid 10 11class WebSocketClient(Observable): 12 def __init__(self, platform): 13 Observable.__init__(self) 14 self._platform = platform 15 self._web_socket = None 16 self._done = False 17 self._is_ready = False 18 self._send_attempt_counter = 0 19 20 async def create_new_connection(self): 21 """ 22 Creates a new WebSocket connection. 23 24 Returns: 25 Any: Response object containing the result of the connection creation. 26 27 Raises: 28 Exception: If any error occurs during the process. 29 30 Note: 31 - Retrieves the WebSocket token using `get_web_socket_token`. 32 - Attempts to open a WebSocket connection using the retrieved token's URI and access token. 33 - Triggers the createConnectionError event if an error occurs and raises the exception. 34 """ 35 try: 36 web_socket_token = self.get_web_socket_token() 37 open_connection_response = await self.open_connection( 38 web_socket_token["uri"], web_socket_token["ws_access_token"] 39 ) 40 return open_connection_response 41 except Exception as e: 42 self.trigger(WebSocketEvents.createConnectionError, e) 43 raise 44 45 def get_web_socket_token(self): 46 """ 47 Retrieves a WebSocket token. 48 49 Returns: 50 dict: WebSocket token containing URI and access token. 51 52 Raises: 53 Exception: If any error occurs during the process. 54 55 Note: 56 - Sends a POST request to the '/restapi/oauth/wstoken' endpoint to obtain the WebSocket token. 57 - Returns the WebSocket token as a dictionary containing the URI and access token. 58 - Triggers the getTokenError event if an error occurs and raises the exception. 59 """ 60 try: 61 response = self._platform.post("/restapi/oauth/wstoken", body={}) 62 return response.json_dict() 63 except Exception as e: 64 self.trigger(WebSocketEvents.getTokenError, e) 65 raise 66 67 async def open_connection(self, ws_uri, ws_access_token): 68 """ 69 Opens a WebSocket connection. 70 71 Args: 72 ws_uri (str): The WebSocket URI. 73 ws_access_token (str): The access token for WebSocket authentication. 74 75 Raises: 76 Exception: If any error occurs during the process. 77 78 Note: 79 - Attempts to establish a WebSocket connection to the provided URI with the given access token. 80 - Upon successful connection, sets up a heartbeat mechanism to maintain the connection. 81 - Triggers the connectionCreated event upon successful connection establishment. 82 - Listens for incoming messages and triggers the receiveMessage event for each received message. 83 - Triggers the createConnectionError event if an error occurs during the connection process and raises the exception. 84 """ 85 try: 86 websocket = await websockets.connect( 87 f"{ws_uri}?access_token={ws_access_token}" 88 ) 89 connectionMessage = await websocket.recv() 90 connection_info = {} 91 connection_info["connection"] = websocket 92 connection_info["connection_details"] = connectionMessage 93 self._web_socket = connection_info 94 self._is_ready = True 95 self.trigger(WebSocketEvents.connectionCreated, self) 96 97 # heartbeat every 10 minutes 98 async def timer_function(): 99 while True: 100 if self._done: 101 timer.cancel() 102 break 103 await asyncio.sleep(600) 104 await self.send_message([{"type": "Heartbeat", "messageId": str(uuid.uuid4())}]) 105 timer = asyncio.create_task(timer_function()) 106 107 await asyncio.sleep(0) 108 while True: 109 message = await websocket.recv() 110 self.trigger(WebSocketEvents.receiveMessage, message) 111 await asyncio.sleep(0) 112 except Exception as e: 113 self.trigger(WebSocketEvents.createConnectionError, e) 114 raise 115 116 def get_connection_info(self): 117 return self._web_socket 118 119 def get_connection(self): 120 return self._web_socket["connection"] 121 122 async def close_connection(self): 123 """ 124 Closes the WebSocket connection. 125 126 Raises: 127 Exception: If any error occurs during the process. 128 129 Note: 130 - Sets the `_done` flag to True to signal the termination of the heartbeat mechanism. 131 - Sets the `_is_ready` flag to False to indicate that the connection is no longer ready. 132 - Retrieves the WebSocket connection using `get_connection`. 133 - Closes the WebSocket connection. 134 - Triggers the closeConnectionError event if an error occurs during the closing process and raises the exception. 135 """ 136 try: 137 self._done = True 138 self._is_ready = False 139 ws_connection = self.get_connection() 140 await ws_connection.close() 141 except Exception as e: 142 self.trigger(WebSocketEvents.closeConnectionError, e) 143 raise 144 145 async def recover_connection(self): 146 try: 147 ws_connection_info = self.get_web_socket_token() 148 recovered_connection_info = await self.open_connection( 149 ws_connection_info["uri"], ws_connection_info["ws_access_token"] 150 ) 151 # if recovered_connection_info['recoveryState'] === "Successful", then subscription is restored 152 # otherwise, need to create a new subscription 153 # IMPORTANT: WebSocket creation is successful if it doesn't raise any exception 154 return recovered_connection_info 155 except Exception as e: 156 self.trigger(WebSocketEvents.recoverConnectionError, e) 157 raise 158 159 async def send_message(self, message): 160 """ 161 Sends a message over the WebSocket connection. 162 163 Args: 164 message (Any): The message to be sent. 165 166 Raises: 167 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 168 169 Note: 170 - Checks if the WebSocket connection is ready (`_is_ready` flag). 171 - If the connection is ready, resets the send attempt counter and sends the message. 172 - If the connection is not ready, retries after a delay and increments the send attempt counter. 173 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 174 """ 175 try: 176 if self._is_ready: 177 self._send_attempt_counter = 0 178 requestBodyJson = json.dumps(message) 179 await self.get_connection().send(requestBodyJson) 180 else: 181 await asyncio.sleep(1) 182 await self.send_message(message) 183 self._send_attempt_counter += 1 184 if(self._send_attempt_counter > 10): 185 self.trigger(WebSocketEvents.connectionNotReady) 186 self._send_attempt_counter = 0 187 raise 188 except Exception as e: 189 self.trigger(WebSocketEvents.sendMessageError, e) 190 raise 191 192 async def create_subscription(self, events=None): 193 """ 194 Creates a subscription to WebSocket events. 195 196 Args: 197 events (list, optional): A list of events to subscribe to. Default is None. 198 199 Raises: 200 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 201 202 Note: 203 - If the WebSocket connection is ready (`_is_ready` flag), resets the send attempt counter and creates a WebSocketSubscription instance. 204 - Registers the subscription with the specified events. 205 - If the connection is not ready, retries after a delay and increments the send attempt counter. 206 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 207 """ 208 try: 209 if self._is_ready: 210 self._send_attempt_counter = 0 211 subscription = WebSocketSubscription(self) 212 await subscription.register(events) 213 else: 214 await asyncio.sleep(1) 215 await self.create_subscription(events) 216 self._send_attempt_counter += 1 217 if(self._send_attempt_counter > 10): 218 self.trigger(WebSocketEvents.connectionNotReady) 219 self._send_attempt_counter = 0 220 raise 221 222 except Exception as e: 223 self.trigger(WebSocketEvents.createSubscriptionError, e) 224 raise 225 226 async def update_subscription(self, subscription, events=None): 227 """ 228 Updates an existing WebSocket subscription with new events. 229 230 Args: 231 subscription : The WebSocket subscription to update. 232 events (list, optional): A list of events to update the subscription with. Default is None. 233 234 Returns: 235 WebSocketSubscription: The updated WebSocket subscription. 236 237 Raises: 238 Exception: If any error occurs during the process. 239 240 Note: 241 - Updates the specified WebSocket subscription with the new events provided. 242 - If the update is successful, returns the updated WebSocket subscription. 243 - If an error occurs during the update process, triggers the updateSubscriptionError event and raises an exception. 244 """ 245 try: 246 await subscription.update(events) 247 return subscription 248 except Exception as e: 249 self.trigger(WebSocketEvents.updateSubscriptionError, e) 250 raise 251 252 async def remove_subscription(self, subscription): 253 """ 254 Removes an existing WebSocket subscription. 255 256 Args: 257 subscription : The WebSocket subscription to remove. 258 259 Raises: 260 Exception: If any error occurs during the removal process. 261 262 Note: 263 - Removes the specified WebSocket subscription. 264 - If the removal is successful, the subscription is effectively unsubscribed from the events it was subscribed to. 265 - If an error occurs during the removal process, triggers the removeSubscriptionError event and raises an exception. 266 """ 267 try: 268 await subscription.remove() 269 except Exception as e: 270 self.trigger(WebSocketEvents.removeSubscriptionError, e) 271 raise 272 273 274if __name__ == "__main__": 275 pass
12class WebSocketClient(Observable): 13 def __init__(self, platform): 14 Observable.__init__(self) 15 self._platform = platform 16 self._web_socket = None 17 self._done = False 18 self._is_ready = False 19 self._send_attempt_counter = 0 20 21 async def create_new_connection(self): 22 """ 23 Creates a new WebSocket connection. 24 25 Returns: 26 Any: Response object containing the result of the connection creation. 27 28 Raises: 29 Exception: If any error occurs during the process. 30 31 Note: 32 - Retrieves the WebSocket token using `get_web_socket_token`. 33 - Attempts to open a WebSocket connection using the retrieved token's URI and access token. 34 - Triggers the createConnectionError event if an error occurs and raises the exception. 35 """ 36 try: 37 web_socket_token = self.get_web_socket_token() 38 open_connection_response = await self.open_connection( 39 web_socket_token["uri"], web_socket_token["ws_access_token"] 40 ) 41 return open_connection_response 42 except Exception as e: 43 self.trigger(WebSocketEvents.createConnectionError, e) 44 raise 45 46 def get_web_socket_token(self): 47 """ 48 Retrieves a WebSocket token. 49 50 Returns: 51 dict: WebSocket token containing URI and access token. 52 53 Raises: 54 Exception: If any error occurs during the process. 55 56 Note: 57 - Sends a POST request to the '/restapi/oauth/wstoken' endpoint to obtain the WebSocket token. 58 - Returns the WebSocket token as a dictionary containing the URI and access token. 59 - Triggers the getTokenError event if an error occurs and raises the exception. 60 """ 61 try: 62 response = self._platform.post("/restapi/oauth/wstoken", body={}) 63 return response.json_dict() 64 except Exception as e: 65 self.trigger(WebSocketEvents.getTokenError, e) 66 raise 67 68 async def open_connection(self, ws_uri, ws_access_token): 69 """ 70 Opens a WebSocket connection. 71 72 Args: 73 ws_uri (str): The WebSocket URI. 74 ws_access_token (str): The access token for WebSocket authentication. 75 76 Raises: 77 Exception: If any error occurs during the process. 78 79 Note: 80 - Attempts to establish a WebSocket connection to the provided URI with the given access token. 81 - Upon successful connection, sets up a heartbeat mechanism to maintain the connection. 82 - Triggers the connectionCreated event upon successful connection establishment. 83 - Listens for incoming messages and triggers the receiveMessage event for each received message. 84 - Triggers the createConnectionError event if an error occurs during the connection process and raises the exception. 85 """ 86 try: 87 websocket = await websockets.connect( 88 f"{ws_uri}?access_token={ws_access_token}" 89 ) 90 connectionMessage = await websocket.recv() 91 connection_info = {} 92 connection_info["connection"] = websocket 93 connection_info["connection_details"] = connectionMessage 94 self._web_socket = connection_info 95 self._is_ready = True 96 self.trigger(WebSocketEvents.connectionCreated, self) 97 98 # heartbeat every 10 minutes 99 async def timer_function(): 100 while True: 101 if self._done: 102 timer.cancel() 103 break 104 await asyncio.sleep(600) 105 await self.send_message([{"type": "Heartbeat", "messageId": str(uuid.uuid4())}]) 106 timer = asyncio.create_task(timer_function()) 107 108 await asyncio.sleep(0) 109 while True: 110 message = await websocket.recv() 111 self.trigger(WebSocketEvents.receiveMessage, message) 112 await asyncio.sleep(0) 113 except Exception as e: 114 self.trigger(WebSocketEvents.createConnectionError, e) 115 raise 116 117 def get_connection_info(self): 118 return self._web_socket 119 120 def get_connection(self): 121 return self._web_socket["connection"] 122 123 async def close_connection(self): 124 """ 125 Closes the WebSocket connection. 126 127 Raises: 128 Exception: If any error occurs during the process. 129 130 Note: 131 - Sets the `_done` flag to True to signal the termination of the heartbeat mechanism. 132 - Sets the `_is_ready` flag to False to indicate that the connection is no longer ready. 133 - Retrieves the WebSocket connection using `get_connection`. 134 - Closes the WebSocket connection. 135 - Triggers the closeConnectionError event if an error occurs during the closing process and raises the exception. 136 """ 137 try: 138 self._done = True 139 self._is_ready = False 140 ws_connection = self.get_connection() 141 await ws_connection.close() 142 except Exception as e: 143 self.trigger(WebSocketEvents.closeConnectionError, e) 144 raise 145 146 async def recover_connection(self): 147 try: 148 ws_connection_info = self.get_web_socket_token() 149 recovered_connection_info = await self.open_connection( 150 ws_connection_info["uri"], ws_connection_info["ws_access_token"] 151 ) 152 # if recovered_connection_info['recoveryState'] === "Successful", then subscription is restored 153 # otherwise, need to create a new subscription 154 # IMPORTANT: WebSocket creation is successful if it doesn't raise any exception 155 return recovered_connection_info 156 except Exception as e: 157 self.trigger(WebSocketEvents.recoverConnectionError, e) 158 raise 159 160 async def send_message(self, message): 161 """ 162 Sends a message over the WebSocket connection. 163 164 Args: 165 message (Any): The message to be sent. 166 167 Raises: 168 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 169 170 Note: 171 - Checks if the WebSocket connection is ready (`_is_ready` flag). 172 - If the connection is ready, resets the send attempt counter and sends the message. 173 - If the connection is not ready, retries after a delay and increments the send attempt counter. 174 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 175 """ 176 try: 177 if self._is_ready: 178 self._send_attempt_counter = 0 179 requestBodyJson = json.dumps(message) 180 await self.get_connection().send(requestBodyJson) 181 else: 182 await asyncio.sleep(1) 183 await self.send_message(message) 184 self._send_attempt_counter += 1 185 if(self._send_attempt_counter > 10): 186 self.trigger(WebSocketEvents.connectionNotReady) 187 self._send_attempt_counter = 0 188 raise 189 except Exception as e: 190 self.trigger(WebSocketEvents.sendMessageError, e) 191 raise 192 193 async def create_subscription(self, events=None): 194 """ 195 Creates a subscription to WebSocket events. 196 197 Args: 198 events (list, optional): A list of events to subscribe to. Default is None. 199 200 Raises: 201 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 202 203 Note: 204 - If the WebSocket connection is ready (`_is_ready` flag), resets the send attempt counter and creates a WebSocketSubscription instance. 205 - Registers the subscription with the specified events. 206 - If the connection is not ready, retries after a delay and increments the send attempt counter. 207 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 208 """ 209 try: 210 if self._is_ready: 211 self._send_attempt_counter = 0 212 subscription = WebSocketSubscription(self) 213 await subscription.register(events) 214 else: 215 await asyncio.sleep(1) 216 await self.create_subscription(events) 217 self._send_attempt_counter += 1 218 if(self._send_attempt_counter > 10): 219 self.trigger(WebSocketEvents.connectionNotReady) 220 self._send_attempt_counter = 0 221 raise 222 223 except Exception as e: 224 self.trigger(WebSocketEvents.createSubscriptionError, e) 225 raise 226 227 async def update_subscription(self, subscription, events=None): 228 """ 229 Updates an existing WebSocket subscription with new events. 230 231 Args: 232 subscription : The WebSocket subscription to update. 233 events (list, optional): A list of events to update the subscription with. Default is None. 234 235 Returns: 236 WebSocketSubscription: The updated WebSocket subscription. 237 238 Raises: 239 Exception: If any error occurs during the process. 240 241 Note: 242 - Updates the specified WebSocket subscription with the new events provided. 243 - If the update is successful, returns the updated WebSocket subscription. 244 - If an error occurs during the update process, triggers the updateSubscriptionError event and raises an exception. 245 """ 246 try: 247 await subscription.update(events) 248 return subscription 249 except Exception as e: 250 self.trigger(WebSocketEvents.updateSubscriptionError, e) 251 raise 252 253 async def remove_subscription(self, subscription): 254 """ 255 Removes an existing WebSocket subscription. 256 257 Args: 258 subscription : The WebSocket subscription to remove. 259 260 Raises: 261 Exception: If any error occurs during the removal process. 262 263 Note: 264 - Removes the specified WebSocket subscription. 265 - If the removal is successful, the subscription is effectively unsubscribed from the events it was subscribed to. 266 - If an error occurs during the removal process, triggers the removeSubscriptionError event and raises an exception. 267 """ 268 try: 269 await subscription.remove() 270 except Exception as e: 271 self.trigger(WebSocketEvents.removeSubscriptionError, e) 272 raise
Event system for python
21 async def create_new_connection(self): 22 """ 23 Creates a new WebSocket connection. 24 25 Returns: 26 Any: Response object containing the result of the connection creation. 27 28 Raises: 29 Exception: If any error occurs during the process. 30 31 Note: 32 - Retrieves the WebSocket token using `get_web_socket_token`. 33 - Attempts to open a WebSocket connection using the retrieved token's URI and access token. 34 - Triggers the createConnectionError event if an error occurs and raises the exception. 35 """ 36 try: 37 web_socket_token = self.get_web_socket_token() 38 open_connection_response = await self.open_connection( 39 web_socket_token["uri"], web_socket_token["ws_access_token"] 40 ) 41 return open_connection_response 42 except Exception as e: 43 self.trigger(WebSocketEvents.createConnectionError, e) 44 raise
Creates a new WebSocket connection.
Returns: Any: Response object containing the result of the connection creation.
Raises: Exception: If any error occurs during the process.
Note:
- Retrieves the WebSocket token using get_web_socket_token
.
- Attempts to open a WebSocket connection using the retrieved token's URI and access token.
- Triggers the createConnectionError event if an error occurs and raises the exception.
46 def get_web_socket_token(self): 47 """ 48 Retrieves a WebSocket token. 49 50 Returns: 51 dict: WebSocket token containing URI and access token. 52 53 Raises: 54 Exception: If any error occurs during the process. 55 56 Note: 57 - Sends a POST request to the '/restapi/oauth/wstoken' endpoint to obtain the WebSocket token. 58 - Returns the WebSocket token as a dictionary containing the URI and access token. 59 - Triggers the getTokenError event if an error occurs and raises the exception. 60 """ 61 try: 62 response = self._platform.post("/restapi/oauth/wstoken", body={}) 63 return response.json_dict() 64 except Exception as e: 65 self.trigger(WebSocketEvents.getTokenError, e) 66 raise
Retrieves a WebSocket token.
Returns: dict: WebSocket token containing URI and access token.
Raises: Exception: If any error occurs during the process.
Note: - Sends a POST request to the '/restapi/oauth/wstoken' endpoint to obtain the WebSocket token. - Returns the WebSocket token as a dictionary containing the URI and access token. - Triggers the getTokenError event if an error occurs and raises the exception.
68 async def open_connection(self, ws_uri, ws_access_token): 69 """ 70 Opens a WebSocket connection. 71 72 Args: 73 ws_uri (str): The WebSocket URI. 74 ws_access_token (str): The access token for WebSocket authentication. 75 76 Raises: 77 Exception: If any error occurs during the process. 78 79 Note: 80 - Attempts to establish a WebSocket connection to the provided URI with the given access token. 81 - Upon successful connection, sets up a heartbeat mechanism to maintain the connection. 82 - Triggers the connectionCreated event upon successful connection establishment. 83 - Listens for incoming messages and triggers the receiveMessage event for each received message. 84 - Triggers the createConnectionError event if an error occurs during the connection process and raises the exception. 85 """ 86 try: 87 websocket = await websockets.connect( 88 f"{ws_uri}?access_token={ws_access_token}" 89 ) 90 connectionMessage = await websocket.recv() 91 connection_info = {} 92 connection_info["connection"] = websocket 93 connection_info["connection_details"] = connectionMessage 94 self._web_socket = connection_info 95 self._is_ready = True 96 self.trigger(WebSocketEvents.connectionCreated, self) 97 98 # heartbeat every 10 minutes 99 async def timer_function(): 100 while True: 101 if self._done: 102 timer.cancel() 103 break 104 await asyncio.sleep(600) 105 await self.send_message([{"type": "Heartbeat", "messageId": str(uuid.uuid4())}]) 106 timer = asyncio.create_task(timer_function()) 107 108 await asyncio.sleep(0) 109 while True: 110 message = await websocket.recv() 111 self.trigger(WebSocketEvents.receiveMessage, message) 112 await asyncio.sleep(0) 113 except Exception as e: 114 self.trigger(WebSocketEvents.createConnectionError, e) 115 raise
Opens a WebSocket connection.
Args: ws_uri (str): The WebSocket URI. ws_access_token (str): The access token for WebSocket authentication.
Raises: Exception: If any error occurs during the process.
Note: - Attempts to establish a WebSocket connection to the provided URI with the given access token. - Upon successful connection, sets up a heartbeat mechanism to maintain the connection. - Triggers the connectionCreated event upon successful connection establishment. - Listens for incoming messages and triggers the receiveMessage event for each received message. - Triggers the createConnectionError event if an error occurs during the connection process and raises the exception.
123 async def close_connection(self): 124 """ 125 Closes the WebSocket connection. 126 127 Raises: 128 Exception: If any error occurs during the process. 129 130 Note: 131 - Sets the `_done` flag to True to signal the termination of the heartbeat mechanism. 132 - Sets the `_is_ready` flag to False to indicate that the connection is no longer ready. 133 - Retrieves the WebSocket connection using `get_connection`. 134 - Closes the WebSocket connection. 135 - Triggers the closeConnectionError event if an error occurs during the closing process and raises the exception. 136 """ 137 try: 138 self._done = True 139 self._is_ready = False 140 ws_connection = self.get_connection() 141 await ws_connection.close() 142 except Exception as e: 143 self.trigger(WebSocketEvents.closeConnectionError, e) 144 raise
Closes the WebSocket connection.
Raises: Exception: If any error occurs during the process.
Note:
- Sets the _done
flag to True to signal the termination of the heartbeat mechanism.
- Sets the _is_ready
flag to False to indicate that the connection is no longer ready.
- Retrieves the WebSocket connection using get_connection
.
- Closes the WebSocket connection.
- Triggers the closeConnectionError event if an error occurs during the closing process and raises the exception.
146 async def recover_connection(self): 147 try: 148 ws_connection_info = self.get_web_socket_token() 149 recovered_connection_info = await self.open_connection( 150 ws_connection_info["uri"], ws_connection_info["ws_access_token"] 151 ) 152 # if recovered_connection_info['recoveryState'] === "Successful", then subscription is restored 153 # otherwise, need to create a new subscription 154 # IMPORTANT: WebSocket creation is successful if it doesn't raise any exception 155 return recovered_connection_info 156 except Exception as e: 157 self.trigger(WebSocketEvents.recoverConnectionError, e) 158 raise
160 async def send_message(self, message): 161 """ 162 Sends a message over the WebSocket connection. 163 164 Args: 165 message (Any): The message to be sent. 166 167 Raises: 168 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 169 170 Note: 171 - Checks if the WebSocket connection is ready (`_is_ready` flag). 172 - If the connection is ready, resets the send attempt counter and sends the message. 173 - If the connection is not ready, retries after a delay and increments the send attempt counter. 174 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 175 """ 176 try: 177 if self._is_ready: 178 self._send_attempt_counter = 0 179 requestBodyJson = json.dumps(message) 180 await self.get_connection().send(requestBodyJson) 181 else: 182 await asyncio.sleep(1) 183 await self.send_message(message) 184 self._send_attempt_counter += 1 185 if(self._send_attempt_counter > 10): 186 self.trigger(WebSocketEvents.connectionNotReady) 187 self._send_attempt_counter = 0 188 raise 189 except Exception as e: 190 self.trigger(WebSocketEvents.sendMessageError, e) 191 raise
Sends a message over the WebSocket connection.
Args: message (Any): The message to be sent.
Raises: Exception: If any error occurs during the process or if the connection is not ready after multiple attempts.
Note:
- Checks if the WebSocket connection is ready (_is_ready
flag).
- If the connection is ready, resets the send attempt counter and sends the message.
- If the connection is not ready, retries after a delay and increments the send attempt counter.
- If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception.
193 async def create_subscription(self, events=None): 194 """ 195 Creates a subscription to WebSocket events. 196 197 Args: 198 events (list, optional): A list of events to subscribe to. Default is None. 199 200 Raises: 201 Exception: If any error occurs during the process or if the connection is not ready after multiple attempts. 202 203 Note: 204 - If the WebSocket connection is ready (`_is_ready` flag), resets the send attempt counter and creates a WebSocketSubscription instance. 205 - Registers the subscription with the specified events. 206 - If the connection is not ready, retries after a delay and increments the send attempt counter. 207 - If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception. 208 """ 209 try: 210 if self._is_ready: 211 self._send_attempt_counter = 0 212 subscription = WebSocketSubscription(self) 213 await subscription.register(events) 214 else: 215 await asyncio.sleep(1) 216 await self.create_subscription(events) 217 self._send_attempt_counter += 1 218 if(self._send_attempt_counter > 10): 219 self.trigger(WebSocketEvents.connectionNotReady) 220 self._send_attempt_counter = 0 221 raise 222 223 except Exception as e: 224 self.trigger(WebSocketEvents.createSubscriptionError, e) 225 raise
Creates a subscription to WebSocket events.
Args: events (list, optional): A list of events to subscribe to. Default is None.
Raises: Exception: If any error occurs during the process or if the connection is not ready after multiple attempts.
Note:
- If the WebSocket connection is ready (_is_ready
flag), resets the send attempt counter and creates a WebSocketSubscription instance.
- Registers the subscription with the specified events.
- If the connection is not ready, retries after a delay and increments the send attempt counter.
- If the send attempt counter exceeds a threshold, triggers the connectionNotReady event and raises an exception.
227 async def update_subscription(self, subscription, events=None): 228 """ 229 Updates an existing WebSocket subscription with new events. 230 231 Args: 232 subscription : The WebSocket subscription to update. 233 events (list, optional): A list of events to update the subscription with. Default is None. 234 235 Returns: 236 WebSocketSubscription: The updated WebSocket subscription. 237 238 Raises: 239 Exception: If any error occurs during the process. 240 241 Note: 242 - Updates the specified WebSocket subscription with the new events provided. 243 - If the update is successful, returns the updated WebSocket subscription. 244 - If an error occurs during the update process, triggers the updateSubscriptionError event and raises an exception. 245 """ 246 try: 247 await subscription.update(events) 248 return subscription 249 except Exception as e: 250 self.trigger(WebSocketEvents.updateSubscriptionError, e) 251 raise
Updates an existing WebSocket subscription with new events.
Args: subscription : The WebSocket subscription to update. events (list, optional): A list of events to update the subscription with. Default is None.
Returns: WebSocketSubscription: The updated WebSocket subscription.
Raises: Exception: If any error occurs during the process.
Note: - Updates the specified WebSocket subscription with the new events provided. - If the update is successful, returns the updated WebSocket subscription. - If an error occurs during the update process, triggers the updateSubscriptionError event and raises an exception.
253 async def remove_subscription(self, subscription): 254 """ 255 Removes an existing WebSocket subscription. 256 257 Args: 258 subscription : The WebSocket subscription to remove. 259 260 Raises: 261 Exception: If any error occurs during the removal process. 262 263 Note: 264 - Removes the specified WebSocket subscription. 265 - If the removal is successful, the subscription is effectively unsubscribed from the events it was subscribed to. 266 - If an error occurs during the removal process, triggers the removeSubscriptionError event and raises an exception. 267 """ 268 try: 269 await subscription.remove() 270 except Exception as e: 271 self.trigger(WebSocketEvents.removeSubscriptionError, e) 272 raise
Removes an existing WebSocket subscription.
Args: subscription : The WebSocket subscription to remove.
Raises: Exception: If any error occurs during the removal process.
Note: - Removes the specified WebSocket subscription. - If the removal is successful, the subscription is effectively unsubscribed from the events it was subscribed to. - If an error occurs during the removal process, triggers the removeSubscriptionError event and raises an exception.
Inherited Members
- observable.core.Observable
- events
- on
- off
- once
- trigger