Skip to content

Commit b43f4aa

Browse files
authored
Update stream framework with new alternate keyToList function (#2211)
1 parent 29a6485 commit b43f4aa

File tree

1 file changed

+34
-3
lines changed

1 file changed

+34
-3
lines changed

stream.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ type Stream struct {
7575
//
7676
// Note: Calls to KeyToList are concurrent.
7777
KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)
78+
// UseKeyToListWithThreadId indicates that KeyToListWithThreadId should be used
79+
// instead of KeyToList. This newer API exposes the parallelism of the stream: calls that
80+
// share a threadId run serially, whereas KeyToList is called concurrently and must handle
81+
// its own synchronization. The threadId can therefore be used to do per-thread work serially.
82+
// Once a thread finishes, FinishThread is called with that thread's threadId.
83+
UseKeyToListWithThreadId bool
84+
KeyToListWithThreadId func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error)
85+
FinishThread func(threadId int) (*pb.KVList, error)
7886

7987
// This is the method where Stream sends the final output. All calls to Send are done by a
8088
// single goroutine, i.e. logic within Send method can expect single threaded execution.
@@ -143,7 +151,7 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
143151
// keyRange is [start, end), including start, excluding end. Do ensure that the start,
144152
// end byte slices are owned by keyRange struct.
145153
func (st *Stream) produceRanges(ctx context.Context) {
146-
ranges := st.db.Ranges(st.Prefix, 16)
154+
ranges := st.db.Ranges(st.Prefix, st.NumGo)
147155
y.AssertTrue(len(ranges) > 0)
148156
y.AssertTrue(ranges[0].left == nil)
149157
y.AssertTrue(ranges[len(ranges)-1].right == nil)
@@ -186,7 +194,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
186194
iterOpts := DefaultIteratorOptions
187195
iterOpts.AllVersions = true
188196
iterOpts.Prefix = st.Prefix
189-
iterOpts.PrefetchValues = false
197+
iterOpts.PrefetchValues = true
190198
iterOpts.SinceTs = st.SinceTs
191199
itr := txn.NewIterator(iterOpts)
192200
itr.ThreadId = threadId
@@ -233,7 +241,13 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
233241

234242
// Now convert to key value.
235243
itr.Alloc.Reset()
236-
list, err := st.KeyToList(item.KeyCopy(nil), itr)
244+
var list *pb.KVList
245+
var err error
246+
if st.UseKeyToListWithThreadId {
247+
list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId)
248+
} else {
249+
list, err = st.KeyToList(item.KeyCopy(nil), itr)
250+
}
237251
if err != nil {
238252
st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
239253
continue
@@ -252,6 +266,23 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
252266
}
253267
}
254268
}
269+
270+
if st.UseKeyToListWithThreadId {
271+
if kvs, err := st.FinishThread(threadId); err != nil {
272+
return err
273+
} else {
274+
for _, kv := range kvs.Kv {
275+
kv.StreamId = streamId
276+
KVToBuffer(kv, outList)
277+
if outList.LenNoPadding() < batchSize {
278+
continue
279+
}
280+
if err := sendIt(); err != nil {
281+
return err
282+
}
283+
}
284+
}
285+
}
255286
// Mark the stream as done.
256287
if st.doneMarkers {
257288
kv := &pb.KV{

0 commit comments

Comments
 (0)