-
Notifications
You must be signed in to change notification settings - Fork 338
mcp: improve http transports error handling and make transport work with any size message #734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -382,7 +382,7 @@ func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error) { | |
| s := &sseClientConn{ | ||
| client: httpClient, | ||
| msgEndpoint: msgEndpoint, | ||
| incoming: make(chan []byte, 100), | ||
| incoming: make(chan sseMessage, 100), | ||
| body: resp.Body, | ||
| done: make(chan struct{}), | ||
| } | ||
|
|
@@ -392,10 +392,14 @@ func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error) { | |
|
|
||
| for evt, err := range scanEvents(resp.Body) { | ||
| if err != nil { | ||
| select { | ||
| case s.incoming <- sseMessage{err: err}: | ||
| case <-s.done: | ||
| } | ||
| return | ||
| } | ||
| select { | ||
| case s.incoming <- evt.Data: | ||
| case s.incoming <- sseMessage{data: evt.Data}: | ||
| case <-s.done: | ||
| return | ||
| } | ||
|
|
@@ -405,15 +409,21 @@ func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error) { | |
| return s, nil | ||
| } | ||
|
|
||
| // sseMessage represents a message or error from the SSE stream. | ||
| type sseMessage struct { | ||
| data []byte | ||
| err error | ||
| } | ||
|
|
||
| // An sseClientConn is a logical jsonrpc2 connection that implements the client | ||
| // half of the SSE protocol: | ||
| // - Writes are POSTS to the session endpoint. | ||
| // - Reads are SSE 'message' events, and pushes them onto a buffered channel. | ||
| // - Close terminates the GET request. | ||
| type sseClientConn struct { | ||
| client *http.Client // HTTP client to use for requests | ||
| msgEndpoint *url.URL // session endpoint for POSTs | ||
| incoming chan []byte // queue of incoming messages | ||
| client *http.Client // HTTP client to use for requests | ||
| msgEndpoint *url.URL // session endpoint for POSTs | ||
| incoming chan sseMessage // queue of incoming messages or errors | ||
|
|
||
| mu sync.Mutex | ||
| body io.ReadCloser // body of the hanging GET | ||
|
|
@@ -438,12 +448,16 @@ func (c *sseClientConn) Read(ctx context.Context) (jsonrpc.Message, error) { | |
| case <-c.done: | ||
| return nil, io.EOF | ||
|
|
||
| case data := <-c.incoming: | ||
| case m := <-c.incoming: | ||
| if m.err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning an error from Read breaks the connection. I don't think that's actually what we want to happen here, is it?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that this method return either an error or a jsonrpc.Message, I think this is an acceptable behaviour.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I don't think I sufficiently conveyed the consequences. So as a consequence of this change, a context cancellation calling a tool will break the entire MCP session. I think I know the desired behavior: you want to get an error from the The jsonrpc2 library doesn't really support this: we'd need to add something like If you'd like to land this PR, I suggest not returning an error here, and I can make the change to achieve what you want.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this error different from the error returned a few lines bellow by Your assumption is correct, I want to bubble up any errors from lower levels, so that for the caller the problem would be clear enough, but this implementation doesn't seem to be using a jsonrpc2 connection, just the encode/decode part. (this is sse implementation, not jsonrpc).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The jsonrpc2.Connection calls Read, which must have stream semantics: an error from Read breaks the stream. I think there's an argument to be made that a malformed payload should break the stream: if the server sends something that we can't even parse as a jsonrpc2.Message, then the stream is corrupt. But if the error is due to a network error or client cancellation, we don't want to terminate the session. See also #683. Let's focus on fixing the size limit. We can leave the bubbling up of errors, but should not return an error here. I'll make the change to surface the error to the application layer:it's a subtle change to the jsonrpc2 connection. |
||
| // TODO: bubble up this error | ||
| return nil, nil | ||
| } | ||
| // TODO(rfindley): do we really need to check this? We receive from c.done above. | ||
| if c.isDone() { | ||
| return nil, io.EOF | ||
| } | ||
| msg, err := jsonrpc2.DecodeMessage(data) | ||
| msg, err := jsonrpc2.DecodeMessage(m.data) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.