GoPLS Viewer

Home|gopls/internal/jsonrpc2_v2/serve.go
1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8    "context"
9    "fmt"
10    "io"
11    "runtime"
12    "sync"
13    "sync/atomic"
14    "time"
15)
16
17// Listener is implemented by protocols to accept new inbound connections.
18type Listener interface {
19    // Accept accepts an inbound connection to a server.
20    // It blocks until either an inbound connection is made, or the listener is closed.
21    Accept(context.Context) (io.ReadWriteClosererror)
22
23    // Close closes the listener.
24    // Any blocked Accept or Dial operations will unblock and return errors.
25    Close() error
26
27    // Dialer returns a dialer that can be used to connect to this listener
28    // locally.
29    // If a listener does not implement this it will return nil.
30    Dialer() Dialer
31}
32
33// Dialer is used by clients to dial a server.
34type Dialer interface {
35    // Dial returns a new communication byte stream to a listening server.
36    Dial(ctx context.Context) (io.ReadWriteClosererror)
37}
38
39// Server is a running server that is accepting incoming connections.
40type Server struct {
41    listener Listener
42    binder   Binder
43    async    *async
44
45    shutdownOnce sync.Once
46    closing      int32 // atomic: set to nonzero when Shutdown is called
47}
48
49// Dial uses the dialer to make a new connection, wraps the returned
50// reader and writer using the framer to make a stream, and then builds
51// a connection on top of that stream using the binder.
52//
53// The returned Connection will operate independently using the Preempter and/or
54// Handler provided by the Binder, and will release its own resources when the
55// connection is broken, but the caller may Close it earlier to stop accepting
56// (or sending) new requests.
57func Dial(ctx context.Contextdialer Dialerbinder Binder) (*Connectionerror) {
58    // dial a server
59    rwcerr := dialer.Dial(ctx)
60    if err != nil {
61        return nilerr
62    }
63    return newConnection(ctxrwcbindernil), nil
64}
65
66// NewServer starts a new server listening for incoming connections and returns
67// it.
68// This returns a fully running and connected server, it does not block on
69// the listener.
70// You can call Wait to block on the server, or Shutdown to get the sever to
71// terminate gracefully.
72// To notice incoming connections, use an intercepting Binder.
73func NewServer(ctx context.Contextlistener Listenerbinder Binder) *Server {
74    server := &Server{
75        listenerlistener,
76        binder:   binder,
77        async:    newAsync(),
78    }
79    go server.run(ctx)
80    return server
81}
82
83// Wait returns only when the server has shut down.
84func (s *ServerWait() error {
85    return s.async.wait()
86}
87
88// Shutdown informs the server to stop accepting new connections.
89func (s *ServerShutdown() {
90    s.shutdownOnce.Do(func() {
91        atomic.StoreInt32(&s.closing1)
92        s.listener.Close()
93    })
94}
95
96// run accepts incoming connections from the listener,
97// If IdleTimeout is non-zero, run exits after there are no clients for this
98// duration, otherwise it exits only on error.
99func (s *Serverrun(ctx context.Context) {
100    defer s.async.done()
101
102    var activeConns sync.WaitGroup
103    for {
104        rwcerr := s.listener.Accept(ctx)
105        if err != nil {
106            // Only Shutdown closes the listener. If we get an error after Shutdown is
107            // called, assume that that was the cause and don't report the error;
108            // otherwise, report the error in case it is unexpected.
109            if atomic.LoadInt32(&s.closing) == 0 {
110                s.async.setError(err)
111            }
112            // We are done generating new connections for good.
113            break
114        }
115
116        // A new inbound connection.
117        activeConns.Add(1)
118        _ = newConnection(ctxrwcs.binderactiveConns.Done// unregisters itself when done
119    }
120    activeConns.Wait()
121}
122
123// NewIdleListener wraps a listener with an idle timeout.
124//
125// When there are no active connections for at least the timeout duration,
126// calls to Accept will fail with ErrIdleTimeout.
127//
128// A connection is considered inactive as soon as its Close method is called.
129func NewIdleListener(timeout time.Durationwrap ListenerListener {
130    l := &idleListener{
131        wrapped:   wrap,
132        timeout:   timeout,
133        active:    make(chan int1),
134        timedOut:  make(chan struct{}),
135        idleTimermake(chan *time.Timer1),
136    }
137    l.idleTimer <- time.AfterFunc(l.timeoutl.timerExpired)
138    return l
139}
140
141type idleListener struct {
142    wrapped Listener
143    timeout time.Duration
144
145    // Only one of these channels is receivable at any given time.
146    active    chan int         // count of active connections; closed when Close is called if not timed out
147    timedOut  chan struct{}    // closed when the idle timer expires
148    idleTimer chan *time.Timer // holds the timer only when idle
149}
150
151// Accept accepts an incoming connection.
152//
153// If an incoming connection is accepted concurrent to the listener being closed
154// due to idleness, the new connection is immediately closed.
155func (l *idleListenerAccept(ctx context.Context) (io.ReadWriteClosererror) {
156    rwcerr := l.wrapped.Accept(ctx)
157
158    select {
159    case nok := <-l.active:
160        if err != nil {
161            if ok {
162                l.active <- n
163            }
164            return nilerr
165        }
166        if ok {
167            l.active <- n + 1
168        } else {
169            // l.wrapped.Close Close has been called, but Accept returned a
170            // connection. This race can occur with concurrent Accept and Close calls
171            // with any net.Listener, and it is benign: since the listener was closed
172            // explicitly, it can't have also timed out.
173        }
174        return l.newConn(rwc), nil
175
176    case <-l.timedOut:
177        if err == nil {
178            // Keeping the connection open would leave the listener simultaneously
179            // active and closed due to idleness, which would be contradictory and
180            // confusing. Close the connection and pretend that it never happened.
181            rwc.Close()
182        } else {
183            // In theory the timeout could have raced with an unrelated error return
184            // from Accept. However, ErrIdleTimeout is arguably still valid (since we
185            // would have closed due to the timeout independent of the error), and the
186            // harm from returning a spurious ErrIdleTimeout is negliglible anyway.
187        }
188        return nilErrIdleTimeout
189
190    case timer := <-l.idleTimer:
191        if err != nil {
192            // The idle timer doesn't run until it receives itself from the idleTimer
193            // channel, so it can't have called l.wrapped.Close yet and thus err can't
194            // be ErrIdleTimeout. Leave the idle timer as it was and return whatever
195            // error we got.
196            l.idleTimer <- timer
197            return nilerr
198        }
199
200        if !timer.Stop() {
201            // Failed to stop the timer — the timer goroutine is in the process of
202            // firing. Send the timer back to the timer goroutine so that it can
203            // safely close the timedOut channel, and then wait for the listener to
204            // actually be closed before we return ErrIdleTimeout.
205            l.idleTimer <- timer
206            rwc.Close()
207            <-l.timedOut
208            return nilErrIdleTimeout
209        }
210
211        l.active <- 1
212        return l.newConn(rwc), nil
213    }
214}
215
216func (l *idleListenerClose() error {
217    select {
218    case _ok := <-l.active:
219        if ok {
220            close(l.active)
221        }
222
223    case <-l.timedOut:
224        // Already closed by the timer; take care not to double-close if the caller
225        // only explicitly invokes this Close method once, since the io.Closer
226        // interface explicitly leaves doubled Close calls undefined.
227        return ErrIdleTimeout
228
229    case timer := <-l.idleTimer:
230        if !timer.Stop() {
231            // Couldn't stop the timer. It shouldn't take long to run, so just wait
232            // (so that the Listener is guaranteed to be closed before we return)
233            // and pretend that this call happened afterward.
234            // That way we won't leak any timers or goroutines when Close returns.
235            l.idleTimer <- timer
236            <-l.timedOut
237            return ErrIdleTimeout
238        }
239        close(l.active)
240    }
241
242    return l.wrapped.Close()
243}
244
245func (l *idleListenerDialer() Dialer {
246    return l.wrapped.Dialer()
247}
248
249func (l *idleListenertimerExpired() {
250    select {
251    case nok := <-l.active:
252        if ok {
253            panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active"n))
254        } else {
255            panic("jsonrpc2: Close finished with idle timer still running")
256        }
257
258    case <-l.timedOut:
259        panic("jsonrpc2: idleListener idle timer fired more than once")
260
261    case <-l.idleTimer:
262        // The timer for this very call!
263    }
264
265    // Close the Listener with all channels still blocked to ensure that this call
266    // to l.wrapped.Close doesn't race with the one in l.Close.
267    defer close(l.timedOut)
268    l.wrapped.Close()
269}
270
271func (l *idleListenerconnClosed() {
272    select {
273    case nok := <-l.active:
274        if !ok {
275            // l is already closed, so it can't close due to idleness,
276            // and we don't need to track the number of active connections any more.
277            return
278        }
279        n--
280        if n == 0 {
281            l.idleTimer <- time.AfterFunc(l.timeoutl.timerExpired)
282        } else {
283            l.active <- n
284        }
285
286    case <-l.timedOut:
287        panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")
288
289    case <-l.idleTimer:
290        panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
291    }
292}
293
294type idleListenerConn struct {
295    wrapped   io.ReadWriteCloser
296    l         *idleListener
297    closeOnce sync.Once
298}
299
300func (l *idleListenernewConn(rwc io.ReadWriteCloser) *idleListenerConn {
301    c := &idleListenerConn{
302        wrappedrwc,
303        l:       l,
304    }
305
306    // A caller that forgets to call Close may disrupt the idleListener's
307    // accounting, even though the file descriptor for the underlying connection
308    // may eventually be garbage-collected anyway.
309    //
310    // Set a (best-effort) finalizer to verify that a Close call always occurs.
311    // (We will clear the finalizer explicitly in Close.)
312    runtime.SetFinalizer(c, func(c *idleListenerConn) {
313        panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
314    })
315
316    return c
317}
318
319func (c *idleListenerConnRead(p []byte) (interror)  { return c.wrapped.Read(p) }
320func (c *idleListenerConnWrite(p []byte) (interror) { return c.wrapped.Write(p) }
321
322func (c *idleListenerConnClose() error {
323    defer c.closeOnce.Do(func() {
324        c.l.connClosed()
325        runtime.SetFinalizer(cnil)
326    })
327    return c.wrapped.Close()
328}
329
MembersX
idleListener.Accept.ctx
idleListener.newConn.c
Dialer
Dial.rwc
idleListener
idleListener.Accept.l
idleListenerConn.Read
Dial
Server.Wait.s
Server.run
idleListener.Accept.rwc
idleListener.connClosed.l
Dial.ctx
idleListener.timedOut
idleListener.idleTimer
runtime
idleListenerConn.wrapped
NewIdleListener.timeout
idleListener.Dialer
Server.listener
Server.shutdownOnce
Server.run.ctx
Server.Shutdown
Server.run.BlockStmt.rwc
Dial.binder
NewServer
NewServer.server
idleListenerConn.l
idleListenerConn.Close.c
Server.async
Server.Shutdown.s
Server.run.s
idleListener.Close.l
idleListener.timerExpired
NewServer.listener
idleListener.timeout
idleListener.active
idleListenerConn.Close
Dial.err
idleListener.Close
idleListenerConn.Write.c
NewIdleListener.wrap
idleListener.wrapped
idleListener.timerExpired.l
idleListenerConn
idleListener.newConn.l
Listener
Server.run.activeConns
NewIdleListener
Server.Wait
idleListenerConn.Read.c
idleListenerConn.Read.p
idleListenerConn.Write
Server.closing
NewIdleListener.l
idleListenerConn.closeOnce
idleListener.Accept.err
idleListener.Dialer.l
idleListener.newConn
idleListener.newConn.rwc
idleListenerConn.Write.p
NewServer.binder
Server.run.BlockStmt.err
idleListener.Accept
Server
idleListener.connClosed
Server.binder
Dial.dialer
NewServer.ctx
Members
X