4 changes: 4 additions & 0 deletions p2p/stream/protocols/sync/const.go
@@ -61,6 +61,10 @@ const (
 	// MaxStreamFailures is the maximum allowed failures before stream gets removed
 	MaxStreamFailures = 5
 
+	// MaxRecoverableRetries is the maximum number of consecutive recoverable errors
+	// allowed in readMsgLoop before the stream is closed to prevent infinite loops
+	MaxRecoverableRetries = 5
+
 	// FaultRecoveryThreshold is the minimum duration before it resets the previous failures
 	// So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it
 	FaultRecoveryThreshold = 30 * time.Minute
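Editorial side note: the constants above drive two separate policies. MaxRecoverableRetries caps consecutive recoverable read errors inside readMsgLoop (see the next file), while MaxStreamFailures together with FaultRecoveryThreshold governs when repeated failures get a stream removed. A minimal sketch of that second policy, assuming a hypothetical helper rather than the actual stream-manager code:

```go
// Hypothetical illustration only; the real failure bookkeeping lives in the
// stream manager, not in this const block.
func shouldRemoveStream(failures int, lastFailure time.Time) bool {
	// A stream that has been healthy for longer than FaultRecoveryThreshold
	// is trusted again, so earlier failures are ignored.
	if time.Since(lastFailure) > FaultRecoveryThreshold {
		return false
	}
	return failures >= MaxStreamFailures
}
```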
100 changes: 79 additions & 21 deletions p2p/stream/protocols/sync/sync_stream.go
@@ -68,18 +68,80 @@ func (st *syncStream) run() {
 
 // readMsgLoop is the loop
 func (st *syncStream) readMsgLoop() {
+	recoverableErrorCount := 0
+
 	for {
 		select {
 		case <-st.closeC:
 			return
 		default:
 			msg, err := st.readMsg()
 			if err != nil {
-				if err := st.Close("read msg failed", false); err != nil {
-					st.logger.Err(err).Msg("failed to close sync stream")
+				// Classify error for logging purposes
+				errorType, errorDesc := sttypes.ClassifyStreamError(err)
+				// Use centralized error handling to determine if stream should be closed
+				shouldClose := sttypes.ShouldCloseStream(err)
+
+				// Log error with classification
+				if shouldClose {
+					// Record critical error metric
+					sttypes.RecordCriticalError(errorType)
+					st.logger.Warn().
+						Str("streamID", string(st.ID())).
+						Str("errorType", errorType.String()).
+						Str("description", errorDesc).
+						Bool("critical", true).
+						Msg("critical error, closing stream")
+				} else {
+					recoverableErrorCount++
+					// Record recoverable error metric
+					sttypes.RecordRecoverableError(errorType)
+					st.logger.Info().
+						Str("streamID", string(st.ID())).
+						Str("errorType", errorType.String()).
+						Str("description", errorDesc).
+						Bool("recoverable", true).
+						Int("consecutiveErrors", recoverableErrorCount).
+						Int("maxRetries", MaxRecoverableRetries).
+						Msg("recoverable error, continuing stream")
 				}
-				return
+
+				// Only close stream for errors that require closure
+				if shouldClose {
+					if err := st.Close("read msg failed", shouldClose); err != nil {
+						st.logger.Err(err).Msg("failed to close sync stream")
+					}
+					return
+				}
+
+				// Check if we've exceeded max retries for recoverable errors
+				if recoverableErrorCount >= MaxRecoverableRetries {
+					// Record metric for stream closed due to too many recoverable errors
+					sttypes.RecordStreamClosedByRecoverableErrors()
+					st.logger.Warn().
+						Str("streamID", string(st.ID())).
+						Str("errorType", errorType.String()).
+						Str("description", errorDesc).
+						Int("consecutiveErrors", recoverableErrorCount).
+						Int("maxRetries", MaxRecoverableRetries).
+						Msg("too many consecutive recoverable errors, closing stream")
+					if err := st.Close("too many recoverable errors", false); err != nil {
+						st.logger.Err(err).Msg("failed to close sync stream")
+					}
+					return
+				}
+
+				// Back off linearly (100 ms per consecutive recoverable error) before retrying
+				backoffDuration := time.Duration(recoverableErrorCount) * 100 * time.Millisecond
+				time.Sleep(backoffDuration)
+
+				// For recoverable errors, continue the loop
+				continue
 			}
+
+			// Successfully read a message, reset recoverable error counter
+			recoverableErrorCount = 0
+
 			if msg == nil {
 				if err := st.Close("remote closed stream", false); err != nil {
 					st.logger.Err(err).Msg("failed to close sync stream")
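The loop above relies on sttypes.ClassifyStreamError and sttypes.ShouldCloseStream, which this PR adds elsewhere and which are not shown in this section. Purely as a sketch of the shape implied by the call sites here (the error-type names and sentinel below are illustrative assumptions, not the PR's actual definitions):

```go
// Illustrative sketch only; the real sttypes implementation may differ.
package sttypes

import (
	"context"
	"errors"
)

// StreamErrorType is a hypothetical classification enum.
type StreamErrorType int

const (
	ErrTypeTimeout StreamErrorType = iota
	ErrTypeClosed
	ErrTypeDecode
)

func (t StreamErrorType) String() string {
	return [...]string{"timeout", "closed", "decode"}[t]
}

var errStreamClosed = errors.New("stream closed") // hypothetical sentinel

// ClassifyStreamError maps a read error to a type plus a human-readable
// description, matching the (errorType, errorDesc) pair used by readMsgLoop.
func ClassifyStreamError(err error) (StreamErrorType, string) {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return ErrTypeTimeout, "no read progress before the deadline"
	case errors.Is(err, errStreamClosed):
		return ErrTypeClosed, "stream closed by the remote peer"
	default:
		return ErrTypeDecode, err.Error()
	}
}

// ShouldCloseStream reports whether the error is critical; recoverable
// errors are retried by the caller with backoff.
func ShouldCloseStream(err error) bool {
	t, _ := ClassifyStreamError(err)
	return t == ErrTypeClosed
}
```

Design note on the retry budget: with MaxRecoverableRetries = 5 and the 100 ms-per-error backoff, a stream that keeps hitting recoverable errors sleeps 100, 200, 300, and 400 ms between attempts and is closed on the fifth consecutive error, so the whole retry budget is roughly one second plus the read attempts themselves.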
@@ -134,12 +196,20 @@ func (st *syncStream) handleReqLoop() {
 			err := st.handleReq(req)
 
 			if err != nil {
-				st.logger.Info().Err(err).Str("request", req.String()).
-					Msg("handle request error. Closing stream")
-				if err := st.Close("handle request error", false); err != nil {
-					st.logger.Err(err).Msg("failed to close sync stream")
+				st.logger.Error().Err(err).Str("request", req.String()).
+					Msg("handle request by sync stream failed")
+				// Use the centralized error handling to determine if stream should be closed
+				if sttypes.ShouldCloseStream(err) {
+					// Classify and record critical error metric
+					errorType, _ := sttypes.ClassifyStreamError(err)
+					sttypes.RecordCriticalError(errorType)
+					st.logger.Error().Err(err).Str("request", req.String()).
+						Msg("sync stream critical error. Closing stream")
+					if err := st.Close("stream error", false); err != nil {
+						st.logger.Err(err).Msg("failed to close sync stream")
+					}
+					return
 				}
-				return
 			}
 
 		case <-st.closeC:
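Editorial aside: readMsgLoop and handleReqLoop now repeat the same classify / record-metric / decide-to-close sequence. A small shared helper (hypothetical, not part of this PR) could keep both call sites consistent:

```go
// recordAndClassify is a hypothetical refactor: it records the metric that
// matches the error's classification and reports whether the stream must be
// closed. It only uses helpers already referenced by the two loops above.
func recordAndClassify(err error) (critical bool) {
	errorType, _ := sttypes.ClassifyStreamError(err)
	if sttypes.ShouldCloseStream(err) {
		sttypes.RecordCriticalError(errorType)
		return true
	}
	sttypes.RecordRecoverableError(errorType)
	return false
}
```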
@@ -443,26 +513,14 @@ func (st *syncStream) readMsg() (*syncpb.Message, error) {
 	// Use progress-based reading with the tracker from BaseStream
 	b, err := st.ReadBytesWithProgress(st.GetProgressTracker())
 	if err != nil {
-		// Log progress timeout specifically
-		if err.Error() == "progress timeout" {
-			st.logger.Warn().
-				Str("streamID", string(st.ID())).
-				Msg("stream timeout due to lack of progress")
-		}
-		// Log stream closure specifically
-		if err.Error() == "stream closed" {
-			st.logger.Debug().
-				Str("streamID", string(st.ID())).
-				Msg("stream closed by remote peer")
-		}
 		return nil, err
 	}
 	if b == nil || len(b) == 0 {
 		// This should not happen
 		st.logger.Warn().
 			Str("streamID", string(st.ID())).
 			Msg("received empty message data")
-		return nil, errors.New("empty message data")
+		return nil, errors.Wrap(errors.New("empty message data"), "unexpected empty message")
 	}
 	var msg = &syncpb.Message{}
 	if err := protobuf.Unmarshal(b, msg); err != nil {
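A small Go note on the emptiness check kept above: the length of a nil byte slice is already zero, so `len(b) == 0` covers both cases and the explicit nil comparison is redundant (staticcheck reports this pattern as S1009). For example:

```go
package main

import "fmt"

func main() {
	var b []byte             // nil slice
	fmt.Println(b == nil)    // true
	fmt.Println(len(b) == 0) // true: len of a nil slice is 0
}
```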