Skip to content

Commit 2593f34

Browse files
committed
Use activerecord-import's "import" method
See #66
1 parent 0e9d330 commit 2593f34

File tree

3 files changed

+74
-23
lines changed

3 files changed

+74
-23
lines changed

lib/strategy/base.rb

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,36 @@ def initialize source_database, destination_database, name, user_strategies
1313
@destination_database = destination_database
1414
@fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
1515
@errors = DataAnon::Core::TableErrors.new(@name)
16+
@bulk_process = defined?(::ActiveRecord::Import)
1617
@primary_keys = []
1718
end
1819

1920
def self.whitelist?
2021
false
2122
end
2223

24+
def bulk_process?
25+
@bulk_process
26+
end
27+
28+
def bulk_process flag
29+
@bulk_process = flag
30+
end
31+
32+
def collect_for_bulk_process record
33+
Thread.current[:bulk_process_records] << record
34+
end
35+
36+
def bulk_process_records
37+
if bulk_process?
38+
Thread.current[:bulk_process_records] = []
39+
yield
40+
bulk_store Thread.current[:bulk_process_records]
41+
else
42+
yield
43+
end
44+
end
45+
2346
def process_fields &block
2447
self.instance_eval &block
2548
self
@@ -114,29 +137,35 @@ def process
114137
def process_table progress
115138
index = 0
116139

117-
source_table_limited.each do |record|
118-
index += 1
119-
begin
120-
process_record_if index, record
121-
rescue => exception
122-
@errors.log_error record, exception
140+
bulk_process_records do
141+
source_table_limited.each do |record|
142+
index += 1
143+
begin
144+
process_record_if index, record
145+
rescue => exception
146+
@errors.log_error record, exception
147+
end
148+
progress.show index
123149
end
124-
progress.show index
125150
end
126151
end
127152

128153
def process_table_in_batches progress
129154
logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
130155
index = 0
131156

132-
source_table_limited.find_each(:batch_size => @batch_size) do |record|
133-
index += 1
134-
begin
135-
process_record_if index, record
136-
rescue => exception
137-
@errors.log_error record, exception
157+
source_table_limited.find_in_batches(:batch_size => @batch_size) do |records|
158+
bulk_process_records do
159+
records.each do |record|
160+
index += 1
161+
begin
162+
process_record_if index, record
163+
rescue => exception
164+
@errors.log_error record, exception
165+
end
166+
progress.show index
167+
end
138168
end
139-
progress.show index
140169
end
141170
end
142171

@@ -154,13 +183,15 @@ def process_table_in_threads progress
154183
end
155184

156185
thr = Thread.new {
157-
records.each do |record|
158-
begin
159-
process_record_if index, record
160-
index += 1
161-
rescue => exception
162-
puts exception.inspect
163-
@errors.log_error record, exception
186+
bulk_process_records do
187+
records.each do |record|
188+
begin
189+
process_record_if index, record
190+
index += 1
191+
rescue => exception
192+
puts exception.inspect
193+
@errors.log_error record, exception
194+
end
164195
end
165196
end
166197
}

lib/strategy/blacklist.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,19 @@ def process_record index, record
1212
updates[database_field_name] = strategy.anonymize(field)
1313
end
1414
end
15-
record.update_columns(updates) if updates.any?
15+
if updates.any?
16+
if bulk_process?
17+
record.assign_attributes(updates)
18+
collect_for_bulk_process(record)
19+
else
20+
record.update_columns(updates)
21+
end
22+
end
23+
end
24+
25+
def bulk_store(records)
26+
columns = @fields.keys
27+
source_table.import @primary_keys + columns, records, validate: false, on_duplicate_key_update: columns
1628
end
1729

1830
end

lib/strategy/whitelist.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,17 @@ def process_record(index, record)
1919
@primary_keys.each do |key|
2020
dest_record[key] = record[key]
2121
end
22-
dest_record.save!
22+
if bulk_process?
23+
collect_for_bulk_process(dest_record)
24+
else
25+
dest_record.save!
26+
end
2327
end
2428

29+
def bulk_store(records)
30+
columns = source_table.column_names
31+
source_table.import @primary_keys + columns, records, validate: false, on_duplicate_key_update: columns
32+
end
2533

2634
end
2735
end

0 commit comments

Comments
 (0)