@@ -13,13 +13,41 @@ def initialize source_database, destination_database, name, user_strategies
1313 @destination_database = destination_database
1414 @fields_missing_strategy = DataAnon ::Core ::FieldsMissingStrategy . new name
1515 @errors = DataAnon ::Core ::TableErrors . new ( @name )
16+ @bulk_process = defined? ( ::ActiveRecord ::Import )
1617 @primary_keys = [ ]
1718 end
1819
1920 def self . whitelist?
2021 false
2122 end
2223
24+ def bulk_process?
25+ @bulk_process
26+ end
27+
28+ def bulk_process flag
29+ @bulk_process = flag
30+ end
31+
32+ def collect_for_bulk_process record
33+ Thread . current [ :bulk_process_records ] << record
34+ end
35+
36+ def bulk_process_records
37+ if bulk_process?
38+ Thread . current [ :bulk_process_records ] = [ ]
39+ yield
40+ bulk_store Thread . current [ :bulk_process_records ]
41+ else
42+ yield
43+ end
44+ end
45+
46+ def bulk_store ( records )
47+ columns = dest_table . column_names
48+ dest_table . import columns , records , validate : false , on_duplicate_key_update : columns , timestamps : false
49+ end
50+
2351 def process_fields &block
2452 self . instance_eval &block
2553 self
@@ -114,29 +142,35 @@ def process
114142 def process_table progress
115143 index = 0
116144
117- source_table_limited . each do |record |
118- index += 1
119- begin
120- process_record_if index , record
121- rescue => exception
122- @errors . log_error record , exception
145+ bulk_process_records do
146+ source_table_limited . each do |record |
147+ index += 1
148+ begin
149+ process_record_if index , record
150+ rescue => exception
151+ @errors . log_error record , exception
152+ end
153+ progress . show index
123154 end
124- progress . show index
125155 end
126156 end
127157
128158 def process_table_in_batches progress
129159 logger . info "Processing table #{ @name } records in batch size of #{ @batch_size } "
130160 index = 0
131161
132- source_table_limited . find_each ( :batch_size => @batch_size ) do |record |
133- index += 1
134- begin
135- process_record_if index , record
136- rescue => exception
137- @errors . log_error record , exception
162+ source_table_limited . find_in_batches ( :batch_size => @batch_size ) do |records |
163+ bulk_process_records do
164+ records . each do |record |
165+ index += 1
166+ begin
167+ process_record_if index , record
168+ rescue => exception
169+ @errors . log_error record , exception
170+ end
171+ progress . show index
172+ end
138173 end
139- progress . show index
140174 end
141175 end
142176
@@ -154,13 +188,15 @@ def process_table_in_threads progress
154188 end
155189
156190 thr = Thread . new {
157- records . each do |record |
158- begin
159- process_record_if index , record
160- index += 1
161- rescue => exception
162- puts exception . inspect
163- @errors . log_error record , exception
191+ bulk_process_records do
192+ records . each do |record |
193+ begin
194+ process_record_if index , record
195+ index += 1
196+ rescue => exception
197+ puts exception . inspect
198+ @errors . log_error record , exception
199+ end
164200 end
165201 end
166202 }
@@ -199,7 +235,6 @@ def progress_bar_class progress_bar
199235 @progress_bar = progress_bar
200236 end
201237
202-
203238 end
204239 end
205240end
0 commit comments