ringcentral.websocket.web_socket_subscription

  1#!/usr/bin/env python
  2# encoding: utf-8
  3import uuid
  4import json
  5from .events import WebSocketEvents
  6from observable import Observable
  7
  8# _subscription format example: https://git.ringcentral.com/platform/wsg/-/blob/master/RingCentral_WebSocket_API.md#step-4-subscribing-to-rc-events
  9
 10
 11class WebSocketSubscription(Observable):
 12    def __init__(self, web_socket_client):
 13        Observable.__init__(self)
 14        self._web_socket_client = web_socket_client
 15        self._event_filters = []
 16        self._subscription = None
 17
 18    def on_message(self, message):
 19        message_json = json.loads(message)
 20        if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']):
 21            self.set_subscription(message_json)
 22            self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self)
 23        else: 
 24            if(message_json[0]['type'] == 'ServerNotification'):
 25                self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json)
 26
 27    async def register(self, events=None):
 28        if not self._subscription:
 29            await self.subscribe(events=events)
 30        else:
 31            await self.update(events=events)
 32
 33    def add_events(self, events):
 34        self._event_filters += events
 35        pass
 36
 37    def set_events(self, events):
 38        self._event_filters = events
 39
 40    async def subscribe(self, events=None):
 41        if events:
 42            self.set_events(events)
 43
 44        if not self._event_filters or len(self._event_filters) == 0:
 45            raise Exception("Events are undefined")
 46
 47        try:
 48            messageId = str(uuid.uuid4())
 49            requestBodyJson = [
 50                {
 51                    "type": "ClientRequest",
 52                    "messageId": messageId,
 53                    "method": "POST",
 54                    "path": "/restapi/v1.0/subscription/",
 55                },
 56                {
 57                    "eventFilters": self._event_filters,
 58                    "deliveryMode": {"transportType": "WebSocket"},
 59                },
 60            ]
 61            await self._web_socket_client.send_message(requestBodyJson)
 62            self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message)
 63
 64        except Exception as e:
 65            self.reset()
 66            print(e)
 67            raise
 68
 69    async def update(self, events=None):
 70        if events:
 71            self.set_events(events)
 72
 73        if not self._event_filters or len(self._event_filters) == 0:
 74            raise Exception("Events are undefined")
 75
 76        try:
 77            subscriptionId = self._subscription[1]["id"]
 78            messageId = str(uuid.uuid4())
 79            requestBodyJson = [
 80                {
 81                    "type": "ClientRequest",
 82                    "messageId": messageId,
 83                    "method": "PUT",
 84                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
 85                },
 86                {
 87                    "eventFilters": self._event_filters,
 88                    "deliveryMode": {"transportType": "WebSocket"},
 89                },
 90            ]
 91            await self._web_socket_client.send_message(requestBodyJson)
 92            self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self)
 93
 94        except Exception as e:
 95            self.reset()
 96            print(e)
 97            raise
 98
 99    async def remove(self):
100        subscriptionId = self._subscription[1]["id"]
101        if not subscriptionId:
102            raise Exception("Missing subscriptionId")
103
104        try:
105            messageId = str(uuid.uuid4())
106            requestBodyJson = [
107                {
108                    "type": "ClientRequest",
109                    "messageId": messageId,
110                    "method": "DELETE",
111                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
112                }
113            ]
114
115            await self._web_socket_client.send_message(requestBodyJson)
116            self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message)
117            self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved)
118
119            self.reset()
120
121        except Exception as e:
122            self.reset()
123            print(e)
124            raise
125
126    def set_subscription(self, data):
127        self._subscription = data
128
129    def get_subscription_info(self):
130        return self._subscription
131
132    def reset(self):
133        self._subscription = None
134
135    def destroy(self):
136        self.reset()
137        self.off()
138
139
140if __name__ == "__main__":
141    pass
class WebSocketSubscription(observable.core.Observable):
 12class WebSocketSubscription(Observable):
 13    def __init__(self, web_socket_client):
 14        Observable.__init__(self)
 15        self._web_socket_client = web_socket_client
 16        self._event_filters = []
 17        self._subscription = None
 18
 19    def on_message(self, message):
 20        message_json = json.loads(message)
 21        if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']):
 22            self.set_subscription(message_json)
 23            self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self)
 24        else: 
 25            if(message_json[0]['type'] == 'ServerNotification'):
 26                self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json)
 27
 28    async def register(self, events=None):
 29        if not self._subscription:
 30            await self.subscribe(events=events)
 31        else:
 32            await self.update(events=events)
 33
 34    def add_events(self, events):
 35        self._event_filters += events
 36        pass
 37
 38    def set_events(self, events):
 39        self._event_filters = events
 40
 41    async def subscribe(self, events=None):
 42        if events:
 43            self.set_events(events)
 44
 45        if not self._event_filters or len(self._event_filters) == 0:
 46            raise Exception("Events are undefined")
 47
 48        try:
 49            messageId = str(uuid.uuid4())
 50            requestBodyJson = [
 51                {
 52                    "type": "ClientRequest",
 53                    "messageId": messageId,
 54                    "method": "POST",
 55                    "path": "/restapi/v1.0/subscription/",
 56                },
 57                {
 58                    "eventFilters": self._event_filters,
 59                    "deliveryMode": {"transportType": "WebSocket"},
 60                },
 61            ]
 62            await self._web_socket_client.send_message(requestBodyJson)
 63            self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message)
 64
 65        except Exception as e:
 66            self.reset()
 67            print(e)
 68            raise
 69
 70    async def update(self, events=None):
 71        if events:
 72            self.set_events(events)
 73
 74        if not self._event_filters or len(self._event_filters) == 0:
 75            raise Exception("Events are undefined")
 76
 77        try:
 78            subscriptionId = self._subscription[1]["id"]
 79            messageId = str(uuid.uuid4())
 80            requestBodyJson = [
 81                {
 82                    "type": "ClientRequest",
 83                    "messageId": messageId,
 84                    "method": "PUT",
 85                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
 86                },
 87                {
 88                    "eventFilters": self._event_filters,
 89                    "deliveryMode": {"transportType": "WebSocket"},
 90                },
 91            ]
 92            await self._web_socket_client.send_message(requestBodyJson)
 93            self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self)
 94
 95        except Exception as e:
 96            self.reset()
 97            print(e)
 98            raise
 99
