2018-05-01 23:45:06 +00:00
|
|
|
// Copyright 2015 The Go Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
// Transport code's client connection pooling.
|
|
|
|
|
|
|
|
package http2
|
|
|
|
|
|
|
|
import (
|
2021-11-10 17:20:10 +00:00
|
|
|
"context"
|
2018-05-01 23:45:06 +00:00
|
|
|
"crypto/tls"
|
2021-11-10 17:20:10 +00:00
|
|
|
"errors"
|
2018-05-01 23:45:06 +00:00
|
|
|
"net/http"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ClientConnPool manages a pool of HTTP/2 client connections.
|
|
|
|
type ClientConnPool interface {
|
2021-11-10 17:20:10 +00:00
|
|
|
// GetClientConn returns a specific HTTP/2 connection (usually
|
|
|
|
// a TLS-TCP connection) to an HTTP/2 server. On success, the
|
|
|
|
// returned ClientConn accounts for the upcoming RoundTrip
|
|
|
|
// call, so the caller should not omit it. If the caller needs
|
|
|
|
// to, ClientConn.RoundTrip can be called with a bogus
|
|
|
|
// new(http.Request) to release the stream reservation.
|
2018-05-01 23:45:06 +00:00
|
|
|
GetClientConn(req *http.Request, addr string) (*ClientConn, error)
|
|
|
|
MarkDead(*ClientConn)
|
|
|
|
}
|
|
|
|
|
|
|
|
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
|
|
|
|
// implementations which can close their idle connections.
|
|
|
|
type clientConnPoolIdleCloser interface {
|
|
|
|
ClientConnPool
|
|
|
|
closeIdleConnections()
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
|
|
|
|
_ clientConnPoolIdleCloser = noDialClientConnPool{}
|
|
|
|
)
|
|
|
|
|
|
|
|
// TODO: use singleflight for dialing and addConnCalls?
|
|
|
|
type clientConnPool struct {
|
|
|
|
t *Transport
|
|
|
|
|
|
|
|
mu sync.Mutex // TODO: maybe switch to RWMutex
|
|
|
|
// TODO: add support for sharing conns based on cert names
|
|
|
|
// (e.g. share conn for googleapis.com and appspot.com)
|
|
|
|
conns map[string][]*ClientConn // key is host:port
|
|
|
|
dialing map[string]*dialCall // currently in-flight dials
|
|
|
|
keys map[*ClientConn][]string
|
2021-11-10 17:20:10 +00:00
|
|
|
addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
|
2018-05-01 23:45:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
|
|
return p.getClientConn(req, addr, dialOnMiss)
|
|
|
|
}
|
|
|
|
|
|
|
|
const (
|
|
|
|
dialOnMiss = true
|
|
|
|
noDialOnMiss = false
|
|
|
|
)
|
|
|
|
|
|
|
|
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
|
2021-11-10 17:20:10 +00:00
|
|
|
// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
|
2018-05-01 23:45:06 +00:00
|
|
|
if isConnectionCloseRequest(req) && dialOnMiss {
|
|
|
|
// It gets its own connection.
|
2019-04-17 17:15:55 +00:00
|
|
|
traceGetConn(req, addr)
|
2018-05-01 23:45:06 +00:00
|
|
|
const singleUse = true
|
2021-11-10 17:20:10 +00:00
|
|
|
cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
|
2018-05-01 23:45:06 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return cc, nil
|
|
|
|
}
|
2021-11-10 17:20:10 +00:00
|
|
|
for {
|
|
|
|
p.mu.Lock()
|
|
|
|
for _, cc := range p.conns[addr] {
|
|
|
|
if cc.ReserveNewRequest() {
|
|
|
|
// When a connection is presented to us by the net/http package,
|
|
|
|
// the GetConn hook has already been called.
|
|
|
|
// Don't call it a second time here.
|
|
|
|
if !cc.getConnCalled {
|
|
|
|
traceGetConn(req, addr)
|
|
|
|
}
|
|
|
|
cc.getConnCalled = false
|
|
|
|
p.mu.Unlock()
|
|
|
|
return cc, nil
|
2019-04-17 17:15:55 +00:00
|
|
|
}
|
2021-11-10 17:20:10 +00:00
|
|
|
}
|
|
|
|
if !dialOnMiss {
|
2018-05-01 23:45:06 +00:00
|
|
|
p.mu.Unlock()
|
2021-11-10 17:20:10 +00:00
|
|
|
return nil, ErrNoCachedConn
|
2018-05-01 23:45:06 +00:00
|
|
|
}
|
2021-11-10 17:20:10 +00:00
|
|
|
traceGetConn(req, addr)
|
|
|
|
call := p.getStartDialLocked(req.Context(), addr)
|
2018-05-01 23:45:06 +00:00
|
|
|
p.mu.Unlock()
|
2021-11-10 17:20:10 +00:00
|
|
|
<-call.done
|
|
|
|
if shouldRetryDial(call, req) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
cc, err := call.res, call.err
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if cc.ReserveNewRequest() {
|
|
|
|
return cc, nil
|
|
|
|
}
|
2018-05-01 23:45:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// dialCall is an in-flight Transport dial call to a host.
|
|
|
|
type dialCall struct {
|
2021-11-10 17:20:10 +00:00
|
|
|
_ incomparable
|
|
|
|
p *clientConnPool
|
|
|
|
// the context associated with the request
|
|
|
|
// that created this dialCall
|
|
|
|
ctx context.Context
|
2018-05-01 23:45:06 +00:00
|
|
|
done chan struct{} // closed when done
|
|
|
|
res *ClientConn // valid after done is closed
|
|
|
|
err error // valid after done is closed
|
|
|
|
}
|
|
|
|
|
|
|
|
// requires p.mu is held.
|
2021-11-10 17:20:10 +00:00
|
|
|
func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
|
2018-05-01 23:45:06 +00:00
|
|
|
if call, ok := p.dialing[addr]; ok {
|
|
|
|
// A dial is already in-flight. Don't start another.
|
|
|
|
return call
|
|
|
|
}
|
2021-11-10 17:20:10 +00:00
|
|
|
call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
|
2018-05-01 23:45:06 +00:00
|
|
|
if p.dialing == nil {
|
|
|
|
p.dialing = make(map[string]*dialCall)
|
|
|
|
}
|
|
|
|
p.dialing[addr] = call
|
2021-11-10 17:20:10 +00:00
|
|
|
go call.dial(call.ctx, addr)
|
2018-05-01 23:45:06 +00:00
|
|
|
return call
|
|
|
|
}
|
|
|
|
|
|
|
|
// run in its own goroutine.
|
2021-11-10 17:20:10 +00:00
|
|
|
func (c *dialCall) dial(ctx context.Context, addr string) {
|
2018-05-01 23:45:06 +00:00
|
|
|
const singleUse = false // shared conn
|
2021-11-10 17:20:10 +00:00
|
|
|
c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
|
2018-05-01 23:45:06 +00:00
|
|
|
|
|
|
|
c.p.mu.Lock()
|
|
|
|
delete(c.p.dialing, addr)
|
|
|
|
if c.err == nil {
|
|
|
|
c.p.addConnLocked(addr, c.res)
|
|
|
|
}
|
|
|
|
c.p.mu.Unlock()
|
2022-08-08 14:49:10 +00:00
|
|
|
|
|
|
|
close(c.done)
|
2018-05-01 23:45:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
|
|
|
|
// already exist. It coalesces concurrent calls with the same key.
|
|
|
|
// This is used by the http1 Transport code when it creates a new connection. Because
|
|
|
|
// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
|
|
|
|
// the protocol), it can get into a situation where it has multiple TLS connections.
|
|
|
|
// This code decides which ones live or die.
|
|
|
|
// The return value used is whether c was used.
|
|
|
|
// c is never closed.
|
|
|
|
func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
|
|
|
|
p.mu.Lock()
|
|
|
|
for _, cc := range p.conns[key] {
|
|
|
|
if cc.CanTakeNewRequest() {
|
|
|
|
p.mu.Unlock()
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
call, dup := p.addConnCalls[key]
|
|
|
|
if !dup {
|
|
|
|
if p.addConnCalls == nil {
|
|
|
|
p.addConnCalls = make(map[string]*addConnCall)
|
|
|
|
}
|
|
|
|
call = &addConnCall{
|
|
|
|
p: p,
|
|
|
|
done: make(chan struct{}),
|
|
|
|
}
|
|
|
|
p.addConnCalls[key] = call
|
|
|
|
go call.run(t, key, c)
|
|
|
|
}
|
|
|
|
p.mu.Unlock()
|
|
|
|
|
|
|
|
<-call.done
|
|
|
|
if call.err != nil {
|
|
|
|
return false, call.err
|
|
|
|
}
|
|
|
|
return !dup, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type addConnCall struct {
|
2020-07-06 16:22:27 +00:00
|
|
|
_ incomparable
|
2018-05-01 23:45:06 +00:00
|
|
|
p *clientConnPool
|
|
|
|
done chan struct{} // closed when done
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
|
|
|
|
cc, err := t.NewClientConn(tc)
|
|
|
|
|
|
|
|
p := c.p
|
|
|
|
p.mu.Lock()
|
|
|
|
if err != nil {
|
|
|
|
c.err = err
|
|
|
|
} else {
|
2021-11-10 17:20:10 +00:00
|
|
|
cc.getConnCalled = true // already called by the net/http package
|
2018-05-01 23:45:06 +00:00
|
|
|
p.addConnLocked(key, cc)
|
|
|
|
}
|
|
|
|
delete(p.addConnCalls, key)
|
|
|
|
p.mu.Unlock()
|
|
|
|
close(c.done)
|
|
|
|
}
|
|
|
|
|
|
|
|
// p.mu must be held
|
|
|
|
func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
|
|
|
|
for _, v := range p.conns[key] {
|
|
|
|
if v == cc {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if p.conns == nil {
|
|
|
|
p.conns = make(map[string][]*ClientConn)
|
|
|
|
}
|
|
|
|
if p.keys == nil {
|
|
|
|
p.keys = make(map[*ClientConn][]string)
|
|
|
|
}
|
|
|
|
p.conns[key] = append(p.conns[key], cc)
|
|
|
|
p.keys[cc] = append(p.keys[cc], key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *clientConnPool) MarkDead(cc *ClientConn) {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
for _, key := range p.keys[cc] {
|
|
|
|
vv, ok := p.conns[key]
|
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newList := filterOutClientConn(vv, cc)
|
|
|
|
if len(newList) > 0 {
|
|
|
|
p.conns[key] = newList
|
|
|
|
} else {
|
|
|
|
delete(p.conns, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
delete(p.keys, cc)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *clientConnPool) closeIdleConnections() {
|
|
|
|
p.mu.Lock()
|
|
|
|
defer p.mu.Unlock()
|
|
|
|
// TODO: don't close a cc if it was just added to the pool
|
|
|
|
// milliseconds ago and has never been used. There's currently
|
|
|
|
// a small race window with the HTTP/1 Transport's integration
|
|
|
|
// where it can add an idle conn just before using it, and
|
|
|
|
// somebody else can concurrently call CloseIdleConns and
|
|
|
|
// break some caller's RoundTrip.
|
|
|
|
for _, vv := range p.conns {
|
|
|
|
for _, cc := range vv {
|
|
|
|
cc.closeIfIdle()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
|
|
|
|
out := in[:0]
|
|
|
|
for _, v := range in {
|
|
|
|
if v != exclude {
|
|
|
|
out = append(out, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// If we filtered it out, zero out the last item to prevent
|
|
|
|
// the GC from seeing it.
|
|
|
|
if len(in) != len(out) {
|
|
|
|
in[len(in)-1] = nil
|
|
|
|
}
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
|
|
|
// noDialClientConnPool is an implementation of http2.ClientConnPool
|
|
|
|
// which never dials. We let the HTTP/1.1 client dial and use its TLS
|
|
|
|
// connection instead.
|
|
|
|
type noDialClientConnPool struct{ *clientConnPool }
|
|
|
|
|
|
|
|
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
|
|
|
|
return p.getClientConn(req, addr, noDialOnMiss)
|
|
|
|
}
|
2021-11-10 17:20:10 +00:00
|
|
|
|
|
|
|
// shouldRetryDial reports whether the current request should
|
|
|
|
// retry dialing after the call finished unsuccessfully, for example
|
|
|
|
// if the dial was canceled because of a context cancellation or
|
|
|
|
// deadline expiry.
|
|
|
|
func shouldRetryDial(call *dialCall, req *http.Request) bool {
|
|
|
|
if call.err == nil {
|
|
|
|
// No error, no need to retry
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if call.ctx == req.Context() {
|
|
|
|
// If the call has the same context as the request, the dial
|
|
|
|
// should not be retried, since any cancellation will have come
|
|
|
|
// from this request.
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
|
|
|
|
// If the call error is not because of a context cancellation or a deadline expiry,
|
|
|
|
// the dial should not be retried.
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// Only retry if the error is a context cancellation error or deadline expiry
|
|
|
|
// and the context associated with the call was canceled or expired.
|
|
|
|
return call.ctx.Err() != nil
|
|
|
|
}
|