Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 82 additions & 17 deletions sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,17 @@ func newSQLStore(sessionID SessionID, driver string, dataSourceName string, conn
// Reset deletes the store records and sets the seqnums back to 1
func (store *sqlStore) Reset() error {
s := store.sessionID
_, err := store.db.Exec(`DELETE FROM messages
qr := `DELETE FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `DELETE FROM messages
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8`
}
_, err := store.db.Exec(qr,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -93,11 +100,20 @@ func (store *sqlStore) Reset() error {
return err
}

_, err = store.db.Exec(`UPDATE sessions
qr = `UPDATE sessions
SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions
SET creation_time=$1, incoming_seqnum=$2, outgoing_seqnum=$3
WHERE beginstring=$4 AND session_qualifier=$5
AND sendercompid=$6 AND sendersubid=$7 AND senderlocid=$8
AND targetcompid=$9 AND targetsubid=$10 AND targetlocid=$11`
}

_, err = store.db.Exec(qr,
store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -118,11 +134,19 @@ func (store *sqlStore) populateCache() (err error) {
s := store.sessionID
var creationTime time.Time
var incomingSeqNum, outgoingSeqNum int
row := store.db.QueryRow(`SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
qr := `SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8`
}
row := store.db.QueryRow(qr,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -143,12 +167,21 @@ func (store *sqlStore) populateCache() (err error) {
}

// session record not found, create it
_, err = store.db.Exec(`INSERT INTO sessions (
qr = `INSERT INTO sessions (
creation_time, incoming_seqnum, outgoing_seqnum,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
if store.sqlDriver == "postgres" {
qr = `INSERT INTO sessions (
creation_time, incoming_seqnum, outgoing_seqnum,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`
}
_, err = store.db.Exec(qr,
store.cache.creationTime,
store.cache.NextTargetMsgSeqNum(),
store.cache.NextSenderMsgSeqNum(),
Expand All @@ -172,10 +205,17 @@ func (store *sqlStore) NextTargetMsgSeqNum() int {
// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent
func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
s := store.sessionID
_, err := store.db.Exec(`UPDATE sessions SET outgoing_seqnum = ?
qr := `UPDATE sessions SET outgoing_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions SET outgoing_seqnum = $1
WHERE beginstring=$2 AND session_qualifier=$3
AND sendercompid=$4 AND sendersubid=$5 AND senderlocid=$6
AND targetcompid=$7 AND targetsubid=$8 AND targetlocid=$9`
}
_, err := store.db.Exec(qr,
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -188,10 +228,17 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received
func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {
s := store.sessionID
_, err := store.db.Exec(`UPDATE sessions SET incoming_seqnum = ?
qr := `UPDATE sessions SET incoming_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions SET incoming_seqnum = $1
WHERE beginstring=$2 AND session_qualifier=$3
AND sendercompid=$4 AND sendersubid=$5 AND senderlocid=$6
AND targetcompid=$7 AND targetsubid=$8 AND targetlocid=$9`
}
_, err := store.db.Exec(qr,
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand Down Expand Up @@ -221,12 +268,21 @@ func (store *sqlStore) CreationTime() time.Time {
func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
s := store.sessionID

_, err := store.db.Exec(`INSERT INTO messages (
qr := `INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
if store.sqlDriver == "postgres" {
qr = `INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
}
_, err := store.db.Exec(qr,
seqNum, string(msg),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -238,12 +294,21 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
s := store.sessionID
var msgs [][]byte
rows, err := store.db.Query(`SELECT message FROM messages
qr := `SELECT message FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?
AND msgseqnum>=? AND msgseqnum<=?
ORDER BY msgseqnum`,
ORDER BY msgseqnum`
if store.sqlDriver == "postgres" {
qr = `SELECT message FROM messages
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8
AND msgseqnum>=$9 AND msgseqnum<=$10
ORDER BY msgseqnum`
}
rows, err := store.db.Query(qr,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID,
Expand Down