Skip to content

Commit 5c8a990

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit 5c8a990

2 files changed

Lines changed: 141 additions & 1 deletion

File tree

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(value.payload),
116+
value.expires,
117+
sqlite3.Binary(value.tag))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_inventory_flush.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# pylint: disable=protected-access,wrong-import-order,wrong-import-position
3+
# pylint: disable=import-outside-toplevel
4+
5+
import os
6+
import struct
7+
import tempfile
8+
import threading
9+
import time
10+
11+
from .common import skip_python3
12+
from .partial import TestPartialRun
13+
14+
skip_python3()
15+
16+
17+
class TestInventoryFlush(TestPartialRun):
18+
"""
19+
Integration test: exercises flush() end-to-end with the real sqlThread
20+
consumer running, so that type errors in parameter binding surface here
21+
rather than silently killing a production thread.
22+
"""
23+
24+
@classmethod
25+
def setUpClass(cls):
26+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
27+
super(TestInventoryFlush, cls).setUpClass()
28+
29+
from class_sqlThread import sqlThread
30+
from helper_sql import sql_ready, sqlStoredProcedure
31+
from storage.sqlite import SqliteInventory
32+
33+
cls._sqlStoredProcedure = staticmethod(sqlStoredProcedure)
34+
35+
sql_lookup = sqlThread()
36+
sql_lookup.daemon = True
37+
sql_lookup.start()
38+
sql_ready.wait()
39+
cls.inventory = SqliteInventory()
40+
41+
@classmethod
42+
def tearDownClass(cls):
43+
cls._sqlStoredProcedure('exit')
44+
for thread in threading.enumerate():
45+
if thread.name == "SQL":
46+
thread.join(timeout=10)
47+
super(TestInventoryFlush, cls).tearDownClass()
48+
49+
# -- helpers ----------------------------------------------------------
50+
51+
@staticmethod
52+
def _make_hash(seed):
53+
"""Return a 32-byte hash derived from *seed*."""
54+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
55+
56+
def _flush_and_check(self, obj_hash):
57+
"""
58+
Flush the inventory to the database, clear the _objects lookup
59+
cache so that __contains__ is forced to hit sqlite, then verify
60+
the hash is found via the normal inventory API.
61+
"""
62+
self.inventory.flush()
63+
self.inventory._objects.clear()
64+
self.assertIn(obj_hash, self.inventory)
65+
66+
# -- test cases -------------------------------------------------------
67+
68+
def test_flush_with_bytes_payload(self):
69+
"""Baseline: payload and tag are plain bytes."""
70+
h = self._make_hash(1)
71+
self.inventory[h] = (
72+
2, 1, b'\x80\x01' + os.urandom(64),
73+
int(time.time()) + 3600, b'\xff' * 32)
74+
self._flush_and_check(h)
75+
76+
def test_flush_with_memoryview_payload(self):
77+
"""
78+
Reproduce the production crash: payload and tag as memoryview
79+
cause 'Error binding parameter 3 - probably unsupported type.'
80+
"""
81+
h = self._make_hash(2)
82+
self.inventory[h] = (
83+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
84+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
85+
self._flush_and_check(h)
86+
87+
def test_flush_with_bytearray_payload(self):
88+
"""bytearray is another bytes-like type that could trip sqlite3."""
89+
h = self._make_hash(3)
90+
self.inventory[h] = (
91+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
92+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
93+
self._flush_and_check(h)
94+
95+
def test_flush_with_empty_tag(self):
96+
"""Empty tag (b'') must not break the INSERT."""
97+
h = self._make_hash(4)
98+
self.inventory[h] = (
99+
2, 1, b'\x80\x04' + os.urandom(64),
100+
int(time.time()) + 3600, b'')
101+
self._flush_and_check(h)
102+
103+
# pylint: disable=redefined-variable-type
104+
def test_flush_multiple_mixed_types(self):
105+
"""Flush a batch of items with mixed blob types."""
106+
count = 20
107+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
108+
expires = int(time.time()) + 3600
109+
110+
for i, h in enumerate(hashes):
111+
payload = struct.pack('>I', i) + os.urandom(60)
112+
tag = struct.pack('>I', i) + b'\x00' * 28
113+
if i % 3 == 0:
114+
payload = memoryview(payload)
115+
tag = memoryview(tag)
116+
elif i % 3 == 1:
117+
payload = bytearray(payload)
118+
tag = bytearray(tag)
119+
self.inventory[h] = (2, 1, payload, expires, tag)
120+
121+
self.inventory.flush()
122+
self.inventory._objects.clear()
123+
124+
for i, h in enumerate(hashes):
125+
self.assertIn(
126+
h, self.inventory,
127+
"Item {} missing after batch flush".format(i))
128+
129+
def test_flush_clears_memory_cache(self):
130+
"""After flush the in-memory _inventory dict must be empty."""
131+
h = self._make_hash(0xF0)
132+
self.inventory[h] = (
133+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
134+
self.inventory.flush()
135+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)