Skip to content

Commit c3d41fc

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit c3d41fc

2 files changed

Lines changed: 137 additions & 1 deletion

File tree

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(value.payload),
116+
value.expires,
117+
sqlite3.Binary(value.tag))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_inventory_flush.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# flake8: noqa:E402
3+
import os
4+
import struct
5+
import tempfile
6+
import threading
7+
import time
8+
import unittest
9+
10+
from .common import skip_python3
11+
12+
# pylint: disable=protected-access,wrong-import-order,wrong-import-position
13+
14+
skip_python3()
15+
16+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
17+
18+
from pybitmessage import helper_sql # noqa:E402
19+
from pybitmessage.class_sqlThread import sqlThread # noqa:E402
20+
from pybitmessage.storage.sqlite import SqliteInventory # noqa:E402
21+
22+
23+
class TestInventoryFlush(unittest.TestCase):
24+
"""
25+
Integration test: exercises flush() end-to-end with the real sqlThread
26+
consumer running, so that type errors in parameter binding surface here
27+
rather than silently killing a production thread.
28+
"""
29+
30+
@classmethod
31+
def setUpClass(cls):
32+
sql_lookup = sqlThread()
33+
sql_lookup.daemon = True
34+
sql_lookup.start()
35+
helper_sql.sql_ready.wait()
36+
cls.inventory = SqliteInventory()
37+
38+
@classmethod
39+
def tearDownClass(cls):
40+
helper_sql.sqlStoredProcedure('exit')
41+
for thread in threading.enumerate():
42+
if thread.name == "SQL":
43+
thread.join(timeout=10)
44+
45+
# -- helpers ----------------------------------------------------------
46+
47+
@staticmethod
48+
def _make_hash(seed):
49+
"""Return a 32-byte hash derived from *seed*."""
50+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
51+
52+
def _flush_and_check(self, obj_hash):
53+
"""
54+
Flush the inventory to the database, clear the _objects lookup
55+
cache so that __contains__ is forced to hit sqlite, then verify
56+
the hash is found via the normal inventory API.
57+
"""
58+
self.inventory.flush()
59+
self.inventory._objects.clear()
60+
self.assertIn(obj_hash, self.inventory)
61+
62+
# -- test cases -------------------------------------------------------
63+
64+
def test_flush_with_bytes_payload(self):
65+
"""Baseline: payload and tag are plain bytes."""
66+
h = self._make_hash(1)
67+
self.inventory[h] = (
68+
2, 1, b'\x80\x01' + os.urandom(64),
69+
int(time.time()) + 3600, b'\xff' * 32)
70+
self._flush_and_check(h)
71+
72+
def test_flush_with_memoryview_payload(self):
73+
"""
74+
Reproduce the production crash: payload and tag as memoryview
75+
cause 'Error binding parameter 3 - probably unsupported type.'
76+
"""
77+
h = self._make_hash(2)
78+
self.inventory[h] = (
79+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
80+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
81+
self._flush_and_check(h)
82+
83+
def test_flush_with_bytearray_payload(self):
84+
"""bytearray is another bytes-like type that could trip sqlite3."""
85+
h = self._make_hash(3)
86+
self.inventory[h] = (
87+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
88+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
89+
self._flush_and_check(h)
90+
91+
def test_flush_with_empty_tag(self):
92+
"""Empty tag (b'') must not break the INSERT."""
93+
h = self._make_hash(4)
94+
self.inventory[h] = (
95+
2, 1, b'\x80\x04' + os.urandom(64),
96+
int(time.time()) + 3600, b'')
97+
self._flush_and_check(h)
98+
99+
# pylint: disable=redefined-variable-type
100+
def test_flush_multiple_mixed_types(self):
101+
"""Flush a batch of items with mixed blob types."""
102+
count = 20
103+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
104+
expires = int(time.time()) + 3600
105+
106+
for i, h in enumerate(hashes):
107+
payload = struct.pack('>I', i) + os.urandom(60)
108+
tag = struct.pack('>I', i) + b'\x00' * 28
109+
if i % 3 == 0:
110+
payload = memoryview(payload)
111+
tag = memoryview(tag)
112+
elif i % 3 == 1:
113+
payload = bytearray(payload)
114+
tag = bytearray(tag)
115+
self.inventory[h] = (2, 1, payload, expires, tag)
116+
117+
self.inventory.flush()
118+
self.inventory._objects.clear()
119+
120+
for i, h in enumerate(hashes):
121+
self.assertIn(
122+
h, self.inventory,
123+
"Item {} missing after batch flush".format(i))
124+
125+
def test_flush_clears_memory_cache(self):
126+
"""After flush the in-memory _inventory dict must be empty."""
127+
h = self._make_hash(0xF0)
128+
self.inventory[h] = (
129+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
130+
self.inventory.flush()
131+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)