Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
HeartBtInt string = "HeartBtInt"
FileLogPath string = "FileLogPath"
FileStorePath string = "FileStorePath"
FileStoreSync string = "FileStoreSync"
SQLStoreDriver string = "SQLStoreDriver"
SQLStoreDataSourceName string = "SQLStoreDataSourceName"
SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime"
Expand Down
6 changes: 3 additions & 3 deletions file_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type fileLogFactory struct {
sessionLogPaths map[SessionID]string
}

//NewFileLogFactory creates an instance of LogFactory that writes messages and events to file.
//The location of global and session log files is configured via FileLogPath.
// NewFileLogFactory creates an instance of LogFactory that writes messages and events to file.
// The location of global and session log files is configured via FileLogPath.
func NewFileLogFactory(settings *Settings) (LogFactory, error) {
logFactory := fileLogFactory{}

Expand Down Expand Up @@ -97,6 +97,6 @@ func (f fileLogFactory) CreateSessionLog(sessionID SessionID) (Log, error) {
return nil, fmt.Errorf("logger not defined for %v", sessionID)
}

prefix := sessionIDFilenamePrefix(sessionID)
prefix := SessionIDFilenamePrefix(sessionID)
return newFileLog(prefix, logPath)
}
43 changes: 30 additions & 13 deletions filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"strconv"
"time"

"github.com/quickfixgo/quickfix/config"
"github.com/pkg/errors"
"github.com/quickfixgo/quickfix/config"
)

type msgDef struct {
Expand All @@ -36,6 +36,7 @@ type fileStore struct {
sessionFile *os.File
senderSeqNumsFile *os.File
targetSeqNumsFile *os.File
fileSync bool
}

// NewFileStoreFactory returns a file-based implementation of MessageStoreFactory
Expand All @@ -53,15 +54,24 @@ func (f fileStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, er
if err != nil {
return nil, err
}
return newFileStore(sessionID, dirname)
var fsync bool
if sessionSettings.HasSetting(config.FileStoreSync) {
fsync, err = sessionSettings.BoolSetting(config.FileStoreSync)
if err != nil {
return nil, err
}
} else {
fsync = true //existing behavior is to fsync writes
}
return newFileStore(sessionID, dirname, fsync)
}

func newFileStore(sessionID SessionID, dirname string) (*fileStore, error) {
func newFileStore(sessionID SessionID, dirname string, fileSync bool) (*fileStore, error) {
if err := os.MkdirAll(dirname, os.ModePerm); err != nil {
return nil, err
}

sessionPrefix := sessionIDFilenamePrefix(sessionID)
sessionPrefix := SessionIDFilenamePrefix(sessionID)

store := &fileStore{
sessionID: sessionID,
Expand All @@ -72,6 +82,7 @@ func newFileStore(sessionID SessionID, dirname string) (*fileStore, error) {
sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")),
senderSeqNumsFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "senderseqnums")),
targetSeqNumsFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "targetseqnums")),
fileSync: fileSync,
}

if err := store.Refresh(); err != nil {
Expand Down Expand Up @@ -208,8 +219,10 @@ func (store *fileStore) setSession() error {
if _, err := store.sessionFile.Write(data); err != nil {
return fmt.Errorf("unable to write to file: %s: %s", store.sessionFname, err.Error())
}
if err := store.sessionFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.sessionFname, err.Error())
if store.fileSync {
if err := store.sessionFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.sessionFname, err.Error())
}
}
return nil
}
Expand All @@ -221,8 +234,10 @@ func (store *fileStore) setSeqNum(f *os.File, seqNum int) error {
if _, err := fmt.Fprintf(f, "%019d", seqNum); err != nil {
return fmt.Errorf("unable to write to file: %s: %s", f.Name(), err.Error())
}
if err := f.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", f.Name(), err.Error())
if store.fileSync {
if err := f.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", f.Name(), err.Error())
}
}
return nil
}
Expand Down Expand Up @@ -291,11 +306,13 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
if _, err := store.bodyFile.Write(msg); err != nil {
return fmt.Errorf("unable to write to file: %s: %s", store.bodyFname, err.Error())
}
if err := store.bodyFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
}
if err := store.headerFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
if store.fileSync {
if err := store.bodyFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
}
if err := store.headerFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
)

func sessionIDFilenamePrefix(s SessionID) string {
func SessionIDFilenamePrefix(s SessionID) string {
sender := []string{s.SenderCompID}
if s.SenderSubID != "" {
sender = append(sender, s.SenderSubID)
Expand Down
4 changes: 2 additions & 2 deletions fileutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSessionIDFilename_MinimallyQualifiedSessionID(t *testing.T) {
sessionID := SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"}

// Then the filename should be
require.Equal(t, "FIX.4.4-SENDER-TARGET", sessionIDFilenamePrefix(sessionID))
require.Equal(t, "FIX.4.4-SENDER-TARGET", SessionIDFilenamePrefix(sessionID))
}

func TestSessionIDFilename_FullyQualifiedSessionID(t *testing.T) {
Expand All @@ -42,7 +42,7 @@ func TestSessionIDFilename_FullyQualifiedSessionID(t *testing.T) {
}

// Then the filename should be
require.Equal(t, "FIX.4.4-A_B_C-D_E_F-G", sessionIDFilenamePrefix(sessionID))
require.Equal(t, "FIX.4.4-A_B_C-D_E_F-G", SessionIDFilenamePrefix(sessionID))
}

func TestOpenOrCreateFile(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion nuts_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewNutsDbStoreFactory(db *nutsdb.DB) MessageStoreFactory {
}

func (f nutsDbStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, err error) {
sessionPrefix := sessionIDFilenamePrefix(sessionID)
sessionPrefix := SessionIDFilenamePrefix(sessionID)
store := &nutsDbStore{
db: f.db,
cache: &memoryStore{},
Expand Down