-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpool.go
More file actions
145 lines (121 loc) · 3.41 KB
/
pool.go
File metadata and controls
145 lines (121 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package sqlite
import (
"context"
"errors"
"fmt"
"runtime"
"runtime/pprof"
"sync"
"time"
)
// #include <amalgamation/sqlite3.h>
// #include <stdint.h>
//
// extern char * go_strcpy(_GoString_ st);
// extern void go_free(void*);
// extern int sqlite_BindGoPointer(sqlite3_stmt* stmt, int pos, uintptr_t ptr, const char* name);
import "C"
const MemoryPath = "file::memory:?mode=memory"
var connectionsProfiles = pprof.NewProfile("t.sftw/sqlite/connections")
// ZombieTimeout is the time after which a transaction is considered leaked
var ZombieTimeout = 30 * time.Second
// Connections is a pool of connections to a single SQLite database.
type Connections struct {
free *Conn // free list
mx sync.Mutex // protects all above
wait sync.Cond
}
// FreeCount returns the number of free connections in the pool
func (c *Connections) FreeCount() int {
c.mx.Lock()
defer c.mx.Unlock()
i := 0
for p := c.free; p != nil; p = p.next {
var st *C.sqlite3_stmt
st = C.sqlite3_next_stmt(p.db, st)
if st != nil {
q := C.sqlite3_sql(st)
fmt.Println("dangling statements! ", C.GoString(q))
}
i++
}
return i
}
type ckey struct{}
type spkey struct{}
var NumThreads int
func init() {
// There is a limit to how many concurrent writes we can issue in SQLite at the same time,
// even in WAL mode (single writer). Increasing the number too much would still result in busy contention.
// This takes the same approach as Python's [ThreadPoolExecutor].
//
// [ThreadPoolExecutor]: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
NumThreads = min(runtime.NumCPU()+4, 32)
}
// OpenPool ceates a new connection pool
func OpenPool(name string, exts ...func(SQLITE3)) (*Connections, error) {
if name == ":memory:" {
return nil, errors.New(`":memory:" does not work with pools, use MemoryPath`)
}
var pool Connections
pool.wait = sync.Cond{L: &pool.mx}
ptr := &pool.free
for w := NumThreads; w > 0; w-- {
conn, err := Open(name, exts...)
if err != nil {
return nil, err
}
if w == 1 {
var mode string
err = conn.Exec(context.Background(), "PRAGMA journal_mode=WAL").ScanOne(&mode)
if err != nil || mode != "wal" {
return nil, fmt.Errorf("cannot set WAL mode (mode=%s): %w", mode, err)
}
}
*ptr = conn
ptr = &conn.next
}
return &pool, nil
}
func (p *Connections) Exec(ctx context.Context, cmd string, args ...any) *Rows {
ctn, reused := ctx.Value(ckey{}).(*Conn)
if !reused {
ctn = p.take()
}
rows := ctn.Exec(ctx, cmd, args...)
if !reused {
rows.final = func() { p.put(ctn); ctn.pointers.Unpin() }
}
return rows
}
// Close closes all connections in the pool.
// It can be safely called concurrently [Connections.Savepoint], [Connections.Exec] and [Connections.Release]
// but note that calls to [Connections.Savepoint] or [Connections.Exec] that happen after Close might block forever.
// The mechanism to terminate other connections has to be done out of band.
func (p *Connections) Close() error {
var err error
for range NumThreads {
ctn := p.take()
err = errors.Join(err, ctn.Close())
}
return err
}
func (p *Connections) take() *Conn {
p.mx.Lock()
for p.free == nil {
p.wait.Wait()
}
ctn := p.free
p.free = ctn.next
p.mx.Unlock()
connectionsProfiles.Add(ctn, 2)
return ctn
}
func (p *Connections) put(ctn *Conn) {
connectionsProfiles.Remove(ctn)
p.mx.Lock()
ctn.next = p.free
p.free = ctn
p.wait.Signal()
p.mx.Unlock()
}