100    async def remove(self):
101        subscriptionId = self._subscription[1]["id"]
102        if not subscriptionId:
103            raise Exception("Missing subscriptionId")
104
105        try:
106            messageId = str(uuid.uuid4())
107            requestBodyJson = [
108                {
109                    "type": "ClientRequest",
110                    "messageId": messageId,
111                    "method": "DELETE",
112                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
113                }
114            ]
115
116            await self._web_socket_client.send_message(requestBodyJson)
117            self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message)
118            self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved)
119
120            self.reset()
121
122        except Exception as e:
123            self.reset()
124            print(e)
125            raise
126
127    def set_subscription(self, data):
128        self._subscription = data
129
130    def get_subscription_info(self):
131        return self._subscription
132
133    def reset(self):
134        self._subscription = None
135
136    def destroy(self):
137        self.reset()
138        self.off()

Event system for Python.

WebSocketSubscription(web_socket_client)
13    def __init__(self, web_socket_client):
14        Observable.__init__(self)
15        self._web_socket_client = web_socket_client
16        self._event_filters = []
17        self._subscription = None
def on_message(self, message):
19    def on_message(self, message):
20        message_json = json.loads(message)
21        if(message_json[0]['type'] == 'ClientRequest' and 'WSG-SubscriptionId' in message_json[0]['headers']):
22            self.set_subscription(message_json)
23            self._web_socket_client.trigger(WebSocketEvents.subscriptionCreated, self)
24        else: 
25            if(message_json[0]['type'] == 'ServerNotification'):
26                self._web_socket_client.trigger(WebSocketEvents.receiveSubscriptionNotification, message_json)
async def register(self, events=None):
28    async def register(self, events=None):
29        if not self._subscription:
30            await self.subscribe(events=events)
31        else:
32            await self.update(events=events)
def add_events(self, events):
34    def add_events(self, events):
35        self._event_filters += events
36        pass
def set_events(self, events):
38    def set_events(self, events):
39        self._event_filters = events
async def subscribe(self, events=None):
41    async def subscribe(self, events=None):
42        if events:
43            self.set_events(events)
44
45        if not self._event_filters or len(self._event_filters) == 0:
46            raise Exception("Events are undefined")
47
48        try:
49            messageId = str(uuid.uuid4())
50            requestBodyJson = [
51                {
52                    "type": "ClientRequest",
53                    "messageId": messageId,
54                    "method": "POST",
55                    "path": "/restapi/v1.0/subscription/",
56                },
57                {
58                    "eventFilters": self._event_filters,
59                    "deliveryMode": {"transportType": "WebSocket"},
60                },
61            ]
62            await self._web_socket_client.send_message(requestBodyJson)
63            self._web_socket_client.on(WebSocketEvents.receiveMessage, self.on_message)
64
65        except Exception as e:
66            self.reset()
67            print(e)
68            raise
async def update(self, events=None):
70    async def update(self, events=None):
71        if events:
72            self.set_events(events)
73
74        if not self._event_filters or len(self._event_filters) == 0:
75            raise Exception("Events are undefined")
76
77        try:
78            subscriptionId = self._subscription[1]["id"]
79            messageId = str(uuid.uuid4())
80            requestBodyJson = [
81                {
82                    "type": "ClientRequest",
83                    "messageId": messageId,
84                    "method": "PUT",
85                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
86                },
87                {
88                    "eventFilters": self._event_filters,
89                    "deliveryMode": {"transportType": "WebSocket"},
90                },
91            ]
92            await self._web_socket_client.send_message(requestBodyJson)
93            self._web_socket_client.trigger(WebSocketEvents.subscriptionUpdated, self)
94
95        except Exception as e:
96            self.reset()
97            print(e)
98            raise
async def remove(self):
100    async def remove(self):
101        subscriptionId = self._subscription[1]["id"]
102        if not subscriptionId:
103            raise Exception("Missing subscriptionId")
104
105        try:
106            messageId = str(uuid.uuid4())
107            requestBodyJson = [
108                {
109                    "type": "ClientRequest",
110                    "messageId": messageId,
111                    "method": "DELETE",
112                    "path": f"/restapi/v1.0/subscription/{subscriptionId}",
113                }
114            ]
115
116            await self._web_socket_client.send_message(requestBodyJson)
117            self._web_socket_client.off(WebSocketEvents.receiveMessage, self.on_message)
118            self._web_socket_client.trigger(WebSocketEvents.subscriptionRemoved)
119
120            self.reset()
121
122        except Exception as e:
123            self.reset()
124            print(e)
125            raise
def set_subscription(self, data):
127    def set_subscription(self, data):
128        self._subscription = data
def get_subscription_info(self):
130    def get_subscription_info(self):
131        return self._subscription
def reset(self):
133    def reset(self):
134        self._subscription = None
def destroy(self):
136    def destroy(self):
137        self.reset()
138        self.off()
Inherited Members
observable.core.Observable
events
on
off
once
trigger