ringcentral.websocket.web_socket_subscription
1#!/usr/bin/env python 2# encoding: utf-8 3import uuid 4import json 5from .events import WebSocketEvents 6from observable import Observable 7 8# _subscription format example: https://git.ringcentral.com/platform/wsg/-/blob/master/RingCentral_WebSocket_API.md#step-4-subscribing-to-rc-events 9 10 11class WebSocketSubscription(Observable): 12 def __init__(self, web_socket_client): 13 Observable.__init__(self) 14 self._web_socket_client = web_socket_client 15 self._event_filters = [] 16 self._subscription = None 17 18 def on_message(self, message): 19 message_json = json.loads(message) 20 if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']): 21 self.set_subscription(message_json) 22 self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self) 23 else: 24 if(message_json[0]['type'] == 'ServerNotification'): 25 self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json) 26 27 async def register(self, events=None): 28 if not self._subscription: 29 await self.subscribe(events=events) 30 else: 31 await self.update(events=events) 32 33 def add_events(self, events): 34 self._event_filters += events 35 pass 36 37 def set_events(self, events): 38 self._event_filters = events 39 40 async def subscribe(self, events=None): 41 if events: 42 self.set_events(events) 43 44 if not self._event_filters or len(self._event_filters) == 0: 45 raise Exception("Events are undefined") 46 47 try: 48 messageId = str(uuid.uuid4()) 49 requestBodyJson = [ 50 { 51 "type": "ClientRequest", 52 "messageId": messageId, 53 "method": "POST", 54 "path": "/restapi/v1.0/subscription/", 55 }, 56 { 57 "eventFilters": self._event_filters, 58 "deliveryMode": {"transportType": "WebSocket"}, 59 }, 60 ] 61 await self._web_socket_client.send_message(requestBodyJson) 62 self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message) 63 64 except Exception as e: 65 self.reset() 66 print(e) 67 raise 68 69 async def update(self, events=None): 70 if events: 71 self.set_events(events) 72 73 if not self._event_filters or len(self._event_filters) == 0: 74 raise Exception("Events are undefined") 75 76 try: 77 subscriptionId = self._subscription[1]["id"] 78 messageId = str(uuid.uuid4()) 79 requestBodyJson = [ 80 { 81 "type": "ClientRequest", 82 "messageId": messageId, 83 "method": "PUT", 84 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 85 }, 86 { 87 "eventFilters": self._event_filters, 88 "deliveryMode": {"transportType": "WebSocket"}, 89 }, 90 ] 91 await self._web_socket_client.send_message(requestBodyJson) 92 self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self) 93 94 except Exception as e: 95 self.reset() 96 print(e) 97 raise 98 99 async def remove(self): 100 subscriptionId = self._subscription[1]["id"] 101 if not subscriptionId: 102 raise Exception("Missing subscriptionId") 103 104 try: 105 messageId = str(uuid.uuid4()) 106 requestBodyJson = [ 107 { 108 "type": "ClientRequest", 109 "messageId": messageId, 110 "method": "DELETE", 111 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 112 } 113 ] 114 115 await self._web_socket_client.send_message(requestBodyJson) 116 self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message) 117 self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved) 118 119 self.reset() 120 121 except Exception as e: 122 self.reset() 123 print(e) 124 raise 125 126 def set_subscription(self, data): 127 self._subscription = data 128 129 def get_subscription_info(self): 130 return self._subscription 131 132 def reset(self): 133 self._subscription = None 134 135 def destroy(self): 136 self.reset() 137 self.off() 138 139 140if __name__ == "__main__": 141 pass
class
WebSocketSubscription(observable.core.Observable):
12class WebSocketSubscription(Observable): 13 def __init__(self, web_socket_client): 14 Observable.__init__(self) 15 self._web_socket_client = web_socket_client 16 self._event_filters = [] 17 self._subscription = None 18 19 def on_message(self, message): 20 message_json = json.loads(message) 21 if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']): 22 self.set_subscription(message_json) 23 self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self) 24 else: 25 if(message_json[0]['type'] == 'ServerNotification'): 26 self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json) 27 28 async def register(self, events=None): 29 if not self._subscription: 30 await self.subscribe(events=events) 31 else: 32 await self.update(events=events) 33 34 def add_events(self, events): 35 self._event_filters += events 36 pass 37 38 def set_events(self, events): 39 self._event_filters = events 40 41 async def subscribe(self, events=None): 42 if events: 43 self.set_events(events) 44 45 if not self._event_filters or len(self._event_filters) == 0: 46 raise Exception("Events are undefined") 47 48 try: 49 messageId = str(uuid.uuid4()) 50 requestBodyJson = [ 51 { 52 "type": "ClientRequest", 53 "messageId": messageId, 54 "method": "POST", 55 "path": "/restapi/v1.0/subscription/", 56 }, 57 { 58 "eventFilters": self._event_filters, 59 "deliveryMode": {"transportType": "WebSocket"}, 60 }, 61 ] 62 await self._web_socket_client.send_message(requestBodyJson) 63 self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message) 64 65 except Exception as e: 66 self.reset() 67 print(e) 68 raise 69 70 async def update(self, events=None): 71 if events: 72 self.set_events(events) 73 74 if not self._event_filters or len(self._event_filters) == 0: 75 raise Exception("Events are undefined") 76 77 try: 78 subscriptionId = self._subscription[1]["id"] 79 messageId = str(uuid.uuid4()) 80 requestBodyJson = [ 81 { 82 "type": "ClientRequest", 83 "messageId": messageId, 84 "method": "PUT", 85 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 86 }, 87 { 88 "eventFilters": self._event_filters, 89 "deliveryMode": {"transportType": "WebSocket"}, 90 }, 91 ] 92 await self._web_socket_client.send_message(requestBodyJson) 93 self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self) 94 95 except Exception as e: 96 self.reset() 97 print(e) 98 raise 99 100 async def remove(self): 101 subscriptionId = self._subscription[1]["id"] 102 if not subscriptionId: 103 raise Exception("Missing subscriptionId") 104 105 try: 106 messageId = str(uuid.uuid4()) 107 requestBodyJson = [ 108 { 109 "type": "ClientRequest", 110 "messageId": messageId, 111 "method": "DELETE", 112 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 113 } 114 ] 115 116 await self._web_socket_client.send_message(requestBodyJson) 117 self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message) 118 self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved) 119 120 self.reset() 121 122 except Exception as e: 123 self.reset() 124 print(e) 125 raise 126 127 def set_subscription(self, data): 128 self._subscription = data 129 130 def get_subscription_info(self): 131 return self._subscription 132 133 def reset(self): 134 self._subscription = None 135 136 def destroy(self): 137 self.reset() 138 self.off()
Event system for python
def
on_message(self, message):
19 def on_message(self, message): 20 message_json = json.loads(message) 21 if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']): 22 self.set_subscription(message_json) 23 self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self) 24 else: 25 if(message_json[0]['type'] == 'ServerNotification'): 26 self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json)
async def
subscribe(self, events=None):
41 async def subscribe(self, events=None): 42 if events: 43 self.set_events(events) 44 45 if not self._event_filters or len(self._event_filters) == 0: 46 raise Exception("Events are undefined") 47 48 try: 49 messageId = str(uuid.uuid4()) 50 requestBodyJson = [ 51 { 52 "type": "ClientRequest", 53 "messageId": messageId, 54 "method": "POST", 55 "path": "/restapi/v1.0/subscription/", 56 }, 57 { 58 "eventFilters": self._event_filters, 59 "deliveryMode": {"transportType": "WebSocket"}, 60 }, 61 ] 62 await self._web_socket_client.send_message(requestBodyJson) 63 self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message) 64 65 except Exception as e: 66 self.reset() 67 print(e) 68 raise
async def
update(self, events=None):
70 async def update(self, events=None): 71 if events: 72 self.set_events(events) 73 74 if not self._event_filters or len(self._event_filters) == 0: 75 raise Exception("Events are undefined") 76 77 try: 78 subscriptionId = self._subscription[1]["id"] 79 messageId = str(uuid.uuid4()) 80 requestBodyJson = [ 81 { 82 "type": "ClientRequest", 83 "messageId": messageId, 84 "method": "PUT", 85 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 86 }, 87 { 88 "eventFilters": self._event_filters, 89 "deliveryMode": {"transportType": "WebSocket"}, 90 }, 91 ] 92 await self._web_socket_client.send_message(requestBodyJson) 93 self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self) 94 95 except Exception as e: 96 self.reset() 97 print(e) 98 raise
async def
remove(self):
100 async def remove(self): 101 subscriptionId = self._subscription[1]["id"] 102 if not subscriptionId: 103 raise Exception("Missing subscriptionId") 104 105 try: 106 messageId = str(uuid.uuid4()) 107 requestBodyJson = [ 108 { 109 "type": "ClientRequest", 110 "messageId": messageId, 111 "method": "DELETE", 112 "path": f"/restapi/v1.0/subscription/{subscriptionId}", 113 } 114 ] 115 116 await self._web_socket_client.send_message(requestBodyJson) 117 self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message) 118 self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved) 119 120 self.reset() 121 122 except Exception as e: 123 self.reset() 124 print(e) 125 raise
Inherited Members
- observable.core.Observable
- events
- on
- off
- once
- trigger