-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.go
More file actions
326 lines (269 loc) · 7.88 KB
/
server.go
File metadata and controls
326 lines (269 loc) · 7.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// Package httpterm provides closable http.Server with extended read timeouts.
package httpterm
import (
"crypto/tls"
"errors"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
const (
// StateHead represents a connection that has read 1 or more bytes of
// a request. Contrary to http.StateActive, the server.ConnState hook fires
// before processing the data (headers). Connections transition from
// StateHead to http.StateActive or http.Closed.
StateHead http.ConnState = -1
)
// ErrClosing indicating that operation is not allowed as server is closing
var ErrClosing = errors.New("server closing")
// Server embeds http.Server and provides additional functionality.
// All the http.Server can be accessed directly and behaves as decribed in
// the original docs at http://golang.org/pkg/net/http/#Server.
//
// Assigning non-zero ReadTimeout is not advised, as it doesn't work well
// with better defined read timeout extensions provided by this class.
//
// ConnState function will be overwritten after call to Serve, but the original
// value will be preserved internally and called as expected.
// An additional value of StateHead will be passed to the function on top of
// the regular http.ConnState values.
type Server struct {
http.Server
// CloseOnSignal enables server shutdown on SIGTERM/SIGNINT.
// Signal handler is registered in Serve() method.
CloseOnSignal bool
// HeadReadTimeout defines timeout for reading request headers.
HeadReadTimeout time.Duration
// BodyReadTimeout defines timeout for reading request body.
// This timeout is being applied just before calling the request handler.
BodyReadTimeout time.Duration
// IdleTimeout defines for how long connection can be idle between requests.
IdleTimeout time.Duration
// NewAsActive prevents new connections from being idle before sending
// first request. If set, new connections will have HeadReadTimeout applied.
// If server is behind some proxy or a load balancer which maintains
// a permanent connection, setting up this flag is not recommended.
NewAsActive bool
listener *rtListener
lock sync.Mutex
closing bool
// conns is a map of connections which indicates whether connection is active,
// i.e. there a request being processed (including header handling)
conns map[net.Conn]bool
}
// Serve behaves as http.Server.Serve.
// See: http://golang.org/pkg/net/http/#Server.Serve
//
// Along with an error, pending channel is returned which will be closed once
// all connections are closed or hijacked.
func (s *Server) Serve(l net.Listener) (pending <-chan bool, err error) {
s.conns = make(map[net.Conn]bool)
oldConnState := s.ConnState
newConnState := func(c net.Conn, state http.ConnState) {
s.updateConnState(c, state)
// Pass to original handler
if oldConnState != nil {
oldConnState(c, state)
}
}
s.ConnState = newConnState
// Wrap with custom listener
s.listener = &rtListener{
Listener: l,
newAsActive: s.NewAsActive,
callback: func(c net.Conn) { newConnState(c, StateHead) },
}
// Register signal handling for shutdown if requested
if s.CloseOnSignal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
s.Close()
}()
}
// Serve loop
err = s.Server.Serve(s.listener)
// Clear error if server closing
s.lock.Lock()
if s.closing {
err = nil
}
s.lock.Unlock()
pending = s.monitorPending()
return
}
// ListenAndServe behaves as http.Server.ListenAndServe.
// See: http://golang.org/pkg/net/http/#Server.ListenAndServe
//
// Along with an error, pending channel is returned which will be closed once
// all connections are closed or hijacked.
func (s *Server) ListenAndServe() (pending <-chan bool, err error) {
pending = noPending
addr := s.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return
}
return s.Serve(ln)
}
// ListenAndServeTLS behaves as http.Server.ListenAndServeTLS.
// See: http://golang.org/pkg/net/http/#Server.ListenAndServeTLS
//
// Along with an error, pending channel is returned which will be closed once
// all connections are closed or hijacked.
func (s *Server) ListenAndServeTLS(certFile, keyFile string) (pending <-chan bool, err error) {
pending = noPending
config := &tls.Config{}
if s.TLSConfig != nil {
*config = *s.TLSConfig
}
if config.NextProtos == nil {
config.NextProtos = []string{"http/1.1"}
}
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return
}
addr := s.Addr
if addr == "" {
addr = ":https"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return
}
return s.Serve(tls.NewListener(ln, config))
}
// The following timeout will be applied to idle connections on server shutdown
var waitOnClose = 100 * time.Millisecond
// Close shutdowns the server by closing the listener, disabling keep alives
// and applying a predefined timeout to all idle connections. Timeouts for all connections
// currently processing a request will be unafected. Call to Close will cause
// Serve to quit. Subsequent calls to Close will return ErrClosing.
func (s *Server) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if s.closing {
return ErrClosing
}
if err := s.listener.Close(); err != nil {
return err
}
s.SetKeepAlivesEnabled(false)
s.closing = true
// Set a predefined deadline for all inactive connections (new or idle).
// If during this period state changes to active, request will be processed
// with regular request timeout, otherwise connection will be closed.
deadline := time.Now().Add(waitOnClose)
for c, active := range s.conns {
if !active {
c.SetReadDeadline(deadline)
}
}
return nil
}
// Closed pending channel
var noPending <-chan bool = func() chan bool {
ch := make(chan bool)
close(ch)
return ch
}()
func (s *Server) monitorPending() <-chan bool {
ch := make(chan bool)
go func() {
s.listener.wg.Wait()
close(ch)
}()
return ch
}
func (s *Server) getTimeout(state http.ConnState) (timeout time.Duration) {
// Update state for new connection according to policy
if state == http.StateNew {
if s.NewAsActive {
state = StateHead
} else {
state = http.StateIdle
}
}
switch state {
case http.StateIdle:
timeout = s.IdleTimeout
case StateHead:
timeout = s.HeadReadTimeout
case http.StateActive:
timeout = s.BodyReadTimeout
}
return
}
func (s *Server) updateConnState(c net.Conn, state http.ConnState) {
s.lock.Lock()
defer s.lock.Unlock()
// Update connection map
switch state {
case http.StateNew, http.StateIdle:
s.conns[c] = false
case http.StateClosed, http.StateHijacked:
delete(s.conns, c)
s.listener.wg.Done()
case StateHead:
s.conns[c] = true
}
if state == http.StateIdle {
if c, ok := c.(*rtConn); ok {
c.idle()
}
}
// Update timeout if not closing or new request
if !s.closing || state == StateHead || state == http.StateActive {
if t := s.getTimeout(state); t != 0 {
c.SetReadDeadline(time.Now().Add(t))
}
} else {
c.SetReadDeadline(time.Now().Add(waitOnClose))
}
}
type rtListener struct {
net.Listener
newAsActive bool // set new connections as active
callback func(c net.Conn) // data callback
wg sync.WaitGroup
}
func (l *rtListener) Accept() (c net.Conn, err error) {
l.wg.Add(1)
defer func() {
if c == nil {
l.wg.Done()
}
}()
c, err = l.Listener.Accept()
if c != nil {
c = &rtConn{c, l.newAsActive, l.callback}
}
return
}
// rtConn is a net.Conn that sets read deadlines for idle and active state.
// It automatically detects requests as first bytes are read after idle state.
type rtConn struct {
net.Conn
active bool // are we currently processing a request?
callback func(c net.Conn) // data callback
}
func (c *rtConn) Read(b []byte) (n int, err error) {
n, err = c.Conn.Read(b)
if n > 0 && !c.active {
c.callback(c)
}
return
}
func (c *rtConn) idle() {
c.active = false
}