Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.pickerWrapper = newPickerWrapper()

cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)

Expand Down Expand Up @@ -1076,13 +1076,6 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
return cc.sc.healthCheckConfig
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
}

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
if sc == nil {
// should never reach here.
Expand Down
36 changes: 18 additions & 18 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

Expand All @@ -48,14 +47,11 @@ type pickerGeneration struct {
// actions and unblock when there's a picker update.
type pickerWrapper struct {
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
pickerGen atomic.Pointer[pickerGeneration]
statsHandlers []stats.Handler // to record blocking picker calls
pickerGen atomic.Pointer[pickerGeneration]
}

func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
pw := &pickerWrapper{
statsHandlers: statsHandlers,
}
func newPickerWrapper() *pickerWrapper {
pw := &pickerWrapper{}
pw.pickerGen.Store(&pickerGeneration{
blockingCh: make(chan struct{}),
})
Expand Down Expand Up @@ -93,22 +89,29 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
}
}

type pick struct {
transport transport.ClientTransport // the selected transport
result balancer.PickResult // the contents of the pick from the LB policy
blocked bool // set if a picker call queued for a new picker
}

// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
var ch chan struct{}

var lastPickErr error
pickBlocked := false

for {
pg := pw.pickerGen.Load()
if pg == nil {
return nil, balancer.PickResult{}, ErrClientConnClosing
return pick{}, ErrClientConnClosing
}
if pg.picker == nil {
ch = pg.blockingCh
Expand All @@ -127,9 +130,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
return pick{}, status.Error(codes.DeadlineExceeded, errStr)
case context.Canceled:
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
return pick{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
}
Expand All @@ -145,9 +148,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
// In the second case, the only way it will get to this conditional is
// if there is a new picker.
if ch != nil {
for _, sh := range pw.statsHandlers {
sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
pickBlocked = true
}

ch = pg.blockingCh
Expand All @@ -164,15 +165,15 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
}
return nil, balancer.PickResult{}, dropError{error: err}
return pick{}, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
if !failfast {
lastPickErr = err
continue
}
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
return pick{}, status.Error(codes.Unavailable, err.Error())
}

acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
Expand All @@ -183,9 +184,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if t := acbw.ac.getReadyTransport(); t != nil {
if channelz.IsOn() {
doneChannelzWrapper(acbw, &pickResult)
return t, pickResult, nil
}
return t, pickResult, nil
return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
}
if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.
Expand Down
28 changes: 14 additions & 14 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ func (p *testingPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
}

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
if _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
}
}

func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -85,8 +85,8 @@ func (s) TestBlockingPick(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -102,7 +102,7 @@ func (s) TestBlockingPick(t *testing.T) {
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -112,8 +112,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -129,7 +129,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -140,8 +140,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -157,7 +157,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -167,8 +167,8 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand Down
1 change: 1 addition & 0 deletions scripts/vet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ NewSubConn is deprecated:
OverrideServerName is deprecated:
RemoveSubConn is deprecated:
SecurityVersion is deprecated:
stats.PickerUpdated is deprecated:
Target is deprecated: Use the Target field in the BuildOptions instead.
UpdateAddresses is deprecated:
UpdateSubConnState is deprecated:
Expand Down
2 changes: 1 addition & 1 deletion stats/opentelemetry/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) {
)
// increment previous rpc attempts applicable for next attempt
atomic.AddUint32(&ai.previousRPCAttempts, 1)
case *stats.PickerUpdated:
case *stats.DelayedPickComplete:
span.AddEvent("Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
Expand Down
20 changes: 13 additions & 7 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,21 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
type PickerUpdated struct{}
// DelayedPickComplete indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
type DelayedPickComplete struct{}

// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }
// IsClient indicates DelayedPickComplete is available on the client.
func (*DelayedPickComplete) IsClient() bool { return true }

func (*PickerUpdated) isRPCStats() {}
func (*DelayedPickComplete) isRPCStats() {}

// PickerUpdated indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
//
// Deprecated: will be removed in a future release; use DelayedPickComplete
// instead.
type PickerUpdated = DelayedPickComplete

// InPayload contains stats about an incoming payload.
type InPayload struct {
Expand Down
10 changes: 8 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
func (a *csAttempt) getTransport() error {
cs := a.cs

var err error
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
a.transport, a.pickResult = pick.transport, pick.result
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
Expand All @@ -481,6 +482,11 @@ func (a *csAttempt) getTransport() error {
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
}
if pick.blocked {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also call stats.PickerUpdated to keep the previous behaviour until PickerUpdated is removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a type alias so the stats handler will get one event that matches both of the events.

}
}
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6649,13 +6649,13 @@ func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}

var pickerUpdatedCount uint
var delayedPickCompleteCount int
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
if _, ok := stat.(*stats.DelayedPickComplete); ok {
delayedPickCompleteCount++
}
}
if pickerUpdatedCount != 1 {
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
if got, want := delayedPickCompleteCount, 1; got != want {
t.Fatalf("sh.delayedPickComplete count: %v, want: %v", got, want)
}
}
2 changes: 1 addition & 1 deletion test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// these calls come in nondeterministically - so can just ignore
if _, ok := s.(*stats.PickerUpdated); ok {
if _, ok := s.(*stats.DelayedPickComplete); ok {
return
}
h.mu.Lock()
Expand Down
16 changes: 8 additions & 8 deletions test/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (s) TestPeerForClientStatsHandler(t *testing.T) {
// * Begin stats lack peer info (RPC starts pre-resolution).
// * PickerUpdated: no peer info (picker lacks transport details).
expectedCallouts := map[stats.RPCStats]bool{
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.PickerUpdated{}: false,
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.DelayedPickComplete{}: false,
}

// Start server.
Expand Down
Loading