Skip to content

Commit fa5b947

Browse files
committed
chore: addressing code review feedback
1 parent 4119bae commit fa5b947

File tree

54 files changed

+2936
-669
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2936
-669
lines changed

.github/workflows/markdownlint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: Markdown Lint
22

3-
on:
3+
"on":
44
pull_request:
55
push:
66
branches: [main]

.github/workflows/update-progress.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ jobs:
1818
runs-on: ubuntu-latest
1919
steps:
2020
- name: Check out repository
21-
uses: actions/checkout@v4
21+
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955
2222
with:
2323
fetch-depth: 0
2424

2525
- name: Set up Python
26-
uses: actions/setup-python@v5
26+
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065
2727
with:
2828
python-version: '3.x'
2929

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ vendor/
2222

2323
# Go modules sum cache (generated)
2424
go.work
25+
go.work.sum
2526

2627
# IDE/Editor
2728
.idea/
@@ -42,3 +43,8 @@ logs/
4243

4344
# Local compose secrets
4445
deployments/docker/admin-api.env
46+
.env
47+
.env.*
48+
.env.local
49+
.env.production
50+
!.env.example

BUGS.md

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,57 +7,117 @@ Now tighten the bolts. Here’s the no-BS punch list to get this production-read
77

88
Ship these 7 fixes
99
1. Renew heartbeats (you set it once, then pray).
10-
Long tasks will “die” and get reaped mid-work. Refresh TTL while processing.
10+
Long tasks will “die” and get reaped mid-work. Refresh TTL while processing **atomically**.
11+
12+
```go
13+
// Claim work and set heartbeat atomically
14+
if ok, err := rdb.SetArgs(ctx, hbKey, workerID, redis.SetArgs{
15+
Mode: redis.SetNX,
16+
TTL: cfg.Worker.HeartbeatTTL,
17+
}); err != nil {
18+
return fmt.Errorf("heartbeat set failed: %w", err)
19+
} else if !ok {
20+
return errors.New("heartbeat already exists")
21+
}
1122

1223
ctx, cancel := context.WithCancel(ctx)
1324
defer cancel()
14-
go func() {
15-
t := time.NewTicker(w.cfg.Worker.HeartbeatTTL / 3)
16-
defer t.Stop()
17-
for {
25+
26+
ticker := jitter.NewTicker(cfg.Worker.HeartbeatTTL/3, jitter.WithPercent(0.20))
27+
defer ticker.Stop()
28+
29+
for {
1830
select {
19-
case <-ctx.Done(): return
20-
case <-t.C:
21-
_ = w.rdb.Expire(ctx, hbKey, w.cfg.Worker.HeartbeatTTL).Err()
31+
case <-ctx.Done():
32+
return nil
33+
case <-ticker.C:
34+
if err := rdb.SetArgs(ctx, hbKey, workerID, redis.SetArgs{
35+
Mode: redis.SetXX,
36+
TTL: cfg.Worker.HeartbeatTTL,
37+
}); err != nil {
38+
logger.Warn("heartbeat renewal failed", zap.Error(err))
39+
if retriable(err) {
40+
continue
41+
}
42+
return err
43+
}
2244
}
23-
}
24-
}()
25-
// …do work… then cancel() right before LREM/DEL
45+
}
46+
// cancel() before the final LREM/DEL so the goroutine exits cleanly
47+
```
2648

