1 | // Copyright 2018 The Go Authors. All rights reserved. |
---|---|
2 | // Use of this source code is governed by a BSD-style |
3 | // license that can be found in the LICENSE file. |
4 | |
5 | package jsonrpc2 |
6 | |
7 | import ( |
8 | "context" |
9 | "encoding/json" |
10 | "errors" |
11 | "fmt" |
12 | "io" |
13 | "sync" |
14 | "sync/atomic" |
15 | "time" |
16 | |
17 | "golang.org/x/tools/internal/event" |
18 | "golang.org/x/tools/internal/event/keys" |
19 | "golang.org/x/tools/internal/event/label" |
20 | "golang.org/x/tools/internal/event/tag" |
21 | ) |
22 | |
23 | // Binder builds a connection configuration. |
24 | // This may be used in servers to generate a new configuration per connection. |
25 | // ConnectionOptions itself implements Binder returning itself unmodified, to |
26 | // allow for the simple cases where no per connection information is needed. |
27 | type Binder interface { |
28 | // Bind returns the ConnectionOptions to use when establishing the passed-in |
29 | // Connection. |
30 | // |
31 | // The connection is not ready to use when Bind is called, |
32 | // but Bind may close it without reading or writing to it. |
33 | Bind(context.Context, *Connection) ConnectionOptions |
34 | } |
35 | |
36 | // A BinderFunc implements the Binder interface for a standalone Bind function. |
37 | type BinderFunc func(context.Context, *Connection) ConnectionOptions |
38 | |
39 | func (f BinderFunc) Bind(ctx context.Context, c *Connection) ConnectionOptions { |
40 | return f(ctx, c) |
41 | } |
42 | |
43 | var _ Binder = BinderFunc(nil) |
44 | |
45 | // ConnectionOptions holds the options for new connections. |
46 | type ConnectionOptions struct { |
47 | // Framer allows control over the message framing and encoding. |
48 | // If nil, HeaderFramer will be used. |
49 | Framer Framer |
50 | // Preempter allows registration of a pre-queue message handler. |
51 | // If nil, no messages will be preempted. |
52 | Preempter Preempter |
53 | // Handler is used as the queued message handler for inbound messages. |
54 | // If nil, all responses will be ErrNotHandled. |
55 | Handler Handler |
56 | // OnInternalError, if non-nil, is called with any internal errors that occur |
57 | // while serving the connection, such as protocol errors or invariant |
58 | // violations. (If nil, internal errors result in panics.) |
59 | OnInternalError func(error) |
60 | } |
61 | |
62 | // Connection manages the jsonrpc2 protocol, connecting responses back to their |
63 | // calls. |
64 | // Connection is bidirectional; it does not have a designated server or client |
65 | // end. |
66 | type Connection struct { |
67 | seq int64 // must only be accessed using atomic operations |
68 | |
69 | stateMu sync.Mutex |
70 | state inFlightState // accessed only in updateInFlight |
71 | done chan struct{} // closed (under stateMu) when state.closed is true and all goroutines have completed |
72 | |
73 | writer chan Writer // 1-buffered; stores the writer when not in use |
74 | |
75 | handler Handler |
76 | |
77 | onInternalError func(error) |
78 | onDone func() |
79 | } |
80 | |
81 | // inFlightState records the state of the incoming and outgoing calls on a |
82 | // Connection. |
83 | type inFlightState struct { |
84 | connClosing bool // true when the Connection's Close method has been called |
85 | reading bool // true while the readIncoming goroutine is running |
86 | readErr error // non-nil when the readIncoming goroutine exits (typically io.EOF) |
87 | writeErr error // non-nil if a call to the Writer has failed with a non-canceled Context |
88 | |
89 | // closer shuts down and cleans up the Reader and Writer state, ideally |
90 | // interrupting any Read or Write call that is currently blocked. It is closed |
91 | // when the state is idle and one of: connClosing is true, readErr is non-nil, |
92 | // or writeErr is non-nil. |
93 | // |
94 | // After the closer has been invoked, the closer field is set to nil |
95 | // and the closeErr field is simultaneously set to its result. |
96 | closer io.Closer |
97 | closeErr error // error returned from closer.Close |
98 | |
99 | outgoingCalls map[ID]*AsyncCall // calls only |
100 | outgoingNotifications int // # of notifications awaiting "write" |
101 | |
102 | // incoming stores the total number of incoming calls and notifications |
103 | // that have not yet written or processed a result. |
104 | incoming int |
105 | |
106 | incomingByID map[ID]*incomingRequest // calls only |
107 | |
108 | // handlerQueue stores the backlog of calls and notifications that were not |
109 | // already handled by a preempter. |
110 | // The queue does not include the request currently being handled (if any). |
111 | handlerQueue []*incomingRequest |
112 | handlerRunning bool |
113 | } |
114 | |
115 | // updateInFlight locks the state of the connection's in-flight requests, allows |
116 | // f to mutate that state, and closes the connection if it is idle and either |
117 | // is closing or has a read or write error. |
118 | func (c *Connection) updateInFlight(f func(*inFlightState)) { |
119 | c.stateMu.Lock() |
120 | defer c.stateMu.Unlock() |
121 | |
122 | s := &c.state |
123 | |
124 | f(s) |
125 | |
126 | select { |
127 | case <-c.done: |
128 | // The connection was already completely done at the start of this call to |
129 | // updateInFlight, so it must remain so. (The call to f should have noticed |
130 | // that and avoided making any updates that would cause the state to be |
131 | // non-idle.) |
132 | if !s.idle() { |
133 | panic("jsonrpc2_v2: updateInFlight transitioned to non-idle when already done") |
134 | } |
135 | return |
136 | default: |
137 | } |
138 | |
139 | if s.idle() && s.shuttingDown(ErrUnknown) != nil { |
140 | if s.closer != nil { |
141 | s.closeErr = s.closer.Close() |
142 | s.closer = nil // prevent duplicate Close calls |
143 | } |
144 | if s.reading { |
145 | // The readIncoming goroutine is still running. Our call to Close should |
146 | // cause it to exit soon, at which point it will make another call to |
147 | // updateInFlight, set s.reading to false, and mark the Connection done. |
148 | } else { |
149 | // The readIncoming goroutine has exited, or never started to begin with. |
150 | // Since everything else is idle, we're completely done. |
151 | if c.onDone != nil { |
152 | c.onDone() |
153 | } |
154 | close(c.done) |
155 | } |
156 | } |
157 | } |
158 | |
159 | // idle reports whether the connction is in a state with no pending calls or |
160 | // notifications. |
161 | // |
162 | // If idle returns true, the readIncoming goroutine may still be running, |
163 | // but no other goroutines are doing work on behalf of the connnection. |
164 | func (s *inFlightState) idle() bool { |
165 | return len(s.outgoingCalls) == 0 && s.outgoingNotifications == 0 && s.incoming == 0 && !s.handlerRunning |
166 | } |
167 | |
168 | // shuttingDown reports whether the connection is in a state that should |
169 | // disallow new (incoming and outgoing) calls. It returns either nil or |
170 | // an error that is or wraps the provided errClosing. |
171 | func (s *inFlightState) shuttingDown(errClosing error) error { |
172 | if s.connClosing { |
173 | // If Close has been called explicitly, it doesn't matter what state the |
174 | // Reader and Writer are in: we shouldn't be starting new work because the |
175 | // caller told us not to start new work. |
176 | return errClosing |
177 | } |
178 | if s.readErr != nil { |
179 | // If the read side of the connection is broken, we cannot read new call |
180 | // requests, and cannot read responses to our outgoing calls. |
181 | return fmt.Errorf("%w: %v", errClosing, s.readErr) |
182 | } |
183 | if s.writeErr != nil { |
184 | // If the write side of the connection is broken, we cannot write responses |
185 | // for incoming calls, and cannot write requests for outgoing calls. |
186 | return fmt.Errorf("%w: %v", errClosing, s.writeErr) |
187 | } |
188 | return nil |
189 | } |
190 | |
191 | // incomingRequest is used to track an incoming request as it is being handled |
192 | type incomingRequest struct { |
193 | *Request // the request being processed |
194 | ctx context.Context |
195 | cancel context.CancelFunc |
196 | endSpan func() // called (and set to nil) when the response is sent |
197 | } |
198 | |
199 | // Bind returns the options unmodified. |
200 | func (o ConnectionOptions) Bind(context.Context, *Connection) ConnectionOptions { |
201 | return o |
202 | } |
203 | |
204 | // newConnection creates a new connection and runs it. |
205 | // |
206 | // This is used by the Dial and Serve functions to build the actual connection. |
207 | // |
208 | // The connection is closed automatically (and its resources cleaned up) when |
209 | // the last request has completed after the underlying ReadWriteCloser breaks, |
210 | // but it may be stopped earlier by calling Close (for a clean shutdown). |
211 | func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) *Connection { |
212 | // TODO: Should we create a new event span here? |
213 | // This will propagate cancellation from ctx; should it? |
214 | ctx := notDone{bindCtx} |
215 | |
216 | c := &Connection{ |
217 | state: inFlightState{closer: rwc}, |
218 | done: make(chan struct{}), |
219 | writer: make(chan Writer, 1), |
220 | onDone: onDone, |
221 | } |
222 | // It's tempting to set a finalizer on c to verify that the state has gone |
223 | // idle when the connection becomes unreachable. Unfortunately, the Binder |
224 | // interface makes that unsafe: it allows the Handler to close over the |
225 | // Connection, which could create a reference cycle that would cause the |
226 | // Connection to become uncollectable. |
227 | |
228 | options := binder.Bind(bindCtx, c) |
229 | framer := options.Framer |
230 | if framer == nil { |
231 | framer = HeaderFramer() |
232 | } |
233 | c.handler = options.Handler |
234 | if c.handler == nil { |
235 | c.handler = defaultHandler{} |
236 | } |
237 | c.onInternalError = options.OnInternalError |
238 | |
239 | c.writer <- framer.Writer(rwc) |
240 | reader := framer.Reader(rwc) |
241 | |
242 | c.updateInFlight(func(s *inFlightState) { |
243 | select { |
244 | case <-c.done: |
245 | // Bind already closed the connection; don't start a goroutine to read it. |
246 | return |
247 | default: |
248 | } |
249 | |
250 | // The goroutine started here will continue until the underlying stream is closed. |
251 | // |
252 | // (If the Binder closed the Connection already, this should error out and |
253 | // return almost immediately.) |
254 | s.reading = true |
255 | go c.readIncoming(ctx, reader, options.Preempter) |
256 | }) |
257 | return c |
258 | } |
259 | |
260 | // Notify invokes the target method but does not wait for a response. |
261 | // The params will be marshaled to JSON before sending over the wire, and will |
262 | // be handed to the method invoked. |
263 | func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) { |
264 | ctx, done := event.Start(ctx, method, |
265 | tag.Method.Of(method), |
266 | tag.RPCDirection.Of(tag.Outbound), |
267 | ) |
268 | attempted := false |
269 | |
270 | defer func() { |
271 | labelStatus(ctx, err) |
272 | done() |
273 | if attempted { |
274 | c.updateInFlight(func(s *inFlightState) { |
275 | s.outgoingNotifications-- |
276 | }) |
277 | } |
278 | }() |
279 | |
280 | c.updateInFlight(func(s *inFlightState) { |
281 | // If the connection is shutting down, allow outgoing notifications only if |
282 | // there is at least one call still in flight. The number of calls in flight |
283 | // cannot increase once shutdown begins, and allowing outgoing notifications |
284 | // may permit notifications that will cancel in-flight calls. |
285 | if len(s.outgoingCalls) == 0 && len(s.incomingByID) == 0 { |
286 | err = s.shuttingDown(ErrClientClosing) |
287 | if err != nil { |
288 | return |
289 | } |
290 | } |
291 | s.outgoingNotifications++ |
292 | attempted = true |
293 | }) |
294 | if err != nil { |
295 | return err |
296 | } |
297 | |
298 | notify, err := NewNotification(method, params) |
299 | if err != nil { |
300 | return fmt.Errorf("marshaling notify parameters: %v", err) |
301 | } |
302 | |
303 | event.Metric(ctx, tag.Started.Of(1)) |
304 | return c.write(ctx, notify) |
305 | } |
306 | |
307 | // Call invokes the target method and returns an object that can be used to await the response. |
308 | // The params will be marshaled to JSON before sending over the wire, and will |
309 | // be handed to the method invoked. |
310 | // You do not have to wait for the response, it can just be ignored if not needed. |
311 | // If sending the call failed, the response will be ready and have the error in it. |
312 | func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { |
313 | // Generate a new request identifier. |
314 | id := Int64ID(atomic.AddInt64(&c.seq, 1)) |
315 | ctx, endSpan := event.Start(ctx, method, |
316 | tag.Method.Of(method), |
317 | tag.RPCDirection.Of(tag.Outbound), |
318 | tag.RPCID.Of(fmt.Sprintf("%q", id)), |
319 | ) |
320 | |
321 | ac := &AsyncCall{ |
322 | id: id, |
323 | ready: make(chan struct{}), |
324 | ctx: ctx, |
325 | endSpan: endSpan, |
326 | } |
327 | // When this method returns, either ac is retired, or the request has been |
328 | // written successfully and the call is awaiting a response (to be provided by |
329 | // the readIncoming goroutine). |
330 | |
331 | call, err := NewCall(ac.id, method, params) |
332 | if err != nil { |
333 | ac.retire(&Response{ID: id, Error: fmt.Errorf("marshaling call parameters: %w", err)}) |
334 | return ac |
335 | } |
336 | |
337 | c.updateInFlight(func(s *inFlightState) { |
338 | err = s.shuttingDown(ErrClientClosing) |
339 | if err != nil { |
340 | return |
341 | } |
342 | if s.outgoingCalls == nil { |
343 | s.outgoingCalls = make(map[ID]*AsyncCall) |
344 | } |
345 | s.outgoingCalls[ac.id] = ac |
346 | }) |
347 | if err != nil { |
348 | ac.retire(&Response{ID: id, Error: err}) |
349 | return ac |
350 | } |
351 | |
352 | event.Metric(ctx, tag.Started.Of(1)) |
353 | if err := c.write(ctx, call); err != nil { |
354 | // Sending failed. We will never get a response, so deliver a fake one if it |
355 | // wasn't already retired by the connection breaking. |
356 | c.updateInFlight(func(s *inFlightState) { |
357 | if s.outgoingCalls[ac.id] == ac { |
358 | delete(s.outgoingCalls, ac.id) |
359 | ac.retire(&Response{ID: id, Error: err}) |
360 | } else { |
361 | // ac was already retired by the readIncoming goroutine: |
362 | // perhaps our write raced with the Read side of the connection breaking. |
363 | } |
364 | }) |
365 | } |
366 | return ac |
367 | } |
368 | |
369 | type AsyncCall struct { |
370 | id ID |
371 | ready chan struct{} // closed after response has been set and span has been ended |
372 | response *Response |
373 | ctx context.Context // for event logging only |
374 | endSpan func() // close the tracing span when all processing for the message is complete |
375 | } |
376 | |
377 | // ID used for this call. |
378 | // This can be used to cancel the call if needed. |
379 | func (ac *AsyncCall) ID() ID { return ac.id } |
380 | |
381 | // IsReady can be used to check if the result is already prepared. |
382 | // This is guaranteed to return true on a result for which Await has already |
383 | // returned, or a call that failed to send in the first place. |
384 | func (ac *AsyncCall) IsReady() bool { |
385 | select { |
386 | case <-ac.ready: |
387 | return true |
388 | default: |
389 | return false |
390 | } |
391 | } |
392 | |
393 | // retire processes the response to the call. |
394 | func (ac *AsyncCall) retire(response *Response) { |
395 | select { |
396 | case <-ac.ready: |
397 | panic(fmt.Sprintf("jsonrpc2: retire called twice for ID %v", ac.id)) |
398 | default: |
399 | } |
400 | |
401 | ac.response = response |
402 | labelStatus(ac.ctx, response.Error) |
403 | ac.endSpan() |
404 | // Allow the trace context, which may retain a lot of reachable values, |
405 | // to be garbage-collected. |
406 | ac.ctx, ac.endSpan = nil, nil |
407 | |
408 | close(ac.ready) |
409 | } |
410 | |
411 | // Await waits for (and decodes) the results of a Call. |
412 | // The response will be unmarshaled from JSON into the result. |
413 | func (ac *AsyncCall) Await(ctx context.Context, result interface{}) error { |
414 | select { |
415 | case <-ctx.Done(): |
416 | return ctx.Err() |
417 | case <-ac.ready: |
418 | } |
419 | if ac.response.Error != nil { |
420 | return ac.response.Error |
421 | } |
422 | if result == nil { |
423 | return nil |
424 | } |
425 | return json.Unmarshal(ac.response.Result, result) |
426 | } |
427 | |
428 | // Respond delivers a response to an incoming Call. |
429 | // |
430 | // Respond must be called exactly once for any message for which a handler |
431 | // returns ErrAsyncResponse. It must not be called for any other message. |
432 | func (c *Connection) Respond(id ID, result interface{}, err error) error { |
433 | var req *incomingRequest |
434 | c.updateInFlight(func(s *inFlightState) { |
435 | req = s.incomingByID[id] |
436 | }) |
437 | if req == nil { |
438 | return c.internalErrorf("Request not found for ID %v", id) |
439 | } |
440 | |
441 | if err == ErrAsyncResponse { |
442 | // Respond is supposed to supply the asynchronous response, so it would be |
443 | // confusing to call Respond with an error that promises to call Respond |
444 | // again. |
445 | err = c.internalErrorf("Respond called with ErrAsyncResponse for %q", req.Method) |
446 | } |
447 | return c.processResult("Respond", req, result, err) |
448 | } |
449 | |
450 | // Cancel cancels the Context passed to the Handle call for the inbound message |
451 | // with the given ID. |
452 | // |
453 | // Cancel will not complain if the ID is not a currently active message, and it |
454 | // will not cause any messages that have not arrived yet with that ID to be |
455 | // cancelled. |
456 | func (c *Connection) Cancel(id ID) { |
457 | var req *incomingRequest |
458 | c.updateInFlight(func(s *inFlightState) { |
459 | req = s.incomingByID[id] |
460 | }) |
461 | if req != nil { |
462 | req.cancel() |
463 | } |
464 | } |
465 | |
466 | // Wait blocks until the connection is fully closed, but does not close it. |
467 | func (c *Connection) Wait() error { |
468 | var err error |
469 | <-c.done |
470 | c.updateInFlight(func(s *inFlightState) { |
471 | err = s.closeErr |
472 | }) |
473 | return err |
474 | } |
475 | |
476 | // Close stops accepting new requests, waits for in-flight requests and enqueued |
477 | // Handle calls to complete, and then closes the underlying stream. |
478 | // |
479 | // After the start of a Close, notification requests (that lack IDs and do not |
480 | // receive responses) will continue to be passed to the Preempter, but calls |
481 | // with IDs will receive immediate responses with ErrServerClosing, and no new |
482 | // requests (not even notifications!) will be enqueued to the Handler. |
483 | func (c *Connection) Close() error { |
484 | // Stop handling new requests, and interrupt the reader (by closing the |
485 | // connection) as soon as the active requests finish. |
486 | c.updateInFlight(func(s *inFlightState) { s.connClosing = true }) |
487 | |
488 | return c.Wait() |
489 | } |
490 | |
491 | // readIncoming collects inbound messages from the reader and delivers them, either responding |
492 | // to outgoing calls or feeding requests to the queue. |
493 | func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter Preempter) { |
494 | var err error |
495 | for { |
496 | var ( |
497 | msg Message |
498 | n int64 |
499 | ) |
500 | msg, n, err = reader.Read(ctx) |
501 | if err != nil { |
502 | break |
503 | } |
504 | |
505 | switch msg := msg.(type) { |
506 | case *Request: |
507 | c.acceptRequest(ctx, msg, n, preempter) |
508 | |
509 | case *Response: |
510 | c.updateInFlight(func(s *inFlightState) { |
511 | if ac, ok := s.outgoingCalls[msg.ID]; ok { |
512 | delete(s.outgoingCalls, msg.ID) |
513 | ac.retire(msg) |
514 | } else { |
515 | // TODO: How should we report unexpected responses? |
516 | } |
517 | }) |
518 | |
519 | default: |
520 | c.internalErrorf("Read returned an unexpected message of type %T", msg) |
521 | } |
522 | } |
523 | |
524 | c.updateInFlight(func(s *inFlightState) { |
525 | s.reading = false |
526 | s.readErr = err |
527 | |
528 | // Retire any outgoing requests that were still in flight: with the Reader no |
529 | // longer being processed, they necessarily cannot receive a response. |
530 | for id, ac := range s.outgoingCalls { |
531 | ac.retire(&Response{ID: id, Error: err}) |
532 | } |
533 | s.outgoingCalls = nil |
534 | }) |
535 | } |
536 | |
537 | // acceptRequest either handles msg synchronously or enqueues it to be handled |
538 | // asynchronously. |
539 | func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes int64, preempter Preempter) { |
540 | // Add a span to the context for this request. |
541 | labels := append(make([]label.Label, 0, 3), // Make space for the ID if present. |
542 | tag.Method.Of(msg.Method), |
543 | tag.RPCDirection.Of(tag.Inbound), |
544 | ) |
545 | if msg.IsCall() { |
546 | labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID))) |
547 | } |
548 | ctx, endSpan := event.Start(ctx, msg.Method, labels...) |
549 | event.Metric(ctx, |
550 | tag.Started.Of(1), |
551 | tag.ReceivedBytes.Of(msgBytes)) |
552 | |
553 | // In theory notifications cannot be cancelled, but we build them a cancel |
554 | // context anyway. |
555 | ctx, cancel := context.WithCancel(ctx) |
556 | req := &incomingRequest{ |
557 | Request: msg, |
558 | ctx: ctx, |
559 | cancel: cancel, |
560 | endSpan: endSpan, |
561 | } |
562 | |
563 | // If the request is a call, add it to the incoming map so it can be |
564 | // cancelled (or responded) by ID. |
565 | var err error |
566 | c.updateInFlight(func(s *inFlightState) { |
567 | s.incoming++ |
568 | |
569 | if req.IsCall() { |
570 | if s.incomingByID[req.ID] != nil { |
571 | err = fmt.Errorf("%w: request ID %v already in use", ErrInvalidRequest, req.ID) |
572 | req.ID = ID{} // Don't misattribute this error to the existing request. |
573 | return |
574 | } |
575 | |
576 | if s.incomingByID == nil { |
577 | s.incomingByID = make(map[ID]*incomingRequest) |
578 | } |
579 | s.incomingByID[req.ID] = req |
580 | |
581 | // When shutting down, reject all new Call requests, even if they could |
582 | // theoretically be handled by the preempter. The preempter could return |
583 | // ErrAsyncResponse, which would increase the amount of work in flight |
584 | // when we're trying to ensure that it strictly decreases. |
585 | err = s.shuttingDown(ErrServerClosing) |
586 | } |
587 | }) |
588 | if err != nil { |
589 | c.processResult("acceptRequest", req, nil, err) |
590 | return |
591 | } |
592 | |
593 | if preempter != nil { |
594 | result, err := preempter.Preempt(req.ctx, req.Request) |
595 | |
596 | if req.IsCall() && errors.Is(err, ErrAsyncResponse) { |
597 | // This request will remain in flight until Respond is called for it. |
598 | return |
599 | } |
600 | |
601 | if !errors.Is(err, ErrNotHandled) { |
602 | c.processResult("Preempt", req, result, err) |
603 | return |
604 | } |
605 | } |
606 | |
607 | c.updateInFlight(func(s *inFlightState) { |
608 | // If the connection is shutting down, don't enqueue anything to the |
609 | // handler — not even notifications. That ensures that if the handler |
610 | // continues to make progress, it will eventually become idle and |
611 | // close the connection. |
612 | err = s.shuttingDown(ErrServerClosing) |
613 | if err != nil { |
614 | return |
615 | } |
616 | |
617 | // We enqueue requests that have not been preempted to an unbounded slice. |
618 | // Unfortunately, we cannot in general limit the size of the handler |
619 | // queue: we have to read every response that comes in on the wire |
620 | // (because it may be responding to a request issued by, say, an |
621 | // asynchronous handler), and in order to get to that response we have |
622 | // to read all of the requests that came in ahead of it. |
623 | s.handlerQueue = append(s.handlerQueue, req) |
624 | if !s.handlerRunning { |
625 | // We start the handleAsync goroutine when it has work to do, and let it |
626 | // exit when the queue empties. |
627 | // |
628 | // Otherwise, in order to synchronize the handler we would need some other |
629 | // goroutine (probably readIncoming?) to explicitly wait for handleAsync |
630 | // to finish, and that would complicate error reporting: either the error |
631 | // report from the goroutine would be blocked on the handler emptying its |
632 | // queue (which was tried, and introduced a deadlock detected by |
633 | // TestCloseCallRace), or the error would need to be reported separately |
634 | // from synchronizing completion. Allowing the handler goroutine to exit |
635 | // when idle seems simpler than trying to implement either of those |
636 | // alternatives correctly. |
637 | s.handlerRunning = true |
638 | go c.handleAsync() |
639 | } |
640 | }) |
641 | if err != nil { |
642 | c.processResult("acceptRequest", req, nil, err) |
643 | } |
644 | } |
645 | |
646 | // handleAsync invokes the handler on the requests in the handler queue |
647 | // sequentially until the queue is empty. |
648 | func (c *Connection) handleAsync() { |
649 | for { |
650 | var req *incomingRequest |
651 | c.updateInFlight(func(s *inFlightState) { |
652 | if len(s.handlerQueue) > 0 { |
653 | req, s.handlerQueue = s.handlerQueue[0], s.handlerQueue[1:] |
654 | } else { |
655 | s.handlerRunning = false |
656 | } |
657 | }) |
658 | if req == nil { |
659 | return |
660 | } |
661 | |
662 | // Only deliver to the Handler if not already canceled. |
663 | if err := req.ctx.Err(); err != nil { |
664 | c.updateInFlight(func(s *inFlightState) { |
665 | if s.writeErr != nil { |
666 | // Assume that req.ctx was canceled due to s.writeErr. |
667 | // TODO(#51365): use a Context API to plumb this through req.ctx. |
668 | err = fmt.Errorf("%w: %v", ErrServerClosing, s.writeErr) |
669 | } |
670 | }) |
671 | c.processResult("handleAsync", req, nil, err) |
672 | continue |
673 | } |
674 | |
675 | result, err := c.handler.Handle(req.ctx, req.Request) |
676 | c.processResult(c.handler, req, result, err) |
677 | } |
678 | } |
679 | |
680 | // processResult processes the result of a request and, if appropriate, sends a response. |
681 | func (c *Connection) processResult(from interface{}, req *incomingRequest, result interface{}, err error) error { |
682 | switch err { |
683 | case ErrAsyncResponse: |
684 | if !req.IsCall() { |
685 | return c.internalErrorf("%#v returned ErrAsyncResponse for a %q Request without an ID", from, req.Method) |
686 | } |
687 | return nil // This request is still in flight, so don't record the result yet. |
688 | case ErrNotHandled, ErrMethodNotFound: |
689 | // Add detail describing the unhandled method. |
690 | err = fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method) |
691 | } |
692 | |
693 | if req.endSpan == nil { |
694 | return c.internalErrorf("%#v produced a duplicate %q Response", from, req.Method) |
695 | } |
696 | |
697 | if result != nil && err != nil { |
698 | c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v", from, req.Method, err, result) |
699 | result = nil // Discard the spurious result and respond with err. |
700 | } |
701 | |
702 | if req.IsCall() { |
703 | if result == nil && err == nil { |
704 | err = c.internalErrorf("%#v returned a nil result and nil error for a %q Request that requires a Response", from, req.Method) |
705 | } |
706 | |
707 | response, respErr := NewResponse(req.ID, result, err) |
708 | |
709 | // The caller could theoretically reuse the request's ID as soon as we've |
710 | // sent the response, so ensure that it is removed from the incoming map |
711 | // before sending. |
712 | c.updateInFlight(func(s *inFlightState) { |
713 | delete(s.incomingByID, req.ID) |
714 | }) |
715 | if respErr == nil { |
716 | writeErr := c.write(notDone{req.ctx}, response) |
717 | if err == nil { |
718 | err = writeErr |
719 | } |
720 | } else { |
721 | err = c.internalErrorf("%#v returned a malformed result for %q: %w", from, req.Method, respErr) |
722 | } |
723 | } else { // req is a notification |
724 | if result != nil { |
725 | err = c.internalErrorf("%#v returned a non-nil result for a %q Request without an ID", from, req.Method) |
726 | } else if err != nil { |
727 | err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, req.Method, err) |
728 | } |
729 | if err != nil { |
730 | // TODO: can/should we do anything with this error beyond writing it to the event log? |
731 | // (Is this the right label to attach to the log?) |
732 | event.Label(req.ctx, keys.Err.Of(err)) |
733 | } |
734 | } |
735 | |
736 | labelStatus(req.ctx, err) |
737 | |
738 | // Cancel the request and finalize the event span to free any associated resources. |
739 | req.cancel() |
740 | req.endSpan() |
741 | req.endSpan = nil |
742 | c.updateInFlight(func(s *inFlightState) { |
743 | if s.incoming == 0 { |
744 | panic("jsonrpc2_v2: processResult called when incoming count is already zero") |
745 | } |
746 | s.incoming-- |
747 | }) |
748 | return nil |
749 | } |
750 | |
751 | // write is used by all things that write outgoing messages, including replies. |
752 | // it makes sure that writes are atomic |
753 | func (c *Connection) write(ctx context.Context, msg Message) error { |
754 | writer := <-c.writer |
755 | defer func() { c.writer <- writer }() |
756 | n, err := writer.Write(ctx, msg) |
757 | event.Metric(ctx, tag.SentBytes.Of(n)) |
758 | |
759 | if err != nil && ctx.Err() == nil { |
760 | // The call to Write failed, and since ctx.Err() is nil we can't attribute |
761 | // the failure (even indirectly) to Context cancellation. The writer appears |
762 | // to be broken, and future writes are likely to also fail. |
763 | // |
764 | // If the read side of the connection is also broken, we might not even be |
765 | // able to receive cancellation notifications. Since we can't reliably write |
766 | // the results of incoming calls and can't receive explicit cancellations, |
767 | // cancel the calls now. |
768 | c.updateInFlight(func(s *inFlightState) { |
769 | if s.writeErr == nil { |
770 | s.writeErr = err |
771 | for _, r := range s.incomingByID { |
772 | r.cancel() |
773 | } |
774 | } |
775 | }) |
776 | } |
777 | |
778 | return err |
779 | } |
780 | |
781 | // internalErrorf reports an internal error. By default it panics, but if |
782 | // c.onInternalError is non-nil it instead calls that and returns an error |
783 | // wrapping ErrInternal. |
784 | func (c *Connection) internalErrorf(format string, args ...interface{}) error { |
785 | err := fmt.Errorf(format, args...) |
786 | if c.onInternalError == nil { |
787 | panic("jsonrpc2: " + err.Error()) |
788 | } |
789 | c.onInternalError(err) |
790 | |
791 | return fmt.Errorf("%w: %v", ErrInternal, err) |
792 | } |
793 | |
794 | // labelStatus labels the status of the event in ctx based on whether err is nil. |
795 | func labelStatus(ctx context.Context, err error) { |
796 | if err == nil { |
797 | event.Label(ctx, tag.StatusCode.Of("OK")) |
798 | } else { |
799 | event.Label(ctx, tag.StatusCode.Of("ERROR")) |
800 | } |
801 | } |
802 | |
803 | // notDone is a context.Context wrapper that returns a nil Done channel. |
804 | type notDone struct{ ctx context.Context } |
805 | |
806 | func (ic notDone) Value(key interface{}) interface{} { |
807 | return ic.ctx.Value(key) |
808 | } |
809 | |
810 | func (notDone) Done() <-chan struct{} { return nil } |
811 | func (notDone) Err() error { return nil } |
812 | func (notDone) Deadline() (time.Time, bool) { return time.Time{}, false } |
813 |
Members