|
| 1 | +# |----------------------------------------------------------------------------- |
| 2 | +# | This source code is provided under the Apache 2.0 license -- |
| 3 | +# | and is provided AS IS with no warranty or guarantee of fit for purpose. -- |
| 4 | +# | See the project's LICENSE.md for details. -- |
| 5 | +# | Copyright Refinitiv 2019. All rights reserved. -- |
| 6 | +# |----------------------------------------------------------------------------- |
| 7 | + |
| 8 | + |
| 9 | +#!/usr/bin/env python |
| 10 | +""" Simple example of outputting Machine Readable News JSON data using Websockets """ |
| 11 | + |
| 12 | +import sys |
| 13 | +import time |
| 14 | +import getopt |
| 15 | +import socket |
| 16 | +import json |
| 17 | +import websocket |
| 18 | +import threading |
| 19 | +from threading import Thread, Event |
| 20 | +import base64 |
| 21 | +import zlib |
| 22 | + |
| 23 | +# Global Default Variables |
| 24 | +hostname = '127.0.0.1' |
| 25 | +port = '15000' |
| 26 | +user = 'root' |
| 27 | +app_id = '256' |
| 28 | +position = socket.gethostbyname(socket.gethostname()) |
| 29 | +mrn_domain = 'NewsTextAnalytics' |
| 30 | +mrn_item = 'MRN_STORY' |
| 31 | +#mrn_item = 'MRN_TRNA' |
| 32 | + |
| 33 | +# Global Variables |
| 34 | +web_socket_app = None |
| 35 | +web_socket_open = False |
| 36 | + |
| 37 | +_news_envelopes = [] |
| 38 | + |
| 39 | +''' MRN Process Code ''' |
| 40 | + |
| 41 | + |
| 42 | +def decodeFieldList(fieldList_dict): |
| 43 | + for key, value in fieldList_dict.items(): |
| 44 | + print("Name = %s: Value = %s" % (key, value)) |
| 45 | + |
| 46 | + |
| 47 | +def send_mrn_request(ws): |
| 48 | + """ Create and send MRN request """ |
| 49 | + mrn_req_json = { |
| 50 | + 'ID': 2, |
| 51 | + "Domain": mrn_domain, |
| 52 | + 'Key': { |
| 53 | + 'Name': mrn_item |
| 54 | + } |
| 55 | + } |
| 56 | + |
| 57 | + ws.send(json.dumps(mrn_req_json)) |
| 58 | + print("SENT:") |
| 59 | + print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':'))) |
| 60 | + |
| 61 | + |
| 62 | +def processRefresh(ws, message_json): |
| 63 | + |
| 64 | + print("RECEIVED: Refresh Message") |
| 65 | + decodeFieldList(message_json["Fields"]) |
| 66 | + |
| 67 | + |
| 68 | +def processMRNUpdate(ws, message_json): # process incoming News Update messages |
| 69 | + #print("RECEIVED: Update Message") |
| 70 | + # print(message_json) |
| 71 | + |
| 72 | + fields_data = message_json["Fields"] |
| 73 | + # Dump the FieldList first (for informational purposes) |
| 74 | + # decodeFieldList(message_json["Fields"]) |
| 75 | + |
| 76 | + # declare variables |
| 77 | + tot_size = 0 |
| 78 | + guid = None |
| 79 | + |
| 80 | + try: |
| 81 | + # Get data for all requried fields |
| 82 | + fragment = base64.b64decode(fields_data["FRAGMENT"]) |
| 83 | + frag_num = int(fields_data["FRAG_NUM"]) |
| 84 | + guid = fields_data["GUID"] |
| 85 | + mrn_src = fields_data["MRN_SRC"] |
| 86 | + |
| 87 | + #print("GUID = %s" % guid) |
| 88 | + #print("FRAG_NUM = %d" % frag_num) |
| 89 | + #print("MRN_SRC = %s" % mrn_src) |
| 90 | + |
| 91 | + #fragment_decoded = base64.b64decode(fragment) |
| 92 | + |
| 93 | + if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details |
| 94 | + guid_index = next((index for (index, d) in enumerate( |
| 95 | + _news_envelopes) if d["guid"] == guid), None) |
| 96 | + envelop = _news_envelopes[guid_index] |
| 97 | + if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1: |
| 98 | + print("process multiple fragments for guid %s" % |
| 99 | + envelop["guid"]) |
| 100 | + |
| 101 | + #print("fragment before merge = %d" % len(envelop["data"]["fragment"])) |
| 102 | + |
| 103 | + # Merge incoming data to existing envelop and getting FRAGMENT and TOT_SIZE data to local variables |
| 104 | + fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment |
| 105 | + envelop["data"]["frag_num"] = frag_num |
| 106 | + tot_size = envelop["data"]["tot_size"] |
| 107 | + print("TOT_SIZE = %d" % tot_size) |
| 108 | + print("Current FRAGMENT length = %d" % len(fragment)) |
| 109 | + |
| 110 | + # The multiple fragments news are not completed, waiting. |
| 111 | + if tot_size != len(fragment): |
| 112 | + return None |
| 113 | + # The multiple fragments news are completed, delete assoiclate GUID dictionary |
| 114 | + elif tot_size == len(fragment): |
| 115 | + del _news_envelopes[guid_index] |
| 116 | + else: |
| 117 | + print("Error: Cannot find fragment for GUID %s with matching FRAG_NUM or MRN_SRC %s" % ( |
| 118 | + guid, mrn_src)) |
| 119 | + return None |
| 120 | + else: # FRAG_NUM = 1 The first fragment |
| 121 | + tot_size = int(fields_data["TOT_SIZE"]) |
| 122 | + print("FRAGMENT length = %d" % len(fragment)) |
| 123 | + if tot_size != len(fragment): # Completed News |
| 124 | + print("Add new fragments to news envelop for guid %s" % guid) |
| 125 | + _news_envelopes.append({ |
| 126 | + "guid": guid, |
| 127 | + "data": { |
| 128 | + "fragment": fragment, |
| 129 | + "mrn_src": mrn_src, |
| 130 | + "frag_num": frag_num, |
| 131 | + "tot_size": tot_size |
| 132 | + } |
| 133 | + }) |
| 134 | + return None |
| 135 | + |
| 136 | + # News Fragment(s) completed, decompress and print data as JSON to console |
| 137 | + if tot_size == len(fragment): |
| 138 | + print("decompress News FRAGMENT(s) for GUID %s" % guid) |
| 139 | + decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32) |
| 140 | + print("News = %s" % json.loads(decompressed_data)) |
| 141 | + |
| 142 | + except KeyError as keyerror: |
| 143 | + print('KeyError exception: ', keyerror) |
| 144 | + except IndexError as indexerror: |
| 145 | + print('IndexError exception: ', indexerror) |
| 146 | + except binascii.Error as b64error: |
| 147 | + print('base64 decoding exception:', b64error) |
| 148 | + except zlib.error as error: |
| 149 | + print('zlib decompressing exception: ', error) |
| 150 | + # Some console environments like Windows may encounter this unicode display as a limitation of OS |
| 151 | + except UnicodeEncodeError as encodeerror: |
| 152 | + print("UnicodeEncodeError exception. Cannot decode unicode character for %s in this enviroment: " % |
| 153 | + guid, encodeerror) |
| 154 | + except Exception as e: |
| 155 | + print('exception: ', sys.exc_info()[0]) |
| 156 | + |
| 157 | + |
| 158 | +def processStatus(ws, message_json): # process incoming status message |
| 159 | + print("RECEIVED: Status Message") |
| 160 | + print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':'))) |
| 161 | + |
| 162 | + |
| 163 | +''' JSON-OMM Process functions ''' |
| 164 | + |
| 165 | + |
| 166 | +def process_message(ws, message_json): |
| 167 | + """ Parse at high level and output JSON of message """ |
| 168 | + message_type = message_json['Type'] |
| 169 | + |
| 170 | + if message_type == "Refresh": |
| 171 | + if "Domain" in message_json: |
| 172 | + message_domain = message_json["Domain"] |
| 173 | + if message_domain == "Login": |
| 174 | + process_login_response(ws, message_json) |
| 175 | + elif message_domain: |
| 176 | + processRefresh(ws, message_json) |
| 177 | + elif message_type == "Update": |
| 178 | + if "Domain" in message_json and message_json["Domain"] == mrn_domain: |
| 179 | + processMRNUpdate(ws, message_json) |
| 180 | + elif message_type == "Status": |
| 181 | + processStatus(ws, message_json) |
| 182 | + elif message_type == "Ping": |
| 183 | + pong_json = {'Type': 'Pong'} |
| 184 | + ws.send(json.dumps(pong_json)) |
| 185 | + print("SENT:") |
| 186 | + print(json.dumps(pong_json, sort_keys=True, |
| 187 | + indent=2, separators=(',', ':'))) |
| 188 | + |
| 189 | + |
| 190 | +def process_login_response(ws, message_json): |
| 191 | + """ Send item request """ |
| 192 | + # send_market_price_request(ws) |
| 193 | + send_mrn_request(ws) |
| 194 | + |
| 195 | + |
| 196 | +def send_login_request(ws): |
| 197 | + """ Generate a login request from command line data (or defaults) and send """ |
| 198 | + login_json = { |
| 199 | + 'ID': 1, |
| 200 | + "Domain": 'Login', |
| 201 | + 'Key': { |
| 202 | + 'Name': '', |
| 203 | + 'Elements': { |
| 204 | + 'ApplicationId': '', |
| 205 | + 'Position': '' |
| 206 | + } |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + login_json['Key']['Name'] = user |
| 211 | + login_json['Key']['Elements']['ApplicationId'] = app_id |
| 212 | + login_json['Key']['Elements']['Position'] = position |
| 213 | + |
| 214 | + ws.send(json.dumps(login_json)) |
| 215 | + print("SENT:") |
| 216 | + print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':'))) |
| 217 | + |
| 218 | + |
| 219 | +''' WebSocket Process functions ''' |
| 220 | + |
| 221 | + |
| 222 | +def on_message(ws, message): |
| 223 | + """ Called when message received, parse message into JSON for processing """ |
| 224 | + print("RECEIVED: ") |
| 225 | + message_json = json.loads(message) |
| 226 | + print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':'))) |
| 227 | + |
| 228 | + for singleMsg in message_json: |
| 229 | + process_message(ws, singleMsg) |
| 230 | + |
| 231 | + |
| 232 | +def on_error(ws, error): |
| 233 | + """ Called when websocket error has occurred """ |
| 234 | + print(error) |
| 235 | + |
| 236 | + |
| 237 | +def on_close(ws): |
| 238 | + """ Called when websocket is closed """ |
| 239 | + global web_socket_open |
| 240 | + print("WebSocket Closed") |
| 241 | + web_socket_open = False |
| 242 | + |
| 243 | + |
| 244 | +def on_open(ws): |
| 245 | + """ Called when handshake is complete and websocket is open, send login """ |
| 246 | + |
| 247 | + print("WebSocket successfully connected!") |
| 248 | + global web_socket_open |
| 249 | + web_socket_open = True |
| 250 | + send_login_request(ws) |
| 251 | + |
| 252 | + |
| 253 | +''' Main Process Code ''' |
| 254 | + |
| 255 | +if __name__ == "__main__": |
| 256 | + |
| 257 | + # Get command line parameters |
| 258 | + try: |
| 259 | + opts, args = getopt.getopt(sys.argv[1:], "", [ |
| 260 | + "help", "hostname=", "port=", "app_id=", "user=", "position="]) |
| 261 | + except getopt.GetoptError: |
| 262 | + print( |
| 263 | + 'Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]') |
| 264 | + sys.exit(2) |
| 265 | + for opt, arg in opts: |
| 266 | + if opt in ("--help"): |
| 267 | + print( |
| 268 | + 'Usage: market_price.py [--hostname hostname] [--port port] [--app_id app_id] [--user user] [--position position] [--help]') |
| 269 | + sys.exit(0) |
| 270 | + elif opt in ("--hostname"): |
| 271 | + hostname = arg |
| 272 | + elif opt in ("--port"): |
| 273 | + port = arg |
| 274 | + elif opt in ("--app_id"): |
| 275 | + app_id = arg |
| 276 | + elif opt in ("--user"): |
| 277 | + user = arg |
| 278 | + elif opt in ("--position"): |
| 279 | + position = arg |
| 280 | + |
| 281 | + # Start websocket handshake |
| 282 | + ws_address = "ws://{}:{}/WebSocket".format(hostname, port) |
| 283 | + print("Connecting to WebSocket " + ws_address + " ...") |
| 284 | + web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'], |
| 285 | + on_message=on_message, |
| 286 | + on_error=on_error, |
| 287 | + on_close=on_close, |
| 288 | + subprotocols=['tr_json2']) |
| 289 | + web_socket_app.on_open = on_open |
| 290 | + |
| 291 | + # Event loop |
| 292 | + wst = threading.Thread(target=web_socket_app.run_forever) |
| 293 | + wst.start() |
| 294 | + |
| 295 | + try: |
| 296 | + while True: |
| 297 | + time.sleep(600) |
| 298 | + except KeyboardInterrupt: |
| 299 | + web_socket_app.close() |
0 commit comments