7777 current_file ,
7878 % % current file handle since the last fsync?
7979 current_file_handle ,
80- % % file handle cache
80+ % % current write file offset
8181 current_file_offset ,
82+ % % messages that were potentially removed from the current write file
83+ current_file_removes = [],
8284 % % TRef for our interval timer
8385 sync_timer_ref ,
8486 % % files that had removes
@@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef,
11501152 end , CRef , State1 )
11511153 end .
11521154
1153- remove_message (MsgId , CRef , State = # msstate { index_ets = IndexEts }) ->
1155+ remove_message (MsgId , CRef ,
1156+ State = # msstate {
1157+ index_ets = IndexEts ,
1158+ current_file = CurrentFile ,
1159+ current_file_removes = Removes }) ->
11541160 case should_mask_action (CRef , MsgId , State ) of
11551161 {true , _Location } ->
11561162 State ;
@@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
11621168 % % ets:lookup(FileSummaryEts, File),
11631169 State ;
11641170 {_Mask , # msg_location { ref_count = RefCount , file = File ,
1165- total_size = TotalSize }}
1171+ total_size = TotalSize } = Entry }
11661172 when RefCount > 0 ->
11671173 % % only update field, otherwise bad interaction with
11681174 % % concurrent GC
1169- Dec = fun () -> index_update_ref_counter (IndexEts , MsgId , - 1 ) end ,
11701175 case RefCount of
1171- % % don 't remove from cur_file_cache_ets here because
1176+ % % Don 't remove from cur_file_cache_ets here because
11721177 % % there may be further writes in the mailbox for the
1173- % % same msg.
1174- 1 -> ok = Dec (),
1175- delete_file_if_empty (
1176- File , gc_candidate (File ,
1177- adjust_valid_total_size (
1178- File , - TotalSize , State )));
1179- _ -> ok = Dec (),
1180- gc_candidate (File , State )
1178+ % % same msg. We will remove 0 ref_counts when rolling
1179+ % % over to the next write file.
1180+ 1 when File =:= CurrentFile ->
1181+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1182+ State1 = State # msstate {current_file_removes =
1183+ [Entry # msg_location {ref_count = 0 }|Removes ]},
1184+ delete_file_if_empty (
1185+ File , gc_candidate (File ,
1186+ adjust_valid_total_size (
1187+ File , - TotalSize , State1 )));
1188+ 1 ->
1189+ index_delete (IndexEts , MsgId ),
1190+ delete_file_if_empty (
1191+ File , gc_candidate (File ,
1192+ adjust_valid_total_size (
1193+ File , - TotalSize , State )));
1194+ _ ->
1195+ index_update_ref_counter (IndexEts , MsgId , - 1 ),
1196+ gc_candidate (File , State )
11811197 end
11821198 end .
11831199
@@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file(
12391255 cur_file_cache_ets = CurFileCacheEts ,
12401256 file_size_limit = FileSizeLimit })
12411257 when Offset >= FileSizeLimit ->
1242- State1 = internal_sync (State ),
1258+ % % Cleanup the index of messages that were removed before rolling over.
1259+ State0 = cleanup_index_on_roll_over (State ),
1260+ State1 = internal_sync (State0 ),
12431261 ok = writer_close (CurHdl ),
12441262 NextFile = CurFile + 1 ,
12451263 {ok , NextHdl } = writer_open (Dir , NextFile ),
@@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin,
12671285 index_ets = IndexEts ,
12681286 file_summary_ets = FileSummaryEts ,
12691287 cur_file_cache_ets = CurFileCacheEts }) ->
1288+ % % Cleanup the index of messages that were removed before rolling over.
1289+ State1 = cleanup_index_on_roll_over (State0 ),
12701290 {LargeMsgFile , LargeMsgHdl } = case CurOffset of
12711291 % % We haven't written in the file yet. Use it.
12721292 0 ->
@@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin,
12861306 ok = index_insert (IndexEts ,
12871307 # msg_location { msg_id = MsgId , ref_count = 1 , file = LargeMsgFile ,
12881308 offset = 0 , total_size = TotalSize }),
1289- State1 = case CurFile of
1309+ State2 = case CurFile of
12901310 % % We didn't open a new file. We must update the existing value.
12911311 LargeMsgFile ->
12921312 [_ ,_ ] = ets :update_counter (FileSummaryEts , LargeMsgFile ,
12931313 [{# file_summary .valid_total_size , TotalSize },
12941314 {# file_summary .file_size , TotalSize }]),
1295- State0 ;
1315+ State1 ;
12961316 % % We opened a new file. We can insert it all at once.
12971317 % % We must also check whether we need to delete the previous
12981318 % % current file, because if there is no valid data this is
@@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin,
13031323 valid_total_size = TotalSize ,
13041324 file_size = TotalSize ,
13051325 locked = false }),
1306- delete_file_if_empty (CurFile , State0 # msstate { current_file_handle = LargeMsgHdl ,
1326+ delete_file_if_empty (CurFile , State1 # msstate { current_file_handle = LargeMsgHdl ,
13071327 current_file = LargeMsgFile ,
13081328 current_file_offset = TotalSize })
13091329 end ,
@@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin,
13181338 % % Delete messages from the cache that were written to disk.
13191339 true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
13201340 % % Process confirms (this won't flush; we already did) and continue.
1321- State = internal_sync (State1 ),
1341+ State = internal_sync (State2 ),
13221342 State # msstate { current_file_handle = NextHdl ,
13231343 current_file = NextFile ,
13241344 current_file_offset = 0 }.
13251345
%% Drop from the index every entry recorded in current_file_removes and
%% reset that list to empty. Returns the updated state.
%%
%% The recorded entries are handed to index_delete_object/2 as whole
%% objects (they were stored with ref_count=0). An index entry whose
%% ref_count has been increased again in the meantime will not be
%% deleted, so no extra index lookup is needed to check the ref_count
%% first.
cleanup_index_on_roll_over(State = #msstate{index_ets            = IndexEts,
                                            current_file_removes = Removes}) ->
    DeleteFromIndex = fun(Removed) ->
                              index_delete_object(IndexEts, Removed)
                      end,
    lists:foreach(DeleteFromIndex, Removes),
    State#msstate{current_file_removes = []}.
1356+
13261357contains_message (MsgId , From , State = # msstate { index_ets = IndexEts }) ->
13271358 MsgLocation = index_lookup_positive_ref_count (IndexEts , MsgId ),
13281359 gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) ->
16431674 ok .
16441675
%% Apply Updates (a list of {Position, Value} pairs, as accepted by
%% ets:update_element/3) to the index entry stored under Key.
%% Always returns ok: a missing entry is not treated as an error.
index_update_fields(IndexEts, Key, Updates) ->
    case ets:update_element(IndexEts, Key, Updates) of
        %% The entry existed and its fields were updated.
        true -> ok;
        %% There is no entry under Key; nothing to update.
        false -> ok
    end.
16481679
16491680index_update_ref_counter (IndexEts , Key , RefCount ) ->
@@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate {
19671998% % We do not try to look at messages that are not the last because we do not want to
19681999% % accidentally write over messages that were moved earlier.
19692000
1970- compact_file (File , State = # gc_state { index_ets = IndexEts ,
1971- file_summary_ets = FileSummaryEts ,
1972- dir = Dir ,
1973- msg_store = Server }) ->
%% Entry point for compacting File.
%%
%% Looks File up in the file summary table first. If there is no row for
%% it, a debug message is logged and ok is returned without doing any
%% work. Otherwise the file_size recorded in the summary is passed on to
%% compact_file/3, which performs the compaction.
compact_file(File, State = #gc_state{file_summary_ets = FileSummaryEts}) ->
    Summary = ets:lookup(FileSummaryEts, File),
    case Summary of
        [#file_summary{file_size = FileSize}] ->
            compact_file(File, FileSize, State);
        [] ->
            %% NOTE(review): the log text is reproduced exactly as found,
            %% including its leading space.
            rabbit_log:debug(" File ~tp has already been deleted; no need to compact",
                             [File]),
            ok
    end.
2010+
2011+ compact_file (File , FileSize ,
2012+ State = # gc_state { index_ets = IndexEts ,
2013+ file_summary_ets = FileSummaryEts ,
2014+ dir = Dir ,
2015+ msg_store = Server }) ->
19742016 % % Get metadata about the file. Will be used to calculate
19752017 % % how much data was reclaimed as a result of compaction.
19762018 [# file_summary {file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
@@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
21232165
21242166-spec delete_file (non_neg_integer (), gc_state ()) -> ok | defer .
21252167
2126- delete_file (File , State = # gc_state { file_summary_ets = FileSummaryEts ,
2127- file_handles_ets = FileHandlesEts ,
2128- dir = Dir }) ->
2168+ delete_file (File , # gc_state { file_summary_ets = FileSummaryEts ,
2169+ file_handles_ets = FileHandlesEts ,
2170+ dir = Dir }) ->
21292171 case ets :match_object (FileHandlesEts , {{'_' , File }, '_' }, 1 ) of
21302172 {[_ |_ ], _Cont } ->
21312173 rabbit_log :debug (" Asked to delete file ~p but it has active readers. Deferring." ,
@@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
21342176 _ ->
21352177 [# file_summary { valid_total_size = 0 ,
21362178 file_size = FileSize }] = ets :lookup (FileSummaryEts , File ),
2137- [] = scan_and_vacuum_message_file (File , State ),
21382179 ok = file :delete (form_filename (Dir , filenum_to_name (File ))),
21392180 true = ets :delete (FileSummaryEts , File ),
21402181 rabbit_log :debug (" Deleted empty file number ~tp ; reclaimed ~tp bytes" , [File , FileSize ]),
0 commit comments