Skip to content

Commit 73b53ac

Browse files
committed
feat(conflation): use separate mapping_sessions_refs table
1 parent 2658c06 commit 73b53ac

File tree

6 files changed

+99
-38
lines changed

6 files changed

+99
-38
lines changed

mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,8 @@ def save_results_to_postgres(
432432

433433
query_insert_mapping_sessions = f"""
434434
BEGIN;
435+
436+
-- Create or ensure mapping_sessions exist
435437
INSERT INTO mapping_sessions
436438
SELECT
437439
project_id,
@@ -445,21 +447,36 @@ def save_results_to_postgres(
445447
client_type
446448
FROM {result_temp_table}
447449
GROUP BY project_id, group_id, user_id, app_version, client_type
448-
ON CONFLICT (project_id,group_id,user_id)
450+
ON CONFLICT (project_id, group_id, user_id)
451+
DO NOTHING;
452+
453+
INSERT INTO {result_table} (mapping_session_id, task_id, result)
454+
SELECT
455+
ms.mapping_session_id,
456+
r.task_id,
457+
{result_sql}
458+
FROM {result_temp_table} r
459+
JOIN mapping_sessions ms ON
460+
ms.project_id = r.project_id
461+
AND ms.group_id = r.group_id
462+
AND ms.user_id = r.user_id
463+
ON CONFLICT (mapping_session_id, task_id)
449464
DO NOTHING;
450-
INSERT INTO {result_table}
465+
466+
INSERT INTO mapping_sessions_refs (mapping_session_id, task_id, ref)
451467
SELECT
452468
ms.mapping_session_id,
453469
r.task_id,
454-
{result_sql},
455470
r.ref
456471
FROM {result_temp_table} r
457472
JOIN mapping_sessions ms ON
458473
ms.project_id = r.project_id
459474
AND ms.group_id = r.group_id
460475
AND ms.user_id = r.user_id
476+
WHERE r.ref IS NOT NULL
461477
ON CONFLICT (mapping_session_id, task_id)
462478
DO NOTHING;
479+
463480
COMMIT;
464481
"""
465482
p_con.query(query_insert_mapping_sessions)

mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def get_results(
110110
if result_table == "mapping_sessions_results_geometry":
111111
result_sql = "ST_AsGeoJSON(msr.result) as result"
112112
else:
113-
result_sql = "msr.result as result, msr.ref as ref"
113+
result_sql = "msr.result as result"
114114

115115
sql_query = sql.SQL(
116116
f"""
@@ -126,6 +126,7 @@ def get_results(
126126
ms.app_version,
127127
ms.client_type,
128128
{result_sql},
129+
refs.ref as ref,
129130
-- the username for users which login to MapSwipe with their
130131
-- OSM account is not defined or ''.
131132
-- We capture this here as it will cause problems
@@ -138,7 +139,10 @@ def get_results(
138139
LEFT JOIN mapping_sessions ms ON
139140
ms.mapping_session_id = msr.mapping_session_id
140141
LEFT JOIN users U USING (user_id)
141-
WHERE project_id = {"{}"}
142+
LEFT JOIN mapping_sessions_refs refs
143+
ON msr.mapping_session_id = refs.mapping_session_id
144+
AND msr.task_id = refs.task_id
145+
WHERE ms.project_id = {"{}"}
142146
) TO STDOUT WITH CSV HEADER
143147
"""
144148
).format(sql.Literal(project_id))
@@ -513,24 +517,23 @@ def add_ref_to_agg_results(
513517
results_df: pd.DataFrame, agg_results_df: pd.DataFrame
514518
) -> pd.DataFrame:
515519
"""
516-
Adds a 'ref' column to agg_results_df for writing to CSV
520+
Adds a 'ref' column to agg_results_df if it exists in results_df.
521+
For each task_id, all unique non-empty refs are collected into a list.
522+
If no refs exist for a task, the corresponding value is empty string.
523+
If results_df has no 'ref' column, agg_results_df is returned unchanged.
517524
"""
525+
if "ref" not in results_df.columns:
526+
return agg_results_df
518527

519-
refs_per_task = results_df.groupby("task_id")["ref"].apply(list)
528+
refs_per_task = (
529+
results_df.groupby("task_id")["ref"]
530+
.apply(lambda x: list({r for r in x if pd.notna(r) and r not in ({}, "")}))
531+
.apply(lambda lst: json.dumps([json.loads(r) for r in lst]) if lst else "")
532+
)
520533

521-
ref_values = {}
522-
for task_id, refs in refs_per_task.items():
523-
# Filter out None or empty dicts
524-
refs = [r for r in refs if r not in (None, {}, "") and not pd.isna(r)]
525-
if not refs:
526-
continue
527-
elif all(r == refs[0] for r in refs):
528-
ref_values[task_id] = refs[0]
529-
else:
530-
ref_values[task_id] = refs
534+
if refs_per_task.apply(lambda x: len(x) > 0).any():
535+
agg_results_df["ref"] = agg_results_df["task_id"].map(refs_per_task).fillna("")
531536

532-
if ref_values:
533-
agg_results_df["ref"] = agg_results_df["task_id"].map(ref_values).fillna("")
534537
return agg_results_df
535538

536539

mapswipe_workers/tests/integration/set_up_db.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results (
193193
mapping_session_id int8,
194194
task_id varchar,
195195
result int2 not null,
196-
ref jsonb,
197196
PRIMARY KEY (mapping_session_id, task_id),
198197
FOREIGN KEY (mapping_session_id)
199198
references mapping_sessions (mapping_session_id)
@@ -208,6 +207,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry (
208207
references mapping_sessions (mapping_session_id)
209208
);
210209

210+
CREATE TABLE IF NOT EXISTS mapping_sessions_refs (
211+
mapping_session_id int8,
212+
task_id varchar,
213+
ref JSONB not null,
214+
PRIMARY KEY (mapping_session_id, task_id),
215+
FOREIGN KEY (mapping_session_id)
216+
references mapping_sessions (mapping_session_id)
217+
);
218+
211219
CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger
212220
LANGUAGE plpgsql AS
213221
$$

mapswipe_workers/tests/unittests/test_project_stats.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
import json
12
import unittest
23

34
import pandas as pd
45

56
from mapswipe_workers.generate_stats.project_stats import (
6-
add_ref_to_agg_results,
77
add_missing_result_columns,
8+
add_ref_to_agg_results,
89
calc_agreement,
910
calc_count,
1011
calc_parent_option_count,
@@ -175,28 +176,46 @@ def test_calc_parent_option_count(self):
175176

176177
def test_add_ref_single_ref(self):
177178
# All results have the same ref
178-
results_df = pd.DataFrame({
179-
"task_id": ["t1", "t1"],
180-
"ref": [{"osmId": 123, "osmType": "ways_poly"}, {"osmId": 123, "osmType": "ways_poly"}]
181-
})
179+
results_df = pd.DataFrame(
180+
{
181+
"task_id": ["t1", "t1"],
182+
"ref": [
183+
json.dumps({"osmId": 123, "osmType": "ways_poly"}),
184+
json.dumps({"osmId": 123, "osmType": "ways_poly"}),
185+
],
186+
}
187+
)
182188
agg_results_df = pd.DataFrame({"task_id": ["t1"]})
183189
updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())
190+
184191
self.assertIn("ref", updated_df.columns)
185-
self.assertEqual(updated_df["ref"].iloc[0], {"osmId": 123, "osmType": "ways_poly"})
192+
ref_value = json.loads(updated_df["ref"].iloc[0])
193+
self.assertEqual(ref_value, [{"osmId": 123, "osmType": "ways_poly"}])
186194

187195
def test_add_ref_multiple_refs(self):
188196
# Different refs for same task
189-
results_df = pd.DataFrame({
190-
"task_id": ["t1", "t1"],
191-
"ref": [{"osmId": 123}, {"osmId": 456}]
192-
})
197+
results_df = pd.DataFrame(
198+
{
199+
"task_id": ["t1", "t1"],
200+
"ref": [json.dumps({"osmId": 123}), json.dumps({"osmId": 456})],
201+
}
202+
)
193203
agg_results_df = pd.DataFrame({"task_id": ["t1"]})
194204
updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())
205+
195206
self.assertIn("ref", updated_df.columns)
196-
self.assertEqual(
197-
updated_df["ref"].iloc[0],
198-
[{"osmId": 123}, {"osmId": 456}]
199-
)
207+
ref_value = json.loads(updated_df["ref"].iloc[0])
208+
self.assertCountEqual(ref_value, [{"osmId": 123}, {"osmId": 456}])
209+
210+
def test_add_ref_no_refs_column(self):
211+
# results_df has no 'ref' column
212+
results_df = pd.DataFrame({"task_id": ["t1", "t2"], "result": [1, 2]})
213+
agg_results_df = pd.DataFrame({"task_id": ["t1", "t2"]})
214+
215+
updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())
216+
217+
self.assertNotIn("ref", updated_df.columns)
218+
pd.testing.assert_frame_equal(updated_df, agg_results_df)
200219

201220

202221
if __name__ == "__main__":

postgres/initdb.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results (
193193
mapping_session_id int8,
194194
task_id varchar,
195195
result int2 not null,
196-
ref jsonb,
197196
PRIMARY KEY (mapping_session_id, task_id),
198197
FOREIGN KEY (mapping_session_id)
199198
references mapping_sessions (mapping_session_id)
@@ -208,6 +207,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry (
208207
references mapping_sessions (mapping_session_id)
209208
);
210209

210+
CREATE TABLE IF NOT EXISTS mapping_sessions_refs (
211+
mapping_session_id int8,
212+
task_id varchar,
213+
ref JSONB not null,
214+
PRIMARY KEY (mapping_session_id, task_id),
215+
FOREIGN KEY (mapping_session_id)
216+
references mapping_sessions (mapping_session_id)
217+
);
218+
211219
CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger
212220
LANGUAGE plpgsql AS
213221
$$
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
ALTER TABLE mapping_sessions_results
2-
ADD COLUMN ref jsonb;
3-
41
ALTER TABLE results_temp
52
ADD COLUMN ref jsonb;
3+
4+
CREATE TABLE IF NOT EXISTS public.mapping_sessions_refs (
5+
mapping_session_id int8,
6+
task_id varchar,
7+
ref JSONB not null,
8+
PRIMARY KEY (mapping_session_id, task_id),
9+
FOREIGN KEY (mapping_session_id)
10+
references mapping_sessions (mapping_session_id)
11+
);

0 commit comments

Comments
 (0)