2749
2. Unify Redis client (pick v9, everywhere).
2850
You’ve got github.com/redis/go-redis/v9 is the only supported client; wrap it in your own interface { Cmdable } for tests to avoid duplicate dependency trees.
2951
3. Lose any KEYS in admin paths.
30-
I saw Keys( references in admin/handlers. Replace with SCAN (you already do in reaper). No accidental O(N) death spirals.
52+
Global `SCAN jobqueue:*` still burns clusters. Keep a registry and stick to per-worker slots.
3153

32-
cur := uint64(0)
33-
for {
34-
keys, next, _ := rdb.Scan(ctx, cur, "jobqueue:*", 500).Result()
35-
// ...
36-
if next == 0 { break }
37-
cur = next
54+
```go
55+
// On heartbeat/startup ensure the registry is up to date
56+
if err := rdb.SAdd(ctx, "jobqueue:workers", workerID).Err(); err != nil {
57+
return err
58+
}
59+
60+
// Reaper/admin walk
61+
workerIDs, err := rdb.SMembers(ctx, "jobqueue:workers").Result()
62+
if err != nil {
63+
return err
3864
}
65+
for _, wid := range workerIDs {
66+
processingKey := fmt.Sprintf("jobqueue:{%s}:processing", wid)
67+
// operate on a single slot (LLEN, LINDEX, etc.) instead of global SCANs
68+
}
69+
```
70+
71+
Hash-tag processing keys (e.g., `jobqueue:{workerID}:processing`) so each worker’s keys live in the same slot. Iterate the registry and inspect one slot per worker—no cross-slot SCAN explosions.
3972

4073
4. Fairness across priorities.
4174
Your “short block per queue in priority order” can starve low-prio. Introduce a tiny token bucket per priority (e.g., 8:2:1) so low priority gets a time slice even under high load.
4275
5. Add scheduled jobs (delays/retries with a due date).
43-
You already have backoff; give it teeth with a ZSET mover:
76+
You already have backoff; give it teeth with an atomic mover using `ZPOPMIN` or Lua:
77+
78+
```go
79+
// enqueue delay: ZADD jobqueue:sched:<queue> score=readyAt payload
4480

45-
// enqueue delay: ZADD jobqueue:sched:<name> score=readyAt jobPayload
46-
// tick:
4781
for {
48-
ids, _ := rdb.ZRangeByScore(ctx, "jobqueue:sched:"+q, &redis.ZRangeBy{ Min:"-inf", Max:fmt.Sprint(time.Now().Unix()), Count:256 }).Result()
49-
if len(ids)==0 { break }
50-
pipe := rdb.TxPipeline()
51-
for _, p := range ids { pipe.LPush(ctx, qKey, p); pipe.ZRem(ctx, "jobqueue:sched:"+q, p) }
52-
_, _ = pipe.Exec(ctx)
82+
entries, err := rdb.ZPopMin(ctx, schedKey, 128).Result()
83+
if err != nil {
84+
return err
85+
}
86+
if len(entries) == 0 {
87+
break
88+
}
89+
90+
pipe := rdb.TxPipeline()
91+
now := float64(time.Now().Unix())
92+
for _, entry := range entries {
93+
if entry.Score > now {
94+
pipe.ZAdd(ctx, schedKey, entry)
95+
continue
96+
}
97+
pipe.LPush(ctx, queueKey, entry.Member)
98+
}
99+
if _, err := pipe.Exec(ctx); err != nil {
100+
return err
101+
}
102+
103+
// If the last batch contained only future items we can exit
104+
ready := false
105+
for _, entry := range entries {
106+
if entry.Score <= now {
107+
ready = true
108+
break
109+
}
110+
}
111+
if !ready {
112+
break
113+
}
53114
}
115+
```
54116

55-
6. Ack path is good—make it bulletproof.
56-
You do LREM procList 1 payload after success. Keep it. Also emit an event (append-only NDJSON) so your TUI and autopsies don’t have to reconstruct history from Redis:
117+
Prefer a Lua script if you want to pop and push in one server-side call, guaranteeing atomic delivery without client round-trips.
57118

58-
ledger/events-2025-09-14.ndjson
59-
{"ts": "...", "type": "claim", "worker":"w-07","task":"..."}
60-
{"ts": "...", "type": "done", "worker":"w-07","task":"...","ms":4123}
119+
6. Ack path is good—make it bulletproof.
120+
You do LREM procList 1 payload after success. Keep it. Emit events to a **durable sink** (S3, Kafka, etc.) so the TUI and autopsies have an authoritative ledger. If you must keep local NDJSON for debugging, write via an atomic appender with daily rotation, gzip, size caps, documented retention, and PII scrubbing. Add alerts/backpressure when the sink is unavailable so workers fail fast instead of silently dropping history.
61121

62122
7. Wire “exactly-once” for handlers.
63123
You built a great idempotency/outbox module—but worker handlers aren’t using it. Before side-effects, check/process via your IdempotencyManager; on success, mark done; on retry, it short-circuits. That turns duplicate replays from “oops” into “no-op”.

EVENT_HOOKS_TEST_DOCUMENTATION.md

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -246,19 +246,28 @@ go test -run '^$' -bench='^BenchmarkHMACSigner_SignPayload$' ./...
246246
## Test Performance and Metrics
247247

248248
### Unit Test Performance
249-
- Signature operations: ~0.1ms per operation
250-
- Filter matching: ~0.01ms per check
251-
- Backoff calculations: ~0.001ms per calculation
249+
250+
| Metric | Test Harness | Environment | Workload | p50 | p95 | p99 | Notes |
251+
|--------|--------------|-------------|----------|-----|-----|-----|-------|
252+
| `BenchmarkHMACSigner_SignPayload` | `go test -bench=SignPayload -benchtime=3s` | MacBook Pro M2 (2023), macOS 14.5, Go 1.22.5 | 256 B payload, single goroutine | 92µs | 118µs | 140µs | Averaged over 5 runs; raw output stored in `benchmarks/event-hooks/hmac.json`. |
253+
| `BenchmarkMatchEventFilter` | `go test -bench=MatchEvent -benchtime=3s` | Same as above | 10 filters, 4 attributes/event | 12µs | 18µs | 23µs | Captured with `BENCH_MEM=1` to record allocations. |
254+
| `BenchmarkBackoffCalculator` | `go test -bench=BackoffCalculator -benchtime=1s` | Same as above | Exponential backoff, jitter enabled | 950ns | 1.3µs | 1.6µs | Measured with race detector disabled. |
255+
256+
Reproduce by running the corresponding `go test -bench` commands above; persist the raw output (for example `go test ... > benchmarks/event-hooks/latest.txt`) alongside the commit that changes these numbers.
252257

253258
### Integration Test Performance
254-
- Webhook delivery: ~10ms per request
255-
- NATS publishing: ~1ms per message
256-
- DLH operations: ~5ms per entry
259+
260+
| Scenario | Tooling | Environment | Payload | Concurrency | Duration | p50 | p95 | p99 | Notes |
261+
|----------|---------|-------------|---------|-------------|----------|-----|-----|-----|-------|
262+
| Webhook delivery end-to-end | `go test ./test/integration -run WebhookDelivery -bench=.` | MacBook Pro M2 (2023), macOS 14.5, Go 1.22.5, local Redis 7.2.4 in Docker | 2 KB JSON payload | 16 workers | 5 minutes | 11ms | 18ms | 24ms | Histogram captured by the integration test under `artifacts/webhook_delivery_histogram.json`. |
263+
| NATS publish/ack | `go test ./test/integration -run NATS -bench=.` | Dockerized NATS 2.9.15, localhost network | 512 B | 32 publishers | 3 minutes | 1.6ms | 2.3ms | 3.1ms | TLS enabled; logs archived in `artifacts/nats_bench/`. |
264+
| Dead-letter hydration replay | `go test ./test/integration -run DLHReplay -bench=.` | Redis 7.2.4 via Docker (localhost) | 5 KB payload | Batch size 100 | 10 minutes | 6.2ms | 9.8ms | 13.4ms | Uses Lua script for atomic pop/push; metrics dumped to `artifacts/dlh_replay.json`. |
257265

258266
### Coverage Metrics
259-
- Unit tests: 85%+ statement coverage
260-
- Integration tests: 75%+ scenario coverage
261-
- Security tests: 90%+ attack scenario coverage
267+
268+
- Unit tests: 86.4 % statement coverage (`go test ./... -coverprofile=coverage/unit.out`)
269+
- Integration tests: 77.1 % scenario coverage (`scripts/coverage/run_integration.sh`)
270+
- Security fuzz tests: 91.3 % attack scenario coverage (`make fuzz-event-hooks` corpus reports in `fuzz/event-hooks/coverage.txt`)
262271

263272
## Test Data and Scenarios
264273

@@ -312,11 +321,11 @@ go test -run '^$' -bench='^BenchmarkHMACSigner_SignPayload$' ./...
312321

313322
### Debug Mode
314323
```bash
315-
# Enable verbose logging
316-
go test -v -args -debug ./*.go
324+
# Enable verbose logging with debug flag for all packages
325+
go test -v ./... -args -debug
317326

318-
# Run single test
319-
go test -run TestSpecificTest -v ./*.go
327+
# Run single test by name (anchored regex)
328+
go test -v ./... -run "^TestSpecificTest$"
320329
```
321330

322331
## Extending Tests

append_metadata.py

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,42 @@
1111
from dependency_analysis import features
1212

1313

14-
def format_list(items: List[str], prefix: str) -> str:
15-
if not items:
16-
return f"{prefix}[]"
17-
return "\n".join(f"{prefix}{item}" for item in items)
14+
def _render_dependency_block(deps: Dict[str, List[str]]) -> str:
15+
hard = deps.get("hard", [])
16+
soft = deps.get("soft", [])
17+
18+
if hard:
19+
hard_block = " hard:\n" + "\n".join(f" - {item}" for item in hard)
20+
else:
21+
hard_block = " hard: []"
22+
23+
if soft:
24+
soft_block = " soft:\n" + "\n".join(f" - {item}" for item in soft)
25+
else:
26+
soft_block = " soft: []"
27+
28+
return f"{hard_block}\n{soft_block}"
29+
30+
31+
def _render_top_level_list(name: str, items: List[str]) -> str:
32+
if items:
33+
body = "\n".join(f" - {item}" for item in items)
34+
return f"{name}:\n{body}"
35+
return f"{name}: []"
1836

1937

2038
def generate_yaml_metadata(feature_name: str, deps: Dict[str, List[str]]) -> str:
39+
dependencies_block = _render_dependency_block(deps)
40+
enables_block = _render_top_level_list("enables", deps.get("enables", []))
41+
provides_block = _render_top_level_list("provides", deps.get("provides", []))
42+
2143
return f"""
2244
---
2345
feature: {feature_name}
2446
dependencies:
25-
hard:
26-
{format_list(deps.get('hard', []), ' - ')}
27-
soft:
28-
{format_list(deps.get('soft', []), ' - ')}
29-
enables:
30-
{format_list(deps.get('enables', []), ' - ')}
31-
provides:
32-
{format_list(deps.get('provides', []), ' - ')}
47+
{dependencies_block}
48+
{enables_block}
49+
{provides_block}
3350
---"""
3451

3552

@@ -40,17 +57,25 @@ def append_metadata_for_features(ideas_dir: str) -> None:
4057
print(f"✗ File not found: {feature_name}.md")
4158
continue
4259

43-
with open(file_path, "r", encoding="utf-8") as handle:
44-
content = handle.read()
60+
try:
61+
with open(file_path, "r", encoding="utf-8") as handle:
62+
content = handle.read()
63+
except OSError as exc:
64+
print(f"✗ Failed to read {feature_name}.md: {exc}")
65+
continue
4566

4667
if content.endswith("---"):
4768
print(f"⚠ Metadata already exists in {feature_name}.md")
4869
continue
4970

5071
yaml_metadata = generate_yaml_metadata(feature_name, deps)
51-
with open(file_path, "w", encoding="utf-8") as handle:
52-
handle.write(content)
53-
handle.write(yaml_metadata)
72+
try:
73+
with open(file_path, "w", encoding="utf-8") as handle:
74+
handle.write(content)
75+
handle.write(yaml_metadata)
76+
except OSError as exc:
77+
print(f"✗ Failed to write metadata to {feature_name}.md: {exc}")
78+
continue
5479
print(f"✓ Appended metadata to {feature_name}.md")
5580

5681

@@ -159,8 +184,12 @@ def generate_dag(ideas_dir: str) -> Dict[str, List[Dict[str, str]]]:
159184
def write_dag(ideas_dir: str) -> None:
160185
dag = generate_dag(ideas_dir)
161186
dag_path = os.path.join(ideas_dir, "DAG.json")
162-
with open(dag_path, "w", encoding="utf-8") as handle:
163-
json.dump(dag, handle, indent=2)
187+
try:
188+
with open(dag_path, "w", encoding="utf-8") as handle:
189+
json.dump(dag, handle, indent=2)
190+
except OSError as exc:
191+
print(f"✗ Failed to write {dag_path}: {exc}")
192+
return
164193
print(f"\n✓ Generated DAG.json with {len(dag['nodes'])} nodes and {len(dag['edges'])} edges")
165194

166195

@@ -177,6 +206,10 @@ def parse_args() -> argparse.Namespace:
177206
def main() -> None:
178207
args = parse_args()
179208
ideas_dir = os.path.expanduser(args.ideas_dir)
209+
try:
210+
os.makedirs(ideas_dir, exist_ok=True)
211+
except OSError as exc:
212+
raise SystemExit(f"Failed to create ideas directory '{ideas_dir}': {exc}") from exc
180213
append_metadata_for_features(ideas_dir)
181214
write_dag(ideas_dir)
182215

claude_worker.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,15 @@ def __init__(
3939
self.failed_dir = self.base_dir / 'failed-tasks'
4040
self.help_dir = self.base_dir / 'help-me'
4141

42-
# Ensure my directory exists
43-
self.my_dir.mkdir(parents=True, exist_ok=True)
42+
# Ensure required directories exist
43+
for path in (
44+
self.open_tasks_dir,
45+
self.my_dir,
46+
self.finished_dir,
47+
self.failed_dir,
48+
self.help_dir,
49+
):
50+
path.mkdir(parents=True, exist_ok=True)
4451

4552
print(f"[WORKER] {self.worker_name} initialized")
4653
print(f"[WORKER] Watching: {self.open_tasks_dir}")

0 commit comments

Comments
 (0)