Commit 557e11f

Better lifecycle management in the shard executor library (#7095)
What changed?
- Better shutdown of the executor process loop.
- Made the updateShardAssignment function use waitgroups, so it blocks until all the processors have started or stopped.
- Made sure we do not stop heartbeating if an assignment takes a long time. If we are still running an old assignment when a new one arrives, we skip the new one.
- Protected the state of the managed processor with an atomic int.

Why?
To make the lifecycle management of the executor library robust.

How did you test it?
Unit tests.

Potential risks

Release notes

Documentation Changes
1 parent 8587c1f commit 557e11f
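The core of the change is a skip-if-busy pattern around shard assignment: the heartbeat loop never blocks on applying an assignment, it only tries to take the assignment lock and skips the tick if an older assignment is still in flight. Below is a small self-contained Go sketch of that pattern; applyAssignment and the timings are illustrative placeholders, not the library's API.

// Standalone sketch (not the library code): a loop that keeps ticking and
// skips a new assignment while an older one is still being applied.
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var assignmentMutex sync.Mutex // held while an assignment is being applied

    applyAssignment := func(id int) {
        defer assignmentMutex.Unlock()
        time.Sleep(250 * time.Millisecond) // pretend starting/stopping processors is slow
        fmt.Println("applied assignment", id)
    }

    for tick := 1; tick <= 5; tick++ {
        // Heartbeat tick: TryLock never blocks, so heartbeating continues.
        if !assignmentMutex.TryLock() {
            fmt.Println("assignment in progress, skipping tick", tick)
            continue
        }
        go applyAssignment(tick)
        time.Sleep(100 * time.Millisecond)
    }
    time.Sleep(300 * time.Millisecond) // give the last assignment time to finish
}

In the real client the lock is assignmentMutex, the work is updateShardAssignment, and the goroutine releases the lock with defer once all processors have started or stopped.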

File tree

2 files changed: +61 -28 lines changed


service/sharddistributor/executorclient/clientimpl.go

Lines changed: 57 additions & 12 deletions
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/uber/cadence/client/sharddistributorexecutor"
@@ -14,7 +15,7 @@ import (
 	"github.com/uber/cadence/service/sharddistributor/executorclient/syncgeneric"
 )
 
-type processorState int
+type processorState int32
 
 const (
 	processorStateStarting processorState = iota
@@ -24,7 +25,25 @@ const (
 
 type managedProcessor[SP ShardProcessor] struct {
 	processor SP
-	state     processorState
+	state     atomic.Int32
+}
+
+func (mp *managedProcessor[SP]) setState(state processorState) {
+	mp.state.Store(int32(state))
+}
+
+func (mp *managedProcessor[SP]) getState() processorState {
+	return processorState(mp.state.Load())
+}
+
+func newManagedProcessor[SP ShardProcessor](processor SP, state processorState) *managedProcessor[SP] {
+	managed := &managedProcessor[SP]{
+		processor: processor,
+		state:     atomic.Int32{},
+	}
+
+	managed.setState(state)
+	return managed
 }
 
 type executorImpl[SP ShardProcessor] struct {
@@ -37,14 +56,21 @@ type executorImpl[SP ShardProcessor] struct {
 	managedProcessors syncgeneric.Map[string, *managedProcessor[SP]]
 	executorID        string
 	timeSource        clock.TimeSource
+	processLoopWG     sync.WaitGroup
+	assignmentMutex   sync.Mutex
 }
 
 func (e *executorImpl[SP]) Start(ctx context.Context) {
-	go e.heartbeatloop(ctx)
+	e.processLoopWG.Add(1)
+	go func() {
+		defer e.processLoopWG.Done()
+		e.heartbeatloop(ctx)
+	}()
 }
 
 func (e *executorImpl[SP]) Stop() {
 	close(e.stopC)
+	e.processLoopWG.Wait()
 }
 
 func (e *executorImpl[SP]) GetShardProcess(shardID string) (SP, error) {
@@ -63,6 +89,9 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
+			e.logger.Info("shard distributorexecutor context done, stopping")
+			e.stopShardProcessors()
+			return
 		case <-e.stopC:
 			e.logger.Info("shard distributorexecutor stopped")
 			e.stopShardProcessors()
@@ -73,7 +102,14 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
 				e.logger.Error("failed to heartbeat", tag.Error(err))
 				continue // TODO: should we stop the executor, and drop all the shards?
 			}
-			e.updateShardAssignment(ctx, shardAssignment)
+			if !e.assignmentMutex.TryLock() {
+				e.logger.Warn("already doing shard assignment, will skip this assignment")
+				continue
+			}
+			go func() {
+				defer e.assignmentMutex.Unlock()
+				e.updateShardAssignment(ctx, shardAssignment)
+			}()
 		}
 	}
 }
@@ -82,7 +118,7 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 	// Fill in the shard status reports
 	shardStatusReports := make(map[string]*types.ShardStatusReport)
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
-		if managedProcessor.state == processorStateStarted {
+		if managedProcessor.getState() == processorStateStarted {
 			shardStatusReports[shardID] = &types.ShardStatusReport{
 				ShardLoad: managedProcessor.processor.GetShardLoad(),
 				Status:    types.ShardStatusREADY,
@@ -109,11 +145,15 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 }
 
 func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
+	wg := sync.WaitGroup{}
+
 	// Stop shard processing for shards not assigned to this executor
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
 		if assignment, ok := shardAssignments[shardID]; !ok || assignment.Status != types.AssignmentStatusREADY {
+			wg.Add(1)
 			go func() {
-				managedProcessor.state = processorStateStopping
+				defer wg.Done()
+				managedProcessor.setState(processorStateStopping)
 				managedProcessor.processor.Stop()
 				e.managedProcessors.Delete(shardID)
 			}()
@@ -125,37 +165,42 @@ func (e *executorImpl[SP]) updateShardAssig
 	for shardID, assignment := range shardAssignments {
 		if assignment.Status == types.AssignmentStatusREADY {
 			if _, ok := e.managedProcessors.Load(shardID); !ok {
+				wg.Add(1)
 				go func() {
+					defer wg.Done()
 					processor, err := e.shardProcessorFactory.NewShardProcessor(shardID)
 					if err != nil {
 						e.logger.Error("failed to create shard processor", tag.Error(err))
 						return
 					}
+					managedProcessor := newManagedProcessor(processor, processorStateStarting)
+					e.managedProcessors.Store(shardID, managedProcessor)
+
 					processor.Start(ctx)
-					e.managedProcessors.Store(shardID, &managedProcessor[SP]{
-						processor: processor,
-						state:     processorStateStarted,
-					})
+
+					managedProcessor.setState(processorStateStarted)
 				}()
 			}
 		}
 	}
+
+	wg.Wait()
 }
 
 func (e *executorImpl[SP]) stopShardProcessors() {
 	wg := sync.WaitGroup{}
 
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
 		// If the processor is already stopping, skip it
-		if managedProcessor.state == processorStateStopping {
+		if managedProcessor.getState() == processorStateStopping {
			return true
		}
 
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 
-			managedProcessor.state = processorStateStopping
+			managedProcessor.setState(processorStateStopping)
 			managedProcessor.processor.Stop()
 			e.managedProcessors.Delete(shardID)
 		}()
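For readers skimming the hunks above, here is a minimal standalone sketch of the new atomic state wrapper, assuming simplified, non-generic types and no real shard processors: one goroutine flips the state the way updateShardAssignment does, another reads it the way heartbeat does, and go run -race stays quiet because both sides go through atomic.Int32.

// Standalone sketch with simplified types; the real managedProcessor is generic
// over the shard processor type and lives in clientimpl.go.
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type processorState int32

const (
    processorStateStarting processorState = iota
    processorStateStarted
    processorStateStopping
)

type managedProcessor struct {
    state atomic.Int32
}

func (mp *managedProcessor) setState(s processorState) { mp.state.Store(int32(s)) }
func (mp *managedProcessor) getState() processorState  { return processorState(mp.state.Load()) }

func main() {
    mp := &managedProcessor{}
    mp.setState(processorStateStarting)

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { // assignment goroutine: marks the processor as started
        defer wg.Done()
        mp.setState(processorStateStarted)
    }()
    go func() { // heartbeat goroutine: reports only started processors
        defer wg.Done()
        if mp.getState() == processorStateStarted {
            fmt.Println("report shard as READY")
        }
    }()
    wg.Wait()
}

Wrapping the int32 this way is what lets heartbeat report READY only for processors that have actually finished starting, without taking a lock per shard.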

service/sharddistributor/executorclient/clientimpl_test.go

Lines changed: 4 additions & 16 deletions
@@ -129,14 +129,8 @@ func TestHeartbeat(t *testing.T) {
 		executorID: "test-executor-id",
 	}
 
-	executor.managedProcessors.Store("test-shard-id1", &managedProcessor[*MockShardProcessor]{
-		processor: shardProcessorMock1,
-		state:     processorStateStarted,
-	})
-	executor.managedProcessors.Store("test-shard-id2", &managedProcessor[*MockShardProcessor]{
-		processor: shardProcessorMock2,
-		state:     processorStateStarted,
-	})
+	executor.managedProcessors.Store("test-shard-id1", newManagedProcessor(shardProcessorMock1, processorStateStarted))
+	executor.managedProcessors.Store("test-shard-id2", newManagedProcessor(shardProcessorMock2, processorStateStarted))
 
 	// Do the call to heartbeat
 	shardAssignments, err := executor.heartbeat(context.Background())
@@ -166,14 +160,8 @@ func TestHeartBeartLoop_ShardAssignmentChange(t *testing.T) {
 		shardProcessorFactory: shardProcessorFactory,
 	}
 
-	executor.managedProcessors.Store("test-shard-id1", &managedProcessor[*MockShardProcessor]{
-		processor: shardProcessorMock1,
-		state:     processorStateStarted,
-	})
-	executor.managedProcessors.Store("test-shard-id2", &managedProcessor[*MockShardProcessor]{
-		processor: shardProcessorMock2,
-		state:     processorStateStarted,
-	})
+	executor.managedProcessors.Store("test-shard-id1", newManagedProcessor(shardProcessorMock1, processorStateStarted))
+	executor.managedProcessors.Store("test-shard-id2", newManagedProcessor(shardProcessorMock2, processorStateStarted))
 
 	// We expect to get a new assignment with shards 2 and 3 assigned to it
 	newAssignment := map[string]*types.ShardAssignment{
