@@ -2,7 +2,7 @@ use std::{
2
2
any:: { Any , TypeId } ,
3
3
collections:: HashSet ,
4
4
fs:: { self , File , OpenOptions , ReadDir } ,
5
- io:: Write ,
5
+ io:: { BufWriter , Write } ,
6
6
mem:: { swap, transmute, MaybeUninit } ,
7
7
path:: { Path , PathBuf } ,
8
8
sync:: {
@@ -13,6 +13,7 @@ use std::{
13
13
14
14
use anyhow:: { bail, Context , Result } ;
15
15
use byteorder:: { ReadBytesExt , WriteBytesExt , BE } ;
16
+ use chrono:: Local ;
16
17
use lzzzz:: lz4:: decompress;
17
18
use memmap2:: Mmap ;
18
19
use parking_lot:: { Mutex , RwLock } ;
@@ -287,6 +288,9 @@ impl TurboPersistence {
287
288
Some ( "CURRENT" ) => {
288
289
// Already read
289
290
}
291
+ Some ( "LOG" ) => {
292
+ // Ignored, write-only
293
+ }
290
294
_ => {
291
295
if !path
292
296
. file_name ( )
@@ -393,6 +397,16 @@ impl TurboPersistence {
393
397
Ok ( WriteBatch :: new ( self . path . clone ( ) , current) )
394
398
}
395
399
400
+ fn open_log ( & self ) -> Result < BufWriter < File > > {
401
+ let log_path = self . path . join ( "LOG" ) ;
402
+ let log_file = OpenOptions :: new ( )
403
+ . write ( true )
404
+ . create ( true )
405
+ . append ( true )
406
+ . open ( log_path) ?;
407
+ Ok ( BufWriter :: new ( log_file) )
408
+ }
409
+
396
410
/// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
397
411
/// visible to readers.
398
412
pub fn commit_write_batch < K : StoreKey + Send + Sync + ' static , const FAMILIES : usize > (
@@ -418,7 +432,7 @@ impl TurboPersistence {
418
432
fn commit (
419
433
& self ,
420
434
mut new_sst_files : Vec < ( u32 , File ) > ,
421
- new_blob_files : Vec < File > ,
435
+ new_blob_files : Vec < ( u32 , File ) > ,
422
436
mut indicies_to_delete : Vec < usize > ,
423
437
mut seq : u32 ,
424
438
) -> Result < ( ) , anyhow:: Error > {
@@ -432,14 +446,44 @@ impl TurboPersistence {
432
446
} )
433
447
. collect :: < Result < Vec < _ > > > ( ) ?;
434
448
435
- for file in new_blob_files {
449
+ for ( _ , file) in new_blob_files. iter ( ) {
436
450
file. sync_all ( ) ?;
437
451
}
438
452
439
453
if !indicies_to_delete. is_empty ( ) {
440
454
seq += 1 ;
441
455
}
442
456
457
+ {
458
+ let mut log = self . open_log ( ) ?;
459
+ let time = Local :: now ( ) ;
460
+ writeln ! (
461
+ log,
462
+ "Commit {seq:08} {}" ,
463
+ time. format( "%YYYY-%mm-%dd %HH:%MM" )
464
+ ) ?;
465
+ for sst in new_sst_files. iter ( ) {
466
+ let index = sst. sequence_number ( ) ;
467
+ let range = sst. range ( ) ?;
468
+ let size = sst. size ( ) ;
469
+ writeln ! (
470
+ log,
471
+ "{:08} SST family:{} {:016x}-{:016x} {} MiB" ,
472
+ index,
473
+ range. family,
474
+ range. min_hash,
475
+ range. max_hash,
476
+ size / 1024 / 1024
477
+ ) ?;
478
+ }
479
+ for ( seq, _) in new_blob_files. iter ( ) {
480
+ writeln ! ( log, "{:08} BLOB" , seq) ?;
481
+ }
482
+ for index in indicies_to_delete. iter ( ) {
483
+ writeln ! ( log, "{:08} DELETED" , index) ?;
484
+ }
485
+ }
486
+
443
487
let removed_ssts;
444
488
445
489
{
@@ -583,6 +627,7 @@ impl TurboPersistence {
583
627
let value_block_cache = & self . value_block_cache ;
584
628
let path = & self . path ;
585
629
630
+ let log_mutex = Mutex :: new ( ( ) ) ;
586
631
let result = sst_by_family
587
632
. into_par_iter ( )
588
633
. with_min_len ( 1 )
@@ -604,6 +649,32 @@ impl TurboPersistence {
604
649
} ,
605
650
) ;
606
651
652
+ if !merge_jobs. is_empty ( ) {
653
+ let guard = log_mutex. lock ( ) ;
654
+ let mut log = self . open_log ( ) ?;
655
+ writeln ! (
656
+ log,
657
+ "Compaction for family {family} (coverage: {coverage}):"
658
+ ) ?;
659
+ for job in merge_jobs. iter ( ) {
660
+ writeln ! ( log, " merge" ) ?;
661
+ for i in job. iter ( ) {
662
+ let index = ssts_with_ranges[ * i] . index ;
663
+ let ( min, max) = ssts_with_ranges[ * i] . range ( ) ;
664
+ writeln ! ( log, " {index:08} {min:016x}-{max:016x}" ) ?;
665
+ }
666
+ }
667
+ if !move_jobs. is_empty ( ) {
668
+ writeln ! ( log, " move" ) ?;
669
+ for i in move_jobs. iter ( ) {
670
+ let index = ssts_with_ranges[ * i] . index ;
671
+ let ( min, max) = ssts_with_ranges[ * i] . range ( ) ;
672
+ writeln ! ( log, " {index:08} {min:016x}-{max:016x}" ) ?;
673
+ }
674
+ }
675
+ drop ( guard) ;
676
+ }
677
+
607
678
// Later we will remove the merged and moved files
608
679
let indicies_to_delete = merge_jobs
609
680
. iter ( )
0 commit comments