#!/usr/bin/env bash
- set -e
- # osmosis tuning: https://wiki.openstreetmap.org/wiki/Osmosis/Tuning, https://lists.openstreetmap.org/pipermail/talk/2012-October/064771.html
- if [ -z "$MEMORY_JAVACMD_OPTIONS" ]; then
-     echo JAVACMD_OPTIONS=\"-server\" > ~/.osmosis
- else
-     memory="${MEMORY_JAVACMD_OPTIONS//i/}"
-     echo JAVACMD_OPTIONS=\"-server -Xmx$memory\" > ~/.osmosis
- fi
+ set -x
+
+ # ---- Directory variables ----
+ workingDirectory="/mnt/data"
+ tmpDirectory="$workingDirectory/tmp"
+ runDirectory="$workingDirectory/run"

+ # ---- Directory setup ----
+ mkdir -p "$workingDirectory"
+ mkdir -p "$tmpDirectory"
+ mkdir -p "$runDirectory"
+
+ # Slack setup
slack_message_count=0
max_slack_messages=2

- workingDirectory="/mnt/data"
- mkdir -p $workingDirectory
+ # ---- osmdbt-config.yaml creation ----
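+ # Note: changes_dir and log_dir are both set to $workingDirectory so generated
+ # replication output and state.txt end up where monitor_minute_replication() looks for them.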
+ cat << EOF > /osmdbt-config.yaml
+ database:
+   host: ${POSTGRES_HOST:-localhost}
+   port: ${POSTGRES_PORT:-5432}
+   dbname: ${POSTGRES_DB:-osm}
+   user: ${POSTGRES_USER:-osm}
+   password: ${POSTGRES_PASSWORD}
+   replication_slot: ${REPLICATION_SLOT:-osm_repl}
+
+ log_dir: ${workingDirectory}
+ changes_dir: ${workingDirectory}
+ tmp_dir: ${tmpDirectory}
+ run_dir: ${runDirectory}
+ EOF
+
+
+ # Remove lock file if it exists (avoids replication issues on restart)
+ [ -e "$workingDirectory/replicate.lock" ] && rm -f "$workingDirectory/replicate.lock"
+
+
+ function ensure_replication_slot_exists() {
+     export PGPASSWORD="${POSTGRES_PASSWORD}"
+     local slot="${REPLICATION_SLOT:-osm_repl}"
+     local plugin="osm_logical"
+
+     # Check if slot already exists
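+     # -t -A: tuples-only, unaligned psql output, so the result is just the bare count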
+     local slot_count=$(psql -h "${POSTGRES_HOST:-localhost}" -p "${POSTGRES_PORT:-5432}" -U "${POSTGRES_USER:-osm}" -d "${POSTGRES_DB:-osm}" -t -A -c \
+         "SELECT count(*) FROM pg_replication_slots WHERE slot_name='$slot';" 2>/dev/null | tr -d '[:space:]')
+
+     if [ "$slot_count" = "1" ]; then
+         echo "Replication slot '$slot' already exists."
+         return 0
+     fi
+
+     # Try to create the slot
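+     # NOTE: this assumes the logical decoding plugin named above is installed on the
+     # PostgreSQL server; pg_create_logical_replication_slot() fails otherwise.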
+     psql -h "${POSTGRES_HOST:-localhost}" -p "${POSTGRES_PORT:-5432}" -U "${POSTGRES_USER:-osm}" -d "${POSTGRES_DB:-osm}" -c \
+         "SELECT * FROM pg_create_logical_replication_slot('$slot', '$plugin');"

- # Remove files that are not required
- [ -e /mnt/data/replicate.lock ] && rm -f /mnt/data/replicate.lock
- # [ -e /mnt/data/processed_files.log ] && rm -f /mnt/data/processediles.log

+ }
+
+
+ # --- Function: retrieve last known state from S3 or local storage
function get_current_state_file() {
-     # Check if state.txt exist in the workingDirectory,
-     # in case the file does not exist locally and does not exist in the cloud the replication will start from 0
-     if [ ! -f $workingDirectory/state.txt ]; then
+     # Check if state.txt exists locally
+     if [ ! -f "$workingDirectory/state.txt" ]; then
        echo "File $workingDirectory/state.txt does not exist in local storage"
-         ### AWS
-         if [ $CLOUDPROVIDER == "aws" ]; then
-             aws s3 ls $AWS_S3_BUCKET/$REPLICATION_FOLDER/state.txt
+         # If using AWS, try downloading state.txt from the S3 bucket
+         if [ "$CLOUDPROVIDER" == "aws" ]; then
+             aws s3 ls "$AWS_S3_BUCKET/$REPLICATION_FOLDER/state.txt" > /dev/null
            if [[ $? -eq 0 ]]; then
-                 echo "File exist, let's get it from $CLOUDPROVIDER - $AWS_S3_BUCKET"
-                 aws s3 cp $AWS_S3_BUCKET/$REPLICATION_FOLDER/state.txt $workingDirectory/state.txt
+                 echo "File exists in S3, downloading..."
+                 aws s3 cp "$AWS_S3_BUCKET/$REPLICATION_FOLDER/state.txt" "$workingDirectory/state.txt"
            fi
        fi
-
-         # ### GCP
-         # if [ $CLOUDPROVIDER == "gcp" ]; then
-         #     gsutil ls $GCP_STORAGE_BUCKET/$REPLICATION_FOLDER/state.txt
-         #     if [[ $? -eq 0 ]]; then
-         #         echo "File exist, let's get it from $CLOUDPROVIDER - $GCP_STORAGE_BUCKET"
-         #         gsutil cp $GCP_STORAGE_BUCKET/$REPLICATION_FOLDER/state.txt $workingDirectory/state.txt
-         #     fi
-         # fi
-
-         # ### Azure
-         # if [ $CLOUDPROVIDER == "azure" ]; then
-         #     state_file_exists=$(az storage blob exists --container-name $AZURE_CONTAINER_NAME --name $REPLICATION_FOLDER/state.txt --query="exists")
-         #     if [[ $state_file_exists=="true" ]]; then
-         #         echo "File exist, let's get it from $CLOUDPROVIDER - $AZURE_CONTAINER_NAME"
-         #         az storage blob download \
-         #             --container-name $AZURE_CONTAINER_NAME \
-         #             --name $REPLICATION_FOLDER/state.txt \
-         #             --file $workingDirectory/state.txt --query="name"
-         #     fi
-         # fi
    else
-         echo "File $workingDirectory/state.txt exist in local storage"
-         echo "File $workingDirectory/state.txt content:"
-         cat $workingDirectory/state.txt
+         echo "File $workingDirectory/state.txt found locally:"
+         cat "$workingDirectory/state.txt"
    fi
}

+ # --- Function: upload files to cloud (S3)
function upload_file_cloud() {
-     # Upload files to cloud provider
    local local_file="$1"
    local cloud_file="$REPLICATION_FOLDER/${local_file#*"$workingDirectory/"}"
-     echo "$(date +%F_%H:%M:%S): Upload file $local_file to ...$CLOUDPROVIDER...$cloud_file"
+     echo "$(date +%F_%H:%M:%S): Upload file $local_file to $CLOUDPROVIDER ($cloud_file)"
    if [ "$CLOUDPROVIDER" == "aws" ]; then
        aws s3 cp "$local_file" "$AWS_S3_BUCKET/$cloud_file" --acl public-read
-     elif [ "$CLOUDPROVIDER" == "gcp" ]; then
-         gsutil cp -a public-read "$local_file" "$GCP_STORAGE_BUCKET/$cloud_file"
-     elif [ "$CLOUDPROVIDER" == "azure" ]; then
-         az storage blob upload \
-             --container-name "$AZURE_CONTAINER_NAME" \
-             --file "$local_file" \
-             --name "$cloud_file" \
-             --output none
    fi
}

+ # --- Function: send Slack notifications
function send_slack_message() {
-     # Check if Slack messaging is enabled
    if [ "${ENABLE_SEND_SLACK_MESSAGE}" != "true" ]; then
        echo "Slack messaging is disabled. Set ENABLE_SEND_SLACK_MESSAGE to true to enable."
        return
    fi
-
-     # Check if the Slack webhook URL is set
    if [ -z "${SLACK_WEBHOOK_URL}" ]; then
        echo "SLACK_WEBHOOK_URL is not set. Unable to send message to Slack."
        return 1
    fi
-
-     # Limit Slack message count to 3
    if [ "$slack_message_count" -ge "$max_slack_messages" ]; then
        echo "Max Slack messages limit reached. No further messages will be sent."
        return
    fi
-
    local message="$1"
    curl -X POST -H 'Content-type: application/json' --data "{\"text\": \"$message\"}" "$SLACK_WEBHOOK_URL"
    echo "Message sent to Slack: $message"
    slack_message_count=$((slack_message_count + 1))
}

-
+ # --- Function: track and upload minute replication files
function monitor_minute_replication() {
-     # Function to handle continuous monitoring, minute replication, and sequential upload to cloud provider
-     # Directory to store a log of the last processed file
    processed_files_log="$workingDirectory/processed_files.log"
    max_log_size_mb=1
-
    while true; do
        if [ -e "$processed_files_log" ]; then
            log_size=$(du -m "$processed_files_log" | cut -f1)
+             # Clean log if too large (avoids disk fill)
            if [ "$log_size" -gt "$max_log_size_mb" ]; then
-                 echo $(date +%F_%H:%M:%S)": Cleaning processed_files_log..." > "$processed_files_log"
+                 echo "$(date +%F_%H:%M:%S): Cleaning processed_files_log..." > "$processed_files_log"
            fi
-             # Find new .gz files created within the last minute
-             for local_minute_file in $(find $workingDirectory/ -name "*.gz" -cmin -1); do
+             # Check for new .gz files created in the last minute
+             for local_minute_file in $(find "$workingDirectory/" -name "*.gz" -cmin -1); do
                if [ -f "$local_minute_file" ]; then
                    echo "Processing $local_minute_file..."
-                     # Ensure the file is uploaded only once
+                     # Ensure this file hasn't already been processed (success or failure)
                    if ! grep -q "$local_minute_file: SUCCESS" "$processed_files_log" && ! grep -q "$local_minute_file: FAILURE" "$processed_files_log"; then
-                         # Verify gz file integrity
+                         # Integrity test for .gz files
                        if gzip -t "$local_minute_file" 2>/dev/null; then
-                             # Upload the file sequentially
-                             upload_file_cloud $local_minute_file
+                             upload_file_cloud "$local_minute_file"
                            local_state_file="${local_minute_file%.osc.gz}.state.txt"
-                             upload_file_cloud $local_state_file
+                             upload_file_cloud "$local_state_file"
                            echo "$local_minute_file: SUCCESS" >> "$processed_files_log"
-                             # Upload and update state.txt after successful upload
                            upload_file_cloud "$workingDirectory/state.txt"
                        else
-                             echo $(date +%F_%H:%M:%S)": $local_minute_file is corrupted and will not be uploaded." >> "$processed_files_log"
+                             echo "$(date +%F_%H:%M:%S): $local_minute_file is corrupted and will not be uploaded." >> "$processed_files_log"
                            echo "$local_minute_file: FAILURE" >> "$processed_files_log"
-                             # Ensure state.txt maintains the current ID to regenerate the corrupted file
+                             # Rollback state.txt to previous sequence
                            current_state_id=$(($(echo "$local_minute_file" | sed 's/[^0-9]//g' | sed 's/^0*//') - 1))
                            sed -i "s/sequenceNumber=.*/sequenceNumber=$current_state_id/" "$workingDirectory/state.txt"
                            rm "$local_minute_file"
-                             echo "Stopping any existing Osmosis processes..."
-                             pkill -f "osmosis.*--replicate-apidb"
+                             echo "Stopping any existing osmdbt processes..."
+                             pkill -f "osmdbt"
                            echo "Regenerating $local_minute_file..."
                            send_slack_message "${ENVIROMENT}: Corrupted file $local_minute_file detected. Regenerating the file..."
                            generate_replication
@@ -147,35 +148,48 @@ function monitor_minute_replication() {
                fi
            done
        else
-             echo "File $processed_files_log not found."
-             echo $processed_files_log > $processed_files_log
+             echo "File $processed_files_log not found. Creating log file."
+             echo "$processed_files_log" > "$processed_files_log"
        fi
        sleep 10s
    done
}

+ # --- Function: run osmdbt replication tool (adjust command/options as needed)
function generate_replication() {
-     # Replicate the API database using Osmosis
-     osmosis -q \
-         --replicate-apidb \
-         iterations=0 \
-         minInterval=60000 \
-         maxInterval=120000 \
-         host=$POSTGRES_HOST \
-         database=$POSTGRES_DB \
-         user=$POSTGRES_USER \
-         password=$POSTGRES_PASSWORD \
-         validateSchemaVersion=no \
-         --write-replication \
-         workingDirectory=$workingDirectory
+     # Launch osmdbt-get-log for this minute
+     /osmdbt/build/src/osmdbt-get-log \
+         -c /osmdbt-config.yaml
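+     # NOTE: osmdbt-get-log only pulls pending changes from the replication slot into
+     # log_dir; a full minutely cycle typically also runs osmdbt-create-diff (and
+     # osmdbt-catchup) to turn those logs into .osc.gz and state.txt files (not shown here).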
}

- ######################## Start minutes replication process ########################
+ # --- MAIN PROCESS STARTS HERE ---
get_current_state_file
- flag=true
- while "$flag" = true; do
-     pg_isready -h $POSTGRES_HOST -p 5432 > /dev/null 2>&2 || continue
-     flag=false
-     generate_replication &
-     monitor_minute_replication
+
+ # Wait for PostgreSQL to be ready
+ echo "Waiting for PostgreSQL to be ready..."
+ max_attempts=30
+ attempt=0
+ while [ $attempt -lt $max_attempts ]; do
+     if pg_isready -h "${POSTGRES_HOST:-localhost}" -p "${POSTGRES_PORT:-5432}" > /dev/null 2>&1; then
+         echo "PostgreSQL is ready."
+         break
+     fi
+     attempt=$((attempt + 1))
+     echo "PostgreSQL not ready yet, attempt $attempt/$max_attempts..."
+     sleep 2
done
+
+ if [ $attempt -eq $max_attempts ]; then
+     echo "ERROR: PostgreSQL is not ready after $max_attempts attempts"
+     exit 1
+ fi
+
+ # Ensure replication slot exists before starting replication
+ if ! ensure_replication_slot_exists; then
+     echo "ERROR: Failed to ensure replication slot exists. Exiting."
+     exit 1
+ fi
+
+ # Launch replication in background and start monitoring
+ generate_replication &
+ monitor_minute_replication