GoPLS Viewer

Home|gopls/internal/jsonrpc2_v2/conn.go
1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8    "context"
9    "encoding/json"
10    "errors"
11    "fmt"
12    "io"
13    "sync"
14    "sync/atomic"
15    "time"
16
17    "golang.org/x/tools/internal/event"
18    "golang.org/x/tools/internal/event/keys"
19    "golang.org/x/tools/internal/event/label"
20    "golang.org/x/tools/internal/event/tag"
21)
22
23// Binder builds a connection configuration.
24// This may be used in servers to generate a new configuration per connection.
25// ConnectionOptions itself implements Binder returning itself unmodified, to
26// allow for the simple cases where no per connection information is needed.
27type Binder interface {
28    // Bind returns the ConnectionOptions to use when establishing the passed-in
29    // Connection.
30    //
31    // The connection is not ready to use when Bind is called,
32    // but Bind may close it without reading or writing to it.
33    Bind(context.Context, *ConnectionConnectionOptions
34}
35
36// A BinderFunc implements the Binder interface for a standalone Bind function.
37type BinderFunc func(context.Context, *ConnectionConnectionOptions
38
39func (f BinderFuncBind(ctx context.Contextc *ConnectionConnectionOptions {
40    return f(ctxc)
41}
42
43var _ Binder = BinderFunc(nil)
44
45// ConnectionOptions holds the options for new connections.
46type ConnectionOptions struct {
47    // Framer allows control over the message framing and encoding.
48    // If nil, HeaderFramer will be used.
49    Framer Framer
50    // Preempter allows registration of a pre-queue message handler.
51    // If nil, no messages will be preempted.
52    Preempter Preempter
53    // Handler is used as the queued message handler for inbound messages.
54    // If nil, all responses will be ErrNotHandled.
55    Handler Handler
56    // OnInternalError, if non-nil, is called with any internal errors that occur
57    // while serving the connection, such as protocol errors or invariant
58    // violations. (If nil, internal errors result in panics.)
59    OnInternalError func(error)
60}
61
62// Connection manages the jsonrpc2 protocol, connecting responses back to their
63// calls.
64// Connection is bidirectional; it does not have a designated server or client
65// end.
66type Connection struct {
67    seq int64 // must only be accessed using atomic operations
68
69    stateMu sync.Mutex
70    state   inFlightState // accessed only in updateInFlight
71    done    chan struct{} // closed (under stateMu) when state.closed is true and all goroutines have completed
72
73    writer chan Writer // 1-buffered; stores the writer when not in use
74
75    handler Handler
76
77    onInternalError func(error)
78    onDone          func()
79}
80
81// inFlightState records the state of the incoming and outgoing calls on a
82// Connection.
83type inFlightState struct {
84    connClosing bool  // true when the Connection's Close method has been called
85    reading     bool  // true while the readIncoming goroutine is running
86    readErr     error // non-nil when the readIncoming goroutine exits (typically io.EOF)
87    writeErr    error // non-nil if a call to the Writer has failed with a non-canceled Context
88
89    // closer shuts down and cleans up the Reader and Writer state, ideally
90    // interrupting any Read or Write call that is currently blocked. It is closed
91    // when the state is idle and one of: connClosing is true, readErr is non-nil,
92    // or writeErr is non-nil.
93    //
94    // After the closer has been invoked, the closer field is set to nil
95    // and the closeErr field is simultaneously set to its result.
96    closer   io.Closer
97    closeErr error // error returned from closer.Close
98
99    outgoingCalls         map[ID]*AsyncCall // calls only
100    outgoingNotifications int               // # of notifications awaiting "write"
101
102    // incoming stores the total number of incoming calls and notifications
103    // that have not yet written or processed a result.
104    incoming int
105
106    incomingByID map[ID]*incomingRequest // calls only
107
108    // handlerQueue stores the backlog of calls and notifications that were not
109    // already handled by a preempter.
110    // The queue does not include the request currently being handled (if any).
111    handlerQueue   []*incomingRequest
112    handlerRunning bool
113}
114
115// updateInFlight locks the state of the connection's in-flight requests, allows
116// f to mutate that state, and closes the connection if it is idle and either
117// is closing or has a read or write error.
118func (c *ConnectionupdateInFlight(f func(*inFlightState)) {
119    c.stateMu.Lock()
120    defer c.stateMu.Unlock()
121
122    s := &c.state
123
124    f(s)
125
126    select {
127    case <-c.done:
128        // The connection was already completely done at the start of this call to
129        // updateInFlight, so it must remain so. (The call to f should have noticed
130        // that and avoided making any updates that would cause the state to be
131        // non-idle.)
132        if !s.idle() {
133            panic("jsonrpc2_v2: updateInFlight transitioned to non-idle when already done")
134        }
135        return
136    default:
137    }
138
139    if s.idle() && s.shuttingDown(ErrUnknown) != nil {
140        if s.closer != nil {
141            s.closeErr = s.closer.Close()
142            s.closer = nil // prevent duplicate Close calls
143        }
144        if s.reading {
145            // The readIncoming goroutine is still running. Our call to Close should
146            // cause it to exit soon, at which point it will make another call to
147            // updateInFlight, set s.reading to false, and mark the Connection done.
148        } else {
149            // The readIncoming goroutine has exited, or never started to begin with.
150            // Since everything else is idle, we're completely done.
151            if c.onDone != nil {
152                c.onDone()
153            }
154            close(c.done)
155        }
156    }
157}
158
159// idle reports whether the connction is in a state with no pending calls or
160// notifications.
161//
162// If idle returns true, the readIncoming goroutine may still be running,
163// but no other goroutines are doing work on behalf of the connnection.
164func (s *inFlightStateidle() bool {
165    return len(s.outgoingCalls) == 0 && s.outgoingNotifications == 0 && s.incoming == 0 && !s.handlerRunning
166}
167
168// shuttingDown reports whether the connection is in a state that should
169// disallow new (incoming and outgoing) calls. It returns either nil or
170// an error that is or wraps the provided errClosing.
171func (s *inFlightStateshuttingDown(errClosing errorerror {
172    if s.connClosing {
173        // If Close has been called explicitly, it doesn't matter what state the
174        // Reader and Writer are in: we shouldn't be starting new work because the
175        // caller told us not to start new work.
176        return errClosing
177    }
178    if s.readErr != nil {
179        // If the read side of the connection is broken, we cannot read new call
180        // requests, and cannot read responses to our outgoing calls.
181        return fmt.Errorf("%w: %v"errClosings.readErr)
182    }
183    if s.writeErr != nil {
184        // If the write side of the connection is broken, we cannot write responses
185        // for incoming calls, and cannot write requests for outgoing calls.
186        return fmt.Errorf("%w: %v"errClosings.writeErr)
187    }
188    return nil
189}
190
191// incomingRequest is used to track an incoming request as it is being handled
192type incomingRequest struct {
193    *Request // the request being processed
194    ctx      context.Context
195    cancel   context.CancelFunc
196    endSpan  func() // called (and set to nil) when the response is sent
197}
198
199// Bind returns the options unmodified.
200func (o ConnectionOptionsBind(context.Context, *ConnectionConnectionOptions {
201    return o
202}
203
204// newConnection creates a new connection and runs it.
205//
206// This is used by the Dial and Serve functions to build the actual connection.
207//
208// The connection is closed automatically (and its resources cleaned up) when
209// the last request has completed after the underlying ReadWriteCloser breaks,
210// but it may be stopped earlier by calling Close (for a clean shutdown).
211func newConnection(bindCtx context.Contextrwc io.ReadWriteCloserbinder BinderonDone func()) *Connection {
212    // TODO: Should we create a new event span here?
213    // This will propagate cancellation from ctx; should it?
214    ctx := notDone{bindCtx}
215
216    c := &Connection{
217        state:  inFlightState{closerrwc},
218        done:   make(chan struct{}),
219        writermake(chan Writer1),
220        onDoneonDone,
221    }
222    // It's tempting to set a finalizer on c to verify that the state has gone
223    // idle when the connection becomes unreachable. Unfortunately, the Binder
224    // interface makes that unsafe: it allows the Handler to close over the
225    // Connection, which could create a reference cycle that would cause the
226    // Connection to become uncollectable.
227
228    options := binder.Bind(bindCtxc)
229    framer := options.Framer
230    if framer == nil {
231        framer = HeaderFramer()
232    }
233    c.handler = options.Handler
234    if c.handler == nil {
235        c.handler = defaultHandler{}
236    }
237    c.onInternalError = options.OnInternalError
238
239    c.writer <- framer.Writer(rwc)
240    reader := framer.Reader(rwc)
241
242    c.updateInFlight(func(s *inFlightState) {
243        select {
244        case <-c.done:
245            // Bind already closed the connection; don't start a goroutine to read it.
246            return
247        default:
248        }
249
250        // The goroutine started here will continue until the underlying stream is closed.
251        //
252        // (If the Binder closed the Connection already, this should error out and
253        // return almost immediately.)
254        s.reading = true
255        go c.readIncoming(ctxreaderoptions.Preempter)
256    })
257    return c
258}
259
260// Notify invokes the target method but does not wait for a response.
261// The params will be marshaled to JSON before sending over the wire, and will
262// be handed to the method invoked.
263func (c *ConnectionNotify(ctx context.Contextmethod stringparams interface{}) (err error) {
264    ctxdone := event.Start(ctxmethod,
265        tag.Method.Of(method),
266        tag.RPCDirection.Of(tag.Outbound),
267    )
268    attempted := false
269
270    defer func() {
271        labelStatus(ctxerr)
272        done()
273        if attempted {
274            c.updateInFlight(func(s *inFlightState) {
275                s.outgoingNotifications--
276            })
277        }
278    }()
279
280    c.updateInFlight(func(s *inFlightState) {
281        // If the connection is shutting down, allow outgoing notifications only if
282        // there is at least one call still in flight. The number of calls in flight
283        // cannot increase once shutdown begins, and allowing outgoing notifications
284        // may permit notifications that will cancel in-flight calls.
285        if len(s.outgoingCalls) == 0 && len(s.incomingByID) == 0 {
286            err = s.shuttingDown(ErrClientClosing)
287            if err != nil {
288                return
289            }
290        }
291        s.outgoingNotifications++
292        attempted = true
293    })
294    if err != nil {
295        return err
296    }
297
298    notifyerr := NewNotification(methodparams)
299    if err != nil {
300        return fmt.Errorf("marshaling notify parameters: %v"err)
301    }
302
303    event.Metric(ctxtag.Started.Of(1))
304    return c.write(ctxnotify)
305}
306
307// Call invokes the target method and returns an object that can be used to await the response.
308// The params will be marshaled to JSON before sending over the wire, and will
309// be handed to the method invoked.
310// You do not have to wait for the response, it can just be ignored if not needed.
311// If sending the call failed, the response will be ready and have the error in it.
312func (c *ConnectionCall(ctx context.Contextmethod stringparams interface{}) *AsyncCall {
313    // Generate a new request identifier.
314    id := Int64ID(atomic.AddInt64(&c.seq1))
315    ctxendSpan := event.Start(ctxmethod,
316        tag.Method.Of(method),
317        tag.RPCDirection.Of(tag.Outbound),
318        tag.RPCID.Of(fmt.Sprintf("%q"id)),
319    )
320
321    ac := &AsyncCall{
322        id:      id,
323        ready:   make(chan struct{}),
324        ctx:     ctx,
325        endSpanendSpan,
326    }
327    // When this method returns, either ac is retired, or the request has been
328    // written successfully and the call is awaiting a response (to be provided by
329    // the readIncoming goroutine).
330
331    callerr := NewCall(ac.idmethodparams)
332    if err != nil {
333        ac.retire(&Response{IDidErrorfmt.Errorf("marshaling call parameters: %w"err)})
334        return ac
335    }
336
337    c.updateInFlight(func(s *inFlightState) {
338        err = s.shuttingDown(ErrClientClosing)
339        if err != nil {
340            return
341        }
342        if s.outgoingCalls == nil {
343            s.outgoingCalls = make(map[ID]*AsyncCall)
344        }
345        s.outgoingCalls[ac.id] = ac
346    })
347    if err != nil {
348        ac.retire(&Response{IDidErrorerr})
349        return ac
350    }
351
352    event.Metric(ctxtag.Started.Of(1))
353    if err := c.write(ctxcall); err != nil {
354        // Sending failed. We will never get a response, so deliver a fake one if it
355        // wasn't already retired by the connection breaking.
356        c.updateInFlight(func(s *inFlightState) {
357            if s.outgoingCalls[ac.id] == ac {
358                delete(s.outgoingCallsac.id)
359                ac.retire(&Response{IDidErrorerr})
360            } else {
361                // ac was already retired by the readIncoming goroutine:
362                // perhaps our write raced with the Read side of the connection breaking.
363            }
364        })
365    }
366    return ac
367}
368
369type AsyncCall struct {
370    id       ID
371    ready    chan struct{} // closed after response has been set and span has been ended
372    response *Response
373    ctx      context.Context // for event logging only
374    endSpan  func()          // close the tracing span when all processing for the message is complete
375}
376
377// ID used for this call.
378// This can be used to cancel the call if needed.
379func (ac *AsyncCallID() ID { return ac.id }
380
381// IsReady can be used to check if the result is already prepared.
382// This is guaranteed to return true on a result for which Await has already
383// returned, or a call that failed to send in the first place.
384func (ac *AsyncCallIsReady() bool {
385    select {
386    case <-ac.ready:
387        return true
388    default:
389        return false
390    }
391}
392
393// retire processes the response to the call.
394func (ac *AsyncCallretire(response *Response) {
395    select {
396    case <-ac.ready:
397        panic(fmt.Sprintf("jsonrpc2: retire called twice for ID %v"ac.id))
398    default:
399    }
400
401    ac.response = response
402    labelStatus(ac.ctxresponse.Error)
403    ac.endSpan()
404    // Allow the trace context, which may retain a lot of reachable values,
405    // to be garbage-collected.
406    ac.ctxac.endSpan = nilnil
407
408    close(ac.ready)
409}
410
411// Await waits for (and decodes) the results of a Call.
412// The response will be unmarshaled from JSON into the result.
413func (ac *AsyncCallAwait(ctx context.Contextresult interface{}) error {
414    select {
415    case <-ctx.Done():
416        return ctx.Err()
417    case <-ac.ready:
418    }
419    if ac.response.Error != nil {
420        return ac.response.Error
421    }
422    if result == nil {
423        return nil
424    }
425    return json.Unmarshal(ac.response.Resultresult)
426}
427
428// Respond delivers a response to an incoming Call.
429//
430// Respond must be called exactly once for any message for which a handler
431// returns ErrAsyncResponse. It must not be called for any other message.
432func (c *ConnectionRespond(id IDresult interface{}, err errorerror {
433    var req *incomingRequest
434    c.updateInFlight(func(s *inFlightState) {
435        req = s.incomingByID[id]
436    })
437    if req == nil {
438        return c.internalErrorf("Request not found for ID %v"id)
439    }
440
441    if err == ErrAsyncResponse {
442        // Respond is supposed to supply the asynchronous response, so it would be
443        // confusing to call Respond with an error that promises to call Respond
444        // again.
445        err = c.internalErrorf("Respond called with ErrAsyncResponse for %q"req.Method)
446    }
447    return c.processResult("Respond"reqresulterr)
448}
449
450// Cancel cancels the Context passed to the Handle call for the inbound message
451// with the given ID.
452//
453// Cancel will not complain if the ID is not a currently active message, and it
454// will not cause any messages that have not arrived yet with that ID to be
455// cancelled.
456func (c *ConnectionCancel(id ID) {
457    var req *incomingRequest
458    c.updateInFlight(func(s *inFlightState) {
459        req = s.incomingByID[id]
460    })
461    if req != nil {
462        req.cancel()
463    }
464}
465
466// Wait blocks until the connection is fully closed, but does not close it.
467func (c *ConnectionWait() error {
468    var err error
469    <-c.done
470    c.updateInFlight(func(s *inFlightState) {
471        err = s.closeErr
472    })
473    return err
474}
475
476// Close stops accepting new requests, waits for in-flight requests and enqueued
477// Handle calls to complete, and then closes the underlying stream.
478//
479// After the start of a Close, notification requests (that lack IDs and do not
480// receive responses) will continue to be passed to the Preempter, but calls
481// with IDs will receive immediate responses with ErrServerClosing, and no new
482// requests (not even notifications!) will be enqueued to the Handler.
483func (c *ConnectionClose() error {
484    // Stop handling new requests, and interrupt the reader (by closing the
485    // connection) as soon as the active requests finish.
486    c.updateInFlight(func(s *inFlightState) { s.connClosing = true })
487
488    return c.Wait()
489}
490
491// readIncoming collects inbound messages from the reader and delivers them, either responding
492// to outgoing calls or feeding requests to the queue.
493func (c *ConnectionreadIncoming(ctx context.Contextreader Readerpreempter Preempter) {
494    var err error
495    for {
496        var (
497            msg Message
498            n   int64
499        )
500        msgnerr = reader.Read(ctx)
501        if err != nil {
502            break
503        }
504
505        switch msg := msg.(type) {
506        case *Request:
507            c.acceptRequest(ctxmsgnpreempter)
508
509        case *Response:
510            c.updateInFlight(func(s *inFlightState) {
511                if acok := s.outgoingCalls[msg.ID]; ok {
512                    delete(s.outgoingCallsmsg.ID)
513                    ac.retire(msg)
514                } else {
515                    // TODO: How should we report unexpected responses?
516                }
517            })
518
519        default:
520            c.internalErrorf("Read returned an unexpected message of type %T"msg)
521        }
522    }
523
524    c.updateInFlight(func(s *inFlightState) {
525        s.reading = false
526        s.readErr = err
527
528        // Retire any outgoing requests that were still in flight: with the Reader no
529        // longer being processed, they necessarily cannot receive a response.
530        for idac := range s.outgoingCalls {
531            ac.retire(&Response{IDidErrorerr})
532        }
533        s.outgoingCalls = nil
534    })
535}
536
537// acceptRequest either handles msg synchronously or enqueues it to be handled
538// asynchronously.
539func (c *ConnectionacceptRequest(ctx context.Contextmsg *RequestmsgBytes int64preempter Preempter) {
540    // Add a span to the context for this request.
541    labels := append(make([]label.Label03), // Make space for the ID if present.
542        tag.Method.Of(msg.Method),
543        tag.RPCDirection.Of(tag.Inbound),
544    )
545    if msg.IsCall() {
546        labels = append(labelstag.RPCID.Of(fmt.Sprintf("%q"msg.ID)))
547    }
548    ctxendSpan := event.Start(ctxmsg.Methodlabels...)
549    event.Metric(ctx,
550        tag.Started.Of(1),
551        tag.ReceivedBytes.Of(msgBytes))
552
553    // In theory notifications cannot be cancelled, but we build them a cancel
554    // context anyway.
555    ctxcancel := context.WithCancel(ctx)
556    req := &incomingRequest{
557        Requestmsg,
558        ctx:     ctx,
559        cancel:  cancel,
560        endSpanendSpan,
561    }
562
563    // If the request is a call, add it to the incoming map so it can be
564    // cancelled (or responded) by ID.
565    var err error
566    c.updateInFlight(func(s *inFlightState) {
567        s.incoming++
568
569        if req.IsCall() {
570            if s.incomingByID[req.ID] != nil {
571                err = fmt.Errorf("%w: request ID %v already in use"ErrInvalidRequestreq.ID)
572                req.ID = ID{} // Don't misattribute this error to the existing request.
573                return
574            }
575
576            if s.incomingByID == nil {
577                s.incomingByID = make(map[ID]*incomingRequest)
578            }
579            s.incomingByID[req.ID] = req
580
581            // When shutting down, reject all new Call requests, even if they could
582            // theoretically be handled by the preempter. The preempter could return
583            // ErrAsyncResponse, which would increase the amount of work in flight
584            // when we're trying to ensure that it strictly decreases.
585            err = s.shuttingDown(ErrServerClosing)
586        }
587    })
588    if err != nil {
589        c.processResult("acceptRequest"reqnilerr)
590        return
591    }
592
593    if preempter != nil {
594        resulterr := preempter.Preempt(req.ctxreq.Request)
595
596        if req.IsCall() && errors.Is(errErrAsyncResponse) {
597            // This request will remain in flight until Respond is called for it.
598            return
599        }
600
601        if !errors.Is(errErrNotHandled) {
602            c.processResult("Preempt"reqresulterr)
603            return
604        }
605    }
606
607    c.updateInFlight(func(s *inFlightState) {
608        // If the connection is shutting down, don't enqueue anything to the
609        // handler — not even notifications. That ensures that if the handler
610        // continues to make progress, it will eventually become idle and
611        // close the connection.
612        err = s.shuttingDown(ErrServerClosing)
613        if err != nil {
614            return
615        }
616
617        // We enqueue requests that have not been preempted to an unbounded slice.
618        // Unfortunately, we cannot in general limit the size of the handler
619        // queue: we have to read every response that comes in on the wire
620        // (because it may be responding to a request issued by, say, an
621        // asynchronous handler), and in order to get to that response we have
622        // to read all of the requests that came in ahead of it.
623        s.handlerQueue = append(s.handlerQueuereq)
624        if !s.handlerRunning {
625            // We start the handleAsync goroutine when it has work to do, and let it
626            // exit when the queue empties.
627            //
628            // Otherwise, in order to synchronize the handler we would need some other
629            // goroutine (probably readIncoming?) to explicitly wait for handleAsync
630            // to finish, and that would complicate error reporting: either the error
631            // report from the goroutine would be blocked on the handler emptying its
632            // queue (which was tried, and introduced a deadlock detected by
633            // TestCloseCallRace), or the error would need to be reported separately
634            // from synchronizing completion. Allowing the handler goroutine to exit
635            // when idle seems simpler than trying to implement either of those
636            // alternatives correctly.
637            s.handlerRunning = true
638            go c.handleAsync()
639        }
640    })
641    if err != nil {
642        c.processResult("acceptRequest"reqnilerr)
643    }
644}
645
646// handleAsync invokes the handler on the requests in the handler queue
647// sequentially until the queue is empty.
648func (c *ConnectionhandleAsync() {
649    for {
650        var req *incomingRequest
651        c.updateInFlight(func(s *inFlightState) {
652            if len(s.handlerQueue) > 0 {
653                reqs.handlerQueue = s.handlerQueue[0], s.handlerQueue[1:]
654            } else {
655                s.handlerRunning = false
656            }
657        })
658        if req == nil {
659            return
660        }
661
662        // Only deliver to the Handler if not already canceled.
663        if err := req.ctx.Err(); err != nil {
664            c.updateInFlight(func(s *inFlightState) {
665                if s.writeErr != nil {
666                    // Assume that req.ctx was canceled due to s.writeErr.
667                    // TODO(#51365): use a Context API to plumb this through req.ctx.
668                    err = fmt.Errorf("%w: %v"ErrServerClosings.writeErr)
669                }
670            })
671            c.processResult("handleAsync"reqnilerr)
672            continue
673        }
674
675        resulterr := c.handler.Handle(req.ctxreq.Request)
676        c.processResult(c.handlerreqresulterr)
677    }
678}
679
680// processResult processes the result of a request and, if appropriate, sends a response.
681func (c *ConnectionprocessResult(from interface{}, req *incomingRequestresult interface{}, err errorerror {
682    switch err {
683    case ErrAsyncResponse:
684        if !req.IsCall() {
685            return c.internalErrorf("%#v returned ErrAsyncResponse for a %q Request without an ID"fromreq.Method)
686        }
687        return nil // This request is still in flight, so don't record the result yet.
688    case ErrNotHandledErrMethodNotFound:
689        // Add detail describing the unhandled method.
690        err = fmt.Errorf("%w: %q"ErrMethodNotFoundreq.Method)
691    }
692
693    if req.endSpan == nil {
694        return c.internalErrorf("%#v produced a duplicate %q Response"fromreq.Method)
695    }
696
697    if result != nil && err != nil {
698        c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v"fromreq.Methoderrresult)
699        result = nil // Discard the spurious result and respond with err.
700    }
701
702    if req.IsCall() {
703        if result == nil && err == nil {
704            err = c.internalErrorf("%#v returned a nil result and nil error for a %q Request that requires a Response"fromreq.Method)
705        }
706
707        responserespErr := NewResponse(req.IDresulterr)
708
709        // The caller could theoretically reuse the request's ID as soon as we've
710        // sent the response, so ensure that it is removed from the incoming map
711        // before sending.
712        c.updateInFlight(func(s *inFlightState) {
713            delete(s.incomingByIDreq.ID)
714        })
715        if respErr == nil {
716            writeErr := c.write(notDone{req.ctx}, response)
717            if err == nil {
718                err = writeErr
719            }
720        } else {
721            err = c.internalErrorf("%#v returned a malformed result for %q: %w"fromreq.MethodrespErr)
722        }
723    } else { // req is a notification
724        if result != nil {
725            err = c.internalErrorf("%#v returned a non-nil result for a %q Request without an ID"fromreq.Method)
726        } else if err != nil {
727            err = fmt.Errorf("%w: %q notification failed: %v"ErrInternalreq.Methoderr)
728        }
729        if err != nil {
730            // TODO: can/should we do anything with this error beyond writing it to the event log?
731            // (Is this the right label to attach to the log?)
732            event.Label(req.ctxkeys.Err.Of(err))
733        }
734    }
735
736    labelStatus(req.ctxerr)
737
738    // Cancel the request and finalize the event span to free any associated resources.
739    req.cancel()
740    req.endSpan()
741    req.endSpan = nil
742    c.updateInFlight(func(s *inFlightState) {
743        if s.incoming == 0 {
744            panic("jsonrpc2_v2: processResult called when incoming count is already zero")
745        }
746        s.incoming--
747    })
748    return nil
749}
750
751// write is used by all things that write outgoing messages, including replies.
752// it makes sure that writes are atomic
753func (c *Connectionwrite(ctx context.Contextmsg Messageerror {
754    writer := <-c.writer
755    defer func() { c.writer <- writer }()
756    nerr := writer.Write(ctxmsg)
757    event.Metric(ctxtag.SentBytes.Of(n))
758
759    if err != nil && ctx.Err() == nil {
760        // The call to Write failed, and since ctx.Err() is nil we can't attribute
761        // the failure (even indirectly) to Context cancellation. The writer appears
762        // to be broken, and future writes are likely to also fail.
763        //
764        // If the read side of the connection is also broken, we might not even be
765        // able to receive cancellation notifications. Since we can't reliably write
766        // the results of incoming calls and can't receive explicit cancellations,
767        // cancel the calls now.
768        c.updateInFlight(func(s *inFlightState) {
769            if s.writeErr == nil {
770                s.writeErr = err
771                for _r := range s.incomingByID {
772                    r.cancel()
773                }
774            }
775        })
776    }
777
778    return err
779}
780
781// internalErrorf reports an internal error. By default it panics, but if
782// c.onInternalError is non-nil it instead calls that and returns an error
783// wrapping ErrInternal.
784func (c *ConnectioninternalErrorf(format stringargs ...interface{}) error {
785    err := fmt.Errorf(formatargs...)
786    if c.onInternalError == nil {
787        panic("jsonrpc2: " + err.Error())
788    }
789    c.onInternalError(err)
790
791    return fmt.Errorf("%w: %v"ErrInternalerr)
792}
793
794// labelStatus labels the status of the event in ctx based on whether err is nil.
795func labelStatus(ctx context.Contexterr error) {
796    if err == nil {
797        event.Label(ctxtag.StatusCode.Of("OK"))
798    } else {
799        event.Label(ctxtag.StatusCode.Of("ERROR"))
800    }
801}
802
803// notDone is a context.Context wrapper that returns a nil Done channel.
804type notDone struct{ ctx context.Context }
805
806func (ic notDoneValue(key interface{}) interface{} {
807    return ic.ctx.Value(key)
808}
809
810func (notDoneDone() <-chan struct{}       { return nil }
811func (notDoneErr() error                  { return nil }
812func (notDoneDeadline() (time.Timebool) { return time.Time{}, false }
813
MembersX
BinderFunc
inFlightState.closeErr
Connection.acceptRequest.err
BinderFunc.Bind.ctx
ConnectionOptions.Handler
inFlightState.idle.s
inFlightState.shuttingDown.s
AsyncCall.response
labelStatus
sync
Connection.seq
AsyncCall.ctx
Connection.readIncoming.BlockStmt.RangeStmt_17275.id
Connection.handleAsync.BlockStmt.req
notDone.Value.key
Connection.processResult.BlockStmt.respErr
tag
inFlightState.incoming
newConnection.rwc
AsyncCall.ID
Connection.Cancel.c
Connection.readIncoming.err
Connection.processResult
Connection.processResult.BlockStmt.BlockStmt.writeErr
Connection.write.ctx
newConnection.framer
Connection.Call.method
Connection.readIncoming.reader
Connection.readIncoming.BlockStmt.RangeStmt_17275.ac
Connection.state
incomingRequest.endSpan
AsyncCall.ready
AsyncCall.Await.ctx
Connection.readIncoming.c
Connection.readIncoming
Connection.write.c
label
ConnectionOptions.OnInternalError
Connection.updateInFlight.f
Connection.acceptRequest.preempter
BinderFunc.Bind.f
newConnection
AsyncCall.ID.ac
AsyncCall.Await
Connection.readIncoming.ctx
notDone.ctx
Connection.Notify.done
labelStatus.ctx
Connection.acceptRequest.labels
context
Connection.writer
inFlightState.connClosing
ConnectionOptions.Bind.o
ConnectionOptions.Bind
Connection.Call.params
Connection.Respond.id
Connection.processResult.from
Connection.internalErrorf.c
Connection.internalErrorf.args
Connection.done
Connection.handler
inFlightState.idle
incomingRequest.ctx
Connection.Wait
Connection.write.msg
inFlightState.handlerRunning
incomingRequest.cancel
Connection.Notify.c
Connection.Call.endSpan
Connection.Cancel
Connection.readIncoming.BlockStmt.n
Connection.acceptRequest.ctx
notDone.Value.ic
notDone.Err
atomic
Connection.onInternalError
Connection.updateInFlight.c
Connection.Notify.method
AsyncCall.retire.ac
Connection.processResult.req
inFlightState.outgoingCalls
newConnection.bindCtx
newConnection.ctx
Connection.Notify.params
AsyncCall
Connection.handleAsync.BlockStmt.result
Connection.internalErrorf.err
keys
Connection.Notify.attempted
Connection.Call.ctx
AsyncCall.retire
event
BinderFunc.Bind
BinderFunc.Bind.c
inFlightState.reading
newConnection.reader
Connection.Notify
Connection.stateMu
inFlightState.shuttingDown
Connection.Cancel.req
notDone.Value
inFlightState.closer
AsyncCall.id
Connection.Respond
Connection.Wait.err
Connection.processResult.BlockStmt.response
Connection.updateInFlight
AsyncCall.Await.result
Connection.Call.c
Connection.Call.ac
AsyncCall.IsReady.ac
Connection.Close.c
errors
inFlightState
Connection.processResult.c
Connection.write.err
notDone
incomingRequest
AsyncCall.Await.ac
Connection.write
AsyncCall.endSpan
Connection.Cancel.id
Connection.readIncoming.preempter
Connection.readIncoming.BlockStmt.msg
Connection.acceptRequest.req
io
Connection.Call
Connection.Wait.c
Connection.acceptRequest.msgBytes
inFlightState.outgoingNotifications
newConnection.binder
Connection.Notify.err
Connection.acceptRequest.c
Connection.onDone
inFlightState.shuttingDown.errClosing
newConnection.options
Connection.Respond.result
Connection.acceptRequest.BlockStmt.result
Connection.internalErrorf.format
ConnectionOptions
Connection
Connection.Notify.ctx
Connection.Respond.c
Connection.handleAsync.c
Connection.processResult.result
notDone.Done
newConnection.onDone
Connection.Call.call
Connection.acceptRequest.endSpan
Connection.internalErrorf
json
inFlightState.incomingByID
Connection.acceptRequest
Connection.acceptRequest.BlockStmt.err
Connection.Respond.req
fmt
ConnectionOptions.Framer
inFlightState.handlerQueue
newConnection.c
Connection.Call.id
Connection.Call.err
AsyncCall.IsReady
Connection.Close
Connection.acceptRequest.cancel
Connection.handleAsync.BlockStmt.err
Connection.processResult.err
notDone.Deadline
Binder
Connection.Respond.err
Connection.acceptRequest.msg
Connection.handleAsync
Connection.write.n
Connection.write.BlockStmt.BlockStmt.BlockStmt.RangeStmt_25671.r
labelStatus.err
time
ConnectionOptions.Preempter
inFlightState.readErr
inFlightState.writeErr
Connection.Notify.notify
AsyncCall.retire.response
Members
X