Skip to content

Commit 27a1492

Browse files
authored
Merge pull request #698 from Unknwon/PDR-7022-cloudtrail-improve
reader/cloudtrail: improve sync process
2 parents c7f9ff7 + ab0df0e commit 27a1492

File tree

1 file changed

+101
-92
lines changed

1 file changed

+101
-92
lines changed

reader/cloudtrail/cloudtrail.go

Lines changed: 101 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ func (r *Reader) Reset() (err error) {
9898
}
9999

100100
// Close shuts the reader down: it first stops the background sync
// daemon, then closes the underlying buffered reader and returns its
// close error.
func (r *Reader) Close() error {
	runnerName := r.Meta.RunnerName

	log.Infof("Runner[%v] syncMgr.stopSync...", runnerName)
	r.syncMgr.stopSync()

	log.Infof("Runner[%v] syncMgr closed, wait for BufReader closed...", runnerName)
	return r.BufReader.Close()
}
106106

@@ -185,10 +185,17 @@ func buildSyncOptions(conf conf.MapConf) (*syncOptions, error) {
185185
if opts.interval, err = time.ParseDuration(s); err != nil {
186186
return nil, invalidConfigError(reader.KeySyncInterval, s, err)
187187
}
188+
if opts.interval.Nanoseconds() <= 0 {
189+
opts.interval = 5 * time.Minute
190+
}
191+
188192
s, _ = conf.GetStringOr(reader.KeySyncConcurrent, "5")
189193
if opts.concurrent, err = strconv.Atoi(s); err != nil {
190194
return nil, invalidConfigError(reader.KeySyncConcurrent, s, err)
191195
}
196+
if opts.concurrent <= 0 {
197+
opts.concurrent = 5
198+
}
192199

193200
return &opts, nil
194201
}
@@ -231,24 +238,18 @@ func makeSyncSource(bucket, prefix string) string {
231238
func (mgr *syncManager) startSync() {
232239
ticker := time.NewTicker(mgr.interval)
233240
defer ticker.Stop()
234-
235-
if err := mgr.syncOnce(); err != nil {
236-
log.Errorf("sync failed: %v", err)
237-
}
238-
239-
Sync:
240241
for {
242+
if err := mgr.syncOnce(); err != nil {
243+
log.Errorf("Runner[%v] daemon sync once failed: %v", mgr.meta.RunnerName, err)
244+
}
245+
241246
select {
242-
case <-ticker.C:
243-
if err := mgr.syncOnce(); err != nil {
244-
log.Errorf("sync failed: %v", err)
245-
}
246247
case <-mgr.quitChan:
247-
break Sync
248+
log.Infof("Runner[%v] daemon has stopped from running", mgr.meta.RunnerName)
249+
return
250+
case <-ticker.C:
248251
}
249252
}
250-
251-
log.Info("sync stopped working")
252253
}
253254

254255
func (mgr *syncManager) syncOnce() error {
@@ -261,8 +262,7 @@ func (mgr *syncManager) syncOnce() error {
261262
concurrent: mgr.concurrent,
262263
region: mgr.region,
263264
}
264-
runner := newSyncRunner(ctx, mgr.quitChan)
265-
return runner.Sync()
265+
return newSyncRunner(ctx, mgr.quitChan).Sync()
266266
}
267267

268268
func (mgr *syncManager) stopSync() {
@@ -317,32 +317,85 @@ func validTarget(target string) bool {
317317
return false
318318
}
319319

320+
// maximumFlushSyncedFilesOneTime caps how many synced-file names are
// buffered in memory before being flushed to the metastore file.
const maximumFlushSyncedFilesOneTime = 10

// storeSyncedFiles appends the base name of every path in syncedFiles,
// one per line, to f and then fsyncs it so the record survives a crash.
// An empty (or nil) map is a no-op and returns nil without syncing.
//
// Note: only base names are persisted, so two distinct paths sharing a
// base name collapse into a single record.
func storeSyncedFiles(f *os.File, syncedFiles map[string]bool) error {
	if len(syncedFiles) == 0 {
		return nil
	}

	for path := range syncedFiles {
		// Bug fix: the original ignored WriteString's error, which could
		// silently drop synced records (e.g. on a full disk) and cause
		// the same files to be re-downloaded on the next round.
		if _, err := f.WriteString(filepath.Base(path) + "\n"); err != nil {
			return err
		}
	}

	return f.Sync()
}
334+
335+
// Note: 非线程安全,需由调用者保证同步调用
320336
func (s *syncRunner) syncToDir() error {
321-
log.Info("syncing from s3...")
337+
log.Infof("Runner[%v] syncing from s3...", s.meta.RunnerName)
338+
339+
metastore, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
340+
if err != nil {
341+
return fmt.Errorf("open metastore: %v", err)
342+
}
343+
defer metastore.Close()
322344

323345
s3url := newS3Url(s.source)
324346
bucket, err := lookupBucket(s3url.Bucket(), s.auth, s.region)
325347
if err != nil {
326-
return err
348+
return fmt.Errorf("lookup bucket: %v", err)
327349
}
328350

329351
sourceFiles := make(map[string]bool)
330352
sourceFiles, err = loadS3Files(bucket, s3url.Path(), sourceFiles, "")
331353
if err != nil {
332-
return err
354+
return fmt.Errorf("load s3 files: %v", err)
333355
}
334356
if s.syncedFiles == nil {
335357
s.syncedFiles, err = s.loadSyncedFiles()
336358
if err != nil {
337-
return err
359+
return fmt.Errorf("load synced files: %v", err)
338360
}
339361
}
340362

341-
err = s.concurrentSyncToDir(s3url, bucket, sourceFiles)
342-
if err != nil {
343-
return err
344-
}
345-
return s.storeSyncedFiles(sourceFiles)
363+
syncedChan := make(chan string, s.concurrent)
364+
doneChan := make(chan struct{})
365+
go func() {
366+
syncedFiles := make(map[string]bool, maximumFlushSyncedFilesOneTime)
367+
for file := range syncedChan {
368+
syncedFiles[file] = true
369+
370+
if len(syncedFiles) < maximumFlushSyncedFilesOneTime {
371+
continue
372+
}
373+
374+
if err = storeSyncedFiles(metastore, syncedFiles); err != nil {
375+
log.Errorf("Runner[%v] wrote synced files to %q failed: %v", s.meta.RunnerName, s.metastore)
376+
} else {
377+
log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
378+
379+
// Note: 可能导致在 Sync 失败的情况下部分文件名重复输入到 metastore 中,但比丢失已同步记录重新处理一遍相同数据结果要更加合理
380+
syncedFiles = make(map[string]bool, maximumFlushSyncedFilesOneTime)
381+
}
382+
}
383+
384+
if err = storeSyncedFiles(metastore, syncedFiles); err != nil {
385+
log.Errorf("Runner[%v] wrote synced files to %q failed: %v", s.meta.RunnerName, s.metastore)
386+
} else {
387+
log.Infof("Runner[%v] wrote %d synced files to %q", s.meta.RunnerName, len(syncedFiles), s.metastore)
388+
}
389+
390+
doneChan <- struct{}{}
391+
}()
392+
393+
s.concurrentSyncToDir(syncedChan, s3url, bucket, sourceFiles)
394+
close(syncedChan)
395+
396+
<-doneChan
397+
log.Infof("Runner[%v] daemon has finished syncing", s.meta.RunnerName)
398+
return nil
346399
}
347400

348401
type s3Url struct {
@@ -445,51 +498,29 @@ func (s *syncRunner) loadSyncedFiles() (map[string]bool, error) {
445498
return files, nil
446499
}
447500

448-
func (s *syncRunner) storeSyncedFiles(files map[string]bool) error {
449-
if len(files) <= 0 {
450-
return nil
451-
}
452-
453-
f, err := os.OpenFile(s.metastore, os.O_WRONLY|os.O_APPEND, 0644)
454-
if err != nil {
455-
return err
456-
}
457-
defer f.Close()
458-
459-
w := bufio.NewWriter(f)
460-
for path := range files {
461-
w.WriteString(filepath.Base(path))
462-
w.WriteByte('\n')
463-
}
464-
465-
log.Infof("write %d synced files to %q", len(files), s.metastore)
466-
467-
return w.Flush()
468-
}
469-
470501
// relativePath returns filePath relative to path: the path prefix is
// stripped (unless path is "."), and any remaining leading slash is
// removed.
func relativePath(path string, filePath string) string {
	if path != "." {
		filePath = strings.TrimPrefix(filePath, path)
	}
	return strings.TrimPrefix(filePath, "/")
}
476507

477-
func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) error {
478-
doneChan := newDoneChan(s.concurrent)
508+
// concurrentSyncToDir 并发地获取 bucket 中的文件,并返回本次同步实际完成的文件
509+
func (s *syncRunner) concurrentSyncToDir(syncedChan chan string, s3url s3Url, bucket *s3.Bucket, sourceFiles map[string]bool) {
479510
pool := newPool(s.concurrent)
480-
481511
var wg sync.WaitGroup
512+
513+
DONE:
482514
for s3file := range sourceFiles {
483515
select {
484516
case <-s.quitChan:
485517
log.Warnf("Runner[%v] daemon has stopped, task is interrupted", s.meta.RunnerName)
486-
return nil
518+
break DONE
487519
default:
488520
}
489521

490-
//对于目录不同步
522+
// 对于目录不同步
491523
if strings.HasSuffix(s3file, string(os.PathSeparator)) {
492-
delete(sourceFiles, s3file)
493524
continue
494525
}
495526
basename := filepath.Base(s3file)
@@ -499,38 +530,31 @@ func (s *syncRunner) concurrentSyncToDir(s3url s3Url, bucket *s3.Bucket, sourceF
499530
if filepath.Dir(filePath) != "." {
500531
err := os.MkdirAll(filepath.Dir(filePath), 0755)
501532
if err != nil {
502-
return err
533+
log.Errorf("Runner[%v] create local directory %q failed: %v", s.meta.RunnerName, filepath.Dir(filePath), err)
534+
continue
503535
}
504536
}
505537
<-pool
506538
s.syncedFiles[basename] = true
507539

508-
log.Debugf("starting sync: s3://%s/%s -> %s", bucket.Name, s3file, filePath)
540+
log.Debugf("Runner[%v] start syncing: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
509541

510542
wg.Add(1)
511-
go func(doneChan chan error, filePath string, bucket *s3.Bucket, s3file string) {
543+
go func(filePath string, bucket *s3.Bucket, s3file string) {
512544
defer wg.Done()
513-
syncSingleFile(doneChan, filePath, bucket, s3file)
514-
pool <- 1
515-
}(doneChan, filePath, bucket, s3file)
545+
if err := writeFile(filePath, bucket, s3file); err != nil {
546+
log.Errorf("Runner[%v] write file %q to local failed: %v", s.meta.RunnerName, s3file, err)
547+
return
548+
}
549+
syncedChan <- s3file
550+
log.Debugf("Runner[%v] sync completed: s3://%s/%s -> %s", s.meta.RunnerName, bucket.Name, s3file, filePath)
551+
pool <- struct{}{}
552+
}(filePath, bucket, s3file)
516553
} else {
517-
delete(sourceFiles, s3file)
518-
log.Debugf("%s already synced, skip it...", unzipPath)
554+
log.Debugf("Runner[%v] %q already synced, skipped this time", s.meta.RunnerName, unzipPath)
519555
}
520556
}
521557
wg.Wait()
522-
523-
log.Info("sync done in this round")
524-
return nil
525-
}
526-
527-
func syncSingleFile(doneChan chan error, filePath string, bucket *s3.Bucket, file string) {
528-
err := writeFile(filePath, bucket, file)
529-
if err != nil {
530-
doneChan <- err
531-
}
532-
log.Debugf("sync completed: s3://%s/%s -> %s", bucket.Name, file, filePath)
533-
doneChan <- nil
534558
}
535559

536560
func writeToFile(zipf *zip.File, filename string) error {
@@ -574,25 +598,10 @@ func writeFile(filename string, bucket *s3.Bucket, path string) error {
574598
return ioutil.WriteFile(filename, data, os.FileMode(0644))
575599
}
576600

577-
func newPool(concurrent int) chan int {
578-
pool := make(chan int, concurrent)
601+
func newPool(concurrent int) chan struct{} {
602+
pool := make(chan struct{}, concurrent)
579603
for x := 0; x < concurrent; x++ {
580-
pool <- 1
604+
pool <- struct{}{}
581605
}
582606
return pool
583607
}
584-
585-
func newDoneChan(concurrent int) chan error {
586-
doneChan := make(chan error, concurrent)
587-
go func() {
588-
for {
589-
select {
590-
case err := <-doneChan:
591-
if err != nil {
592-
log.Error(err)
593-
}
594-
}
595-
}
596-
}()
597-
return doneChan
598-
}

0 commit comments

Comments
 (0)