-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbluetooth.py
More file actions
77 lines (67 loc) · 2.05 KB
/
bluetooth.py
File metadata and controls
77 lines (67 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import socket
import json
import threading
class BluetoothManager:
    """Background reader for newline-delimited JSON arriving over an RFCOMM socket.

    On construction the manager connects to the given Bluetooth address and
    starts a daemon thread. The thread reads one line at a time, parses it as
    JSON and stores the value found under the ``"d"`` key, which callers can
    poll through :attr:`latest_distance`.

    The instance can also be used as a context manager; leaving the ``with``
    block calls :meth:`close`.
    """

    # Maximum number of bytes requested from the socket per recv() call.
    _RECV_CHUNK = 1024

    def __init__(self, mac: str = "20:25:05:00:CB:AD", port: int = 1):
        """Connect to ``mac`` on RFCOMM channel ``port`` and start the reader thread.

        Raises:
            OSError: if the Bluetooth connection cannot be established.
        """
        self.HC05_MAC = mac
        self.PORT = port
        # Every attribute the reader thread touches is created before the
        # thread is started.
        self._lock = threading.Lock()
        self._latest = None
        self._running = True
        self._rx_buffer = bytearray()  # bytes received but not yet returned as a line
        self.sock = self.connect()
        self._thread = threading.Thread(target=self._reader_loop, daemon=True)
        self._thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def connect(self):
        """Open an RFCOMM stream socket to ``(HC05_MAC, PORT)`` and return it.

        The returned socket has a 5 second timeout applied after the
        connection succeeds.

        Raises:
            OSError: if connecting fails; the half-open socket is closed first.
        """
        sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
        try:
            sock.connect((self.HC05_MAC, self.PORT))
            sock.settimeout(5)
        except OSError:
            # Do not leak the descriptor when the device is unreachable.
            sock.close()
            raise
        return sock

    def _read_raw_line(self) -> bytes:
        """Return the next line as raw bytes, including its trailing newline.

        Data is pulled from the socket in chunks and kept in ``_rx_buffer``, so
        a ``socket.timeout`` raised in the middle of a line does not discard
        the bytes already received.

        Returns ``b""`` only when the peer has closed the connection and no
        buffered data remains; an empty line on the wire is ``b"\\n"``.
        """
        while True:
            newline_at = self._rx_buffer.find(b"\n")
            if newline_at >= 0:
                end = newline_at + 1
                raw = bytes(self._rx_buffer[:end])
                del self._rx_buffer[:end]
                return raw
            chunk = self.sock.recv(self._RECV_CHUNK)
            if not chunk:
                # EOF: hand back whatever partial line is left (possibly nothing).
                raw = bytes(self._rx_buffer)
                self._rx_buffer.clear()
                return raw
            self._rx_buffer += chunk

    def read_line(self) -> str:
        """Read one line from the socket and return it decoded and stripped.

        Undecodable bytes are dropped. If the connection is closed before a
        newline arrives, the data received so far is returned (an empty string
        when there is none).

        Raises:
            socket.timeout: if no complete line arrives within the socket
                timeout. Bytes received so far stay buffered for the next call.
        """
        return self._read_raw_line().decode('utf-8', errors='ignore').strip()

    def _reader_loop(self):
        """Thread target: keep ``_latest`` updated until closed or disconnected.

        Timeouts, blank lines, invalid JSON and JSON values that are not
        objects are skipped. End-of-stream or any other error stops the loop;
        the error is printed unless :meth:`close` has already been called.
        """
        while self._running:
            try:
                raw = self._read_raw_line()
                if not raw:
                    # recv() reported EOF. Without this the loop would spin
                    # forever on an immediately-returning recv().
                    raise ConnectionError("connection closed by remote device")
                line = raw.decode('utf-8', errors='ignore').strip()
                if not line:
                    continue
                data = json.loads(line)
                if not isinstance(data, dict):
                    # Valid JSON but not an object (e.g. a bare number): ignore
                    # it instead of letting .get() kill the reader thread.
                    continue
                distance = data.get("d")
                with self._lock:
                    self._latest = distance
            except socket.timeout:
                continue
            except json.JSONDecodeError:
                continue
            except Exception as e:
                # Errors caused by close() tearing down the socket are expected.
                if self._running:
                    print("[err] >>> ", e)
                break

    @property
    def latest_distance(self):
        """Most recent value read from the ``"d"`` key, or ``None`` if none is stored."""
        with self._lock:
            return self._latest

    def close(self):
        """Stop the reader loop and close the socket. Safe to call more than once."""
        self._running = False
        try:
            # shutdown() wakes a reader blocked in recv() so it can exit promptly.
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # already disconnected or closed
        try:
            self.sock.close()
        except OSError:
            pass
if __name__ == "__main__":
    import time

    # Demo: connect with the default address and print the newest reading
    # every 0.2 s until the user presses Ctrl+C.
    manager = BluetoothManager()
    print("[msg] >>> connected, polling latest distance...")
    try:
        while True:
            reading = manager.latest_distance
            if reading is None:
                print("Out of range")
            else:
                print(f"Distance: {reading} cm")
            time.sleep(0.2)
    except KeyboardInterrupt:
        manager.close()
        print("\nDisconnected.")