Skip to content

Commit d384c4a

Browse files
author
Wasin Waeosri
committed
change logs: handle multiple fragments
1 parent 22123ef commit d384c4a

File tree

2 files changed

+62
-23
lines changed

2 files changed

+62
-23
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ market_price.py
33
mrn_python/
44
test_result.txt
55
test_result_2.txt
6+
test_result_3.txt
67
mrn_other_example/

mrn_prototype.py

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -64,37 +64,75 @@ def processRefresh(ws, message_json):
6464
decodeFieldList(message_json["Fields"])
6565

6666

67+
def parseNewsData(fragment):
68+
decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
69+
print("News = %s" % decompressed_data)
70+
71+
6772
def processUpdate(ws, message_json):
68-
#print("RECEIVED: Update Message")
69-
#print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))
73+
print("RECEIVED: Update Message")
74+
# print(message_json)
7075

7176
fields_data = message_json["Fields"]
7277
# Dump the FieldList first (for informational purposes)
7378
decodeFieldList(message_json["Fields"])
7479

75-
# Get data for all requried fields
76-
fragment = fields_data["FRAGMENT"]
77-
frag_num = fields_data["FRAG_NUM"]
78-
guid = fields_data["GUID"]
79-
mrn_src = fields_data["MRN_SRC"]
80-
tot_size = int(fields_data["TOT_SIZE"])
8180
try:
82-
fragment_decoded = base64.b64decode(fragment)
81+
# Get data for all requried fields
82+
fragment = base64.b64decode(fields_data["FRAGMENT"])
83+
frag_num = int(fields_data["FRAG_NUM"])
84+
guid = fields_data["GUID"]
85+
mrn_src = fields_data["MRN_SRC"]
86+
8387
print("GUID = %s" % guid)
84-
print("TOT_SIZE = %d" % tot_size)
85-
print("fragment length = %d" % len(fragment_decoded))
86-
# if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
87-
88-
if tot_size == len(fragment_decoded): # Completed News
89-
decompressed_data = zlib.decompress(
90-
fragment_decoded, zlib.MAX_WBITS | 32)
91-
print("News = %s" % decompressed_data)
92-
else:
93-
print("Multiple Fragments!!")
88+
print("FRAG_NUM = %d" % frag_num)
89+
print("MRN_SRC = %s" % mrn_src)
90+
91+
#fragment_decoded = base64.b64decode(fragment)
92+
print("fragment length = %d" % len(fragment))
93+
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
94+
guid_index = next((index for (index, d) in enumerate(
95+
_news_envelopes) if d["guid"] == guid), None)
96+
envelop = _news_envelopes[guid_index]
97+
if envelop:
98+
print("process multiple fragments for guid %s" %
99+
envelop["guid"])
100+
# print(envelop)
101+
#print("fragment before merge = %d" % len(envelop["data"]["fragment"]))
102+
103+
# Merge incoming fragment to current fragment
104+
envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment
105+
106+
#print("TOT_SIZE from envelop = %d" % envelop["data"]["tot_size"])
107+
#print("fragment after merge = %d" % len(envelop["data"]["fragment"]))
108+
if envelop["data"]["tot_size"] == len(envelop["data"]["fragment"]):
109+
parseNewsData(envelop["data"]["fragment"])
110+
else:
111+
return None
112+
else: # FRAG_NUM:1 The first fragment
113+
tot_size = int(fields_data["TOT_SIZE"])
114+
print("TOT_SIZE = %d" % tot_size)
115+
if tot_size == len(fragment): # Completed News
116+
parseNewsData(fragment)
117+
pass
118+
else:
119+
#print("Receiving Multiple Fragments!!")
120+
print("Add new fragments to news envelop for guid %s" % guid)
121+
_news_envelopes.append({
122+
"guid": guid,
123+
"data": {
124+
"fragment": fragment,
125+
"mrn_src": mrn_src,
126+
"frag_num": frag_num,
127+
"tot_size": tot_size
128+
}
129+
})
130+
except KeyError as keyerror:
131+
print('KeyError exception: ', keyerror)
94132
except zlib.error as error:
95-
print(error)
96-
except:
97-
print("Error!!!")
133+
print('zlib exception: ', error)
134+
except Exception as e:
135+
print('exception: ', sys.exc_info()[0])
98136

99137

100138
def processStatus(ws, message_json):
@@ -249,6 +287,6 @@ def on_open(ws):
249287

250288
try:
251289
while True:
252-
time.sleep(1)
290+
time.sleep(600)
253291
except KeyboardInterrupt:
254292
web_socket_app.close()

0 commit comments

Comments
 (0)