Skip to content

Commit 4eb45fd

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit 4eb45fd

2 files changed

Lines changed: 138 additions & 1 deletion

File tree

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(value.payload),
116+
value.expires,
117+
sqlite3.Binary(value.tag))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_inventory_flush.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# flake8: noqa:E402
3+
import os
4+
import struct
5+
import tempfile
6+
import threading
7+
import time
8+
import unittest
9+
10+
from .common import skip_python3
11+
12+
# pylint: disable=protected-access
13+
14+
skip_python3()
15+
16+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
17+
18+
from pybitmessage.helper_sql import ( # noqa:E402
19+
sql_ready, sqlStoredProcedure)
20+
from pybitmessage.class_sqlThread import sqlThread # noqa:E402
21+
from pybitmessage.storage.sqlite import SqliteInventory # noqa:E402
22+
23+
24+
class TestInventoryFlush(unittest.TestCase):
25+
"""
26+
Integration test: exercises flush() end-to-end with the real sqlThread
27+
consumer running, so that type errors in parameter binding surface here
28+
rather than silently killing a production thread.
29+
"""
30+
31+
@classmethod
32+
def setUpClass(cls):
33+
sql_lookup = sqlThread()
34+
sql_lookup.daemon = True
35+
sql_lookup.start()
36+
sql_ready.wait()
37+
cls.inventory = SqliteInventory()
38+
39+
@classmethod
40+
def tearDownClass(cls):
41+
sqlStoredProcedure('exit')
42+
for thread in threading.enumerate():
43+
if thread.name == "SQL":
44+
thread.join(timeout=10)
45+
46+
# -- helpers ----------------------------------------------------------
47+
48+
@staticmethod
49+
def _make_hash(seed):
50+
"""Return a 32-byte hash derived from *seed*."""
51+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
52+
53+
def _flush_and_check(self, obj_hash):
54+
"""
55+
Flush the inventory to the database, clear the _objects lookup
56+
cache so that __contains__ is forced to hit sqlite, then verify
57+
the hash is found via the normal inventory API.
58+
"""
59+
self.inventory.flush()
60+
self.inventory._objects.clear()
61+
self.assertIn(obj_hash, self.inventory)
62+
63+
# -- test cases -------------------------------------------------------
64+
65+
def test_flush_with_bytes_payload(self):
66+
"""Baseline: payload and tag are plain bytes."""
67+
h = self._make_hash(1)
68+
self.inventory[h] = (
69+
2, 1, b'\x80\x01' + os.urandom(64),
70+
int(time.time()) + 3600, b'\xff' * 32)
71+
self._flush_and_check(h)
72+
73+
def test_flush_with_memoryview_payload(self):
74+
"""
75+
Reproduce the production crash: payload and tag as memoryview
76+
cause 'Error binding parameter 3 - probably unsupported type.'
77+
"""
78+
h = self._make_hash(2)
79+
self.inventory[h] = (
80+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
81+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
82+
self._flush_and_check(h)
83+
84+
def test_flush_with_bytearray_payload(self):
85+
"""bytearray is another bytes-like type that could trip sqlite3."""
86+
h = self._make_hash(3)
87+
self.inventory[h] = (
88+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
89+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
90+
self._flush_and_check(h)
91+
92+
def test_flush_with_empty_tag(self):
93+
"""Empty tag (b'') must not break the INSERT."""
94+
h = self._make_hash(4)
95+
self.inventory[h] = (
96+
2, 1, b'\x80\x04' + os.urandom(64),
97+
int(time.time()) + 3600, b'')
98+
self._flush_and_check(h)
99+
100+
# pylint: disable=redefined-variable-type
101+
def test_flush_multiple_mixed_types(self):
102+
"""Flush a batch of items with mixed blob types."""
103+
count = 20
104+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
105+
expires = int(time.time()) + 3600
106+
107+
for i, h in enumerate(hashes):
108+
payload = struct.pack('>I', i) + os.urandom(60)
109+
tag = struct.pack('>I', i) + b'\x00' * 28
110+
if i % 3 == 0:
111+
payload = memoryview(payload)
112+
tag = memoryview(tag)
113+
elif i % 3 == 1:
114+
payload = bytearray(payload)
115+
tag = bytearray(tag)
116+
self.inventory[h] = (2, 1, payload, expires, tag)
117+
118+
self.inventory.flush()
119+
self.inventory._objects.clear()
120+
121+
for i, h in enumerate(hashes):
122+
self.assertIn(
123+
h, self.inventory,
124+
"Item {} missing after batch flush".format(i))
125+
126+
def test_flush_clears_memory_cache(self):
127+
"""After flush the in-memory _inventory dict must be empty."""
128+
h = self._make_hash(0xF0)
129+
self.inventory[h] = (
130+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
131+
self.inventory.flush()
132+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)