@@ -76,6 +76,7 @@ function LevelPouch(opts, callback) {
7676 running : false ,
7777 docCount : - 1
7878 } ;
79+ db . _compactionQueue = [ ] ;
7980 if ( opts . db || opts . noMigrate ) {
8081 afterDBCreated ( ) ;
8182 } else {
@@ -577,13 +578,15 @@ function LevelPouch(opts, callback) {
577578
578579 for ( var i = 0 ; i < attachments . length ; i ++ ) {
579580 var key = attachments [ i ] ;
581+ var att = doc . data . _attachments [ key ] ;
580582
581- if ( doc . data . _attachments [ key ] . stub ) {
582- recv ++ ;
583- collectResults ( ) ;
583+ if ( att . stub ) {
584+ // still need to update the refs mapping
585+ var id = doc . data . _id ;
586+ var rev = doc . data . _rev ;
587+ saveAttachmentRefs ( id , rev , att . digest , attachmentSaved ) ;
584588 continue ;
585589 }
586- var att = doc . data . _attachments [ key ] ;
587590 var data ;
588591 if ( typeof att . data === 'string' ) {
589592 try {
@@ -650,18 +653,19 @@ function LevelPouch(opts, callback) {
650653 finish ( ) ;
651654 }
652655 }
653-
654- function saveAttachment ( docInfo , digest , key , data , callback ) {
655- var att = docInfo . data . _attachments [ key ] ;
656- delete att . data ;
657- att . digest = digest ;
658- att . length = data . length ;
656+
657+ function saveAttachmentRefs ( id , rev , digest , callback ) {
659658 stores . attachmentStore . get ( digest , function ( err , oldAtt ) {
660- if ( err && err . name !== 'NotFoundError' ) {
661- return callback ( err ) ;
659+ var newAttachment = false ;
660+ if ( err ) {
661+ if ( err . name !== 'NotFoundError' ) {
662+ return callback ( err ) ;
663+ } else {
664+ newAttachment = true ;
665+ }
662666 }
663667
664- var ref = [ docInfo . metadata . id , docInfo . metadata . rev ] . join ( '@' ) ;
668+ var ref = [ id , rev ] . join ( '@' ) ;
665669 var newAtt = { } ;
666670
667671 if ( oldAtt ) {
@@ -678,14 +682,37 @@ function LevelPouch(opts, callback) {
678682 }
679683
680684 stores . attachmentStore . put ( digest , newAtt , function ( err ) {
681- // do not try to store empty attachments
682- if ( data . length === 0 ) {
685+ if ( err ) {
683686 return callback ( err ) ;
684687 }
685- // doing this in batch causes a test to fail, wtf?
686- stores . binaryStore . put ( digest , data , function ( err ) {
687- callback ( err ) ;
688- } ) ;
688+ callback ( null , newAttachment ) ;
689+ } ) ;
690+ } ) ;
691+ }
692+
693+ function saveAttachment ( docInfo , digest , key , data , callback ) {
694+ var att = docInfo . data . _attachments [ key ] ;
695+ delete att . data ;
696+ att . digest = digest ;
697+ att . length = data . length ;
698+ var id = docInfo . metadata . id ;
699+ var rev = docInfo . metadata . rev ;
700+
701+ saveAttachmentRefs ( id , rev , digest , function ( err , newAttachment ) {
702+ if ( err ) {
703+ return callback ( err ) ;
704+ }
705+ // do not try to store empty attachments
706+ if ( data . length === 0 ) {
707+ return callback ( err ) ;
708+ }
709+ if ( ! newAttachment ) {
710+ // small optimization - don't bother writing it again
711+ return callback ( err ) ;
712+ }
713+ // doing this in batch causes a test to fail, wtf?
714+ stores . binaryStore . put ( digest , data , function ( err ) {
715+ callback ( err ) ;
689716 } ) ;
690717 } ) ;
691718 }
@@ -984,15 +1011,34 @@ function LevelPouch(opts, callback) {
9841011 } ;
9851012
9861013 api . _doCompaction = function ( docId , rev_tree , revs , callback ) {
1014+ if ( ! revs . length ) {
1015+ return callback ( ) ;
1016+ }
1017+ db . _compactionQueue . push ( [ docId , rev_tree , revs , callback ] ) ;
1018+ if ( db . _compactionQueue . length === 1 ) {
1019+ api . _doCompactionSequentially ( docId , rev_tree , revs , callback ) ;
1020+ }
1021+ } ;
1022+
1023+ // execute compactions synchronously so we
1024+ // can correctly dedup attachments
1025+ api . _doCompactionSequentially = function ( docId , rev_tree , revs , callback ) {
1026+ var originalCallback = callback ;
1027+ callback = function ( err ) {
1028+ originalCallback ( err ) ;
1029+ process . nextTick ( function ( ) {
1030+ db . _compactionQueue . shift ( ) ;
1031+ if ( db . _compactionQueue . length ) {
1032+ api . _doCompactionSequentially . apply ( api , db . _compactionQueue [ 0 ] ) ;
1033+ }
1034+ } ) ;
1035+ } ;
9871036 stores . docStore . get ( docId , function ( err , metadata ) {
9881037 if ( err ) {
9891038 return callback ( err ) ;
9901039 }
9911040 var seqs = metadata . rev_map ; // map from rev to seq
9921041 metadata . rev_tree = rev_tree ;
993- if ( ! revs . length ) {
994- return callback ( ) ;
995- }
9961042 var batch = [ ] ;
9971043 batch . push ( {
9981044 key : metadata . id ,
@@ -1001,6 +1047,88 @@ function LevelPouch(opts, callback) {
10011047 valueEncoding : vuvuEncoding ,
10021048 prefix : stores . docStore
10031049 } ) ;
1050+
1051+ var digestMap = { } ;
1052+ var numDone = 0 ;
1053+ var overallErr ;
1054+ function checkDone ( err ) {
1055+ if ( err ) {
1056+ overallErr = err ;
1057+ }
1058+ if ( ++ numDone === revs . length ) { // done
1059+ if ( overallErr ) {
1060+ return callback ( err ) ;
1061+ }
1062+ deleteOrphanedAttachments ( ) ;
1063+ }
1064+ }
1065+
1066+ function finish ( err ) {
1067+ if ( err ) {
1068+ return callback ( err ) ;
1069+ }
1070+ db . batch ( batch , callback ) ;
1071+ }
1072+
1073+ function deleteOrphanedAttachments ( ) {
1074+ var possiblyOrphanedAttachments = Object . keys ( digestMap ) ;
1075+ if ( ! possiblyOrphanedAttachments . length ) {
1076+ return finish ( ) ;
1077+ }
1078+ var numDone = 0 ;
1079+ var overallErr ;
1080+ function checkDone ( err ) {
1081+ if ( err ) {
1082+ overallErr = err ;
1083+ }
1084+ if ( ++ numDone === possiblyOrphanedAttachments . length ) {
1085+ finish ( overallErr ) ;
1086+ }
1087+ }
1088+ var refsToDelete = new utils . Map ( ) ;
1089+ revs . forEach ( function ( rev ) {
1090+ refsToDelete . set ( docId + '@' + rev , true ) ;
1091+ } ) ;
1092+ possiblyOrphanedAttachments . forEach ( function ( digest ) {
1093+ stores . attachmentStore . get ( digest , function ( err , attData ) {
1094+ if ( err ) {
1095+ if ( err . name === 'NotFoundError' ) {
1096+ return checkDone ( ) ;
1097+ } else {
1098+ return checkDone ( err ) ;
1099+ }
1100+ }
1101+ var refs = Object . keys ( attData . refs || { } ) . filter ( function ( ref ) {
1102+ return ! refsToDelete . has ( ref ) ;
1103+ } ) ;
1104+ var newRefs = { } ;
1105+ refs . forEach ( function ( ref ) {
1106+ newRefs [ ref ] = true ;
1107+ } ) ;
1108+ if ( refs . length ) { // not orphaned
1109+ batch . push ( {
1110+ key : digest ,
1111+ type : 'put' ,
1112+ valueEncoding : 'json' ,
1113+ value : { refs : newRefs } ,
1114+ prefix : stores . attachmentStore
1115+ } ) ;
1116+ } else { // orphaned, can safely delete
1117+ batch = batch . concat ( [ {
1118+ key : digest ,
1119+ type : 'del' ,
1120+ prefix : stores . attachmentStore
1121+ } , {
1122+ key : digest ,
1123+ type : 'del' ,
1124+ prefix : stores . binaryStore
1125+ } ] ) ;
1126+ }
1127+ checkDone ( ) ;
1128+ } ) ;
1129+ } ) ;
1130+ }
1131+
10041132 revs . forEach ( function ( rev ) {
10051133 var seq = seqs [ rev ] ;
10061134 if ( ! seq ) {
@@ -1011,8 +1139,22 @@ function LevelPouch(opts, callback) {
10111139 type : 'del' ,
10121140 prefix : stores . bySeqStore
10131141 } ) ;
1142+ stores . bySeqStore . get ( formatSeq ( seq ) , function ( err , doc ) {
1143+ if ( err ) {
1144+ if ( err . name === 'NotFoundError' ) {
1145+ return checkDone ( ) ;
1146+ } else {
1147+ return checkDone ( err ) ;
1148+ }
1149+ }
1150+ var atts = Object . keys ( doc . _attachments || { } ) ;
1151+ atts . forEach ( function ( attName ) {
1152+ var digest = doc . _attachments [ attName ] . digest ;
1153+ digestMap [ digest ] = true ;
1154+ } ) ;
1155+ checkDone ( ) ;
1156+ } ) ;
10141157 } ) ;
1015- db . batch ( batch , callback ) ;
10161158 } ) ;
10171159 } ;
10181160
0 commit comments