Conversation

📝 Walkthrough

Transforms testapp into a KV store rollup reference: adds stress-test tooling, an HTTP /stats endpoint, executor metrics, updated defaults, and comprehensive README/CLI docs for running and benchmarking the KV server.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (inconclusive)
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Claude finished @julienrbrt's task in 1m 44s — View job — Code Review
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff           @@
##             main    #3214   +/-   ##
=======================================
  Coverage   61.41%   61.41%
=======================================
  Files         120      120
  Lines       12474    12474
=======================================
  Hits         7661     7661
  Misses       3953     3953
  Partials      860      860
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Actionable comments posted: 7
🧹 Nitpick comments (1)
apps/testapp/kv/kvexecutor.go (1)
45-54: Unexport `ExecutorStats` and `GetStats()` since they're package-private.
`ExecutorStats` and `GetStats()` are only consumed internally in `http_server.go` within the same package. Unexporting them to `executorStats` and `getStats()` keeps the API surface minimal. If they need to remain exported, they require Go doc comments per the coding guidelines.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/testapp/kv/kvexecutor.go` around lines 45 - 54, Rename the exported types and methods to unexported identifiers: change ExecutorStats to executorStats and GetStats() to getStats() in KVExecutor, update the return type and constructor usages accordingly, and update all internal references (e.g., in http_server.go) to use executorStats and getStats(); ensure method receiver signature remains the same (func (k *KVExecutor) getStats() executorStats) and run `go build`/tests to catch any missed references.
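A minimal sketch of the suggested rename, assuming the stats are plain atomic counters (the field names here are illustrative, not the actual `KVExecutor` layout):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// executorStats is the unexported snapshot type the nitpick suggests.
// Field names are illustrative, not the real KVExecutor layout.
type executorStats struct {
	ExecutedTxs uint64
	Blocks      uint64
}

// KVExecutor keeps live counters; only the snapshot accessor is package-private.
type KVExecutor struct {
	executedTxs atomic.Uint64
	blocks      atomic.Uint64
}

// getStats returns a point-in-time copy, mirroring the proposed unexported API.
func (k *KVExecutor) getStats() executorStats {
	return executorStats{
		ExecutedTxs: k.executedTxs.Load(),
		Blocks:      k.blocks.Load(),
	}
}

func main() {
	k := &KVExecutor{}
	k.executedTxs.Add(3)
	k.blocks.Add(1)
	fmt.Println(k.getStats().ExecutedTxs, k.getStats().Blocks) // 3 1
}
```

Since both the type and the accessor stay inside package `kv`, callers in `http_server.go` need only the lowercase names.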
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/testapp/cmd/init.go`:
- Line 38: The init command currently unconditionally sets cfg.Node.BlockTime =
rollconf.DurationWrapper{Duration: 100 * time.Millisecond}, overwriting any
value rollconf.Load(cmd) parsed (e.g., --evnode.node.block_time=1s); change this
so you only apply 100ms when the user did not supply the flag or instead set the
flag's default to 100ms. Locate the init command logic around rollconf.Load(cmd)
and cfg.Node.BlockTime and either (a) detect whether the flag was provided (use
the command/flag API to check if evnode.node.block_time was set) and only assign
the 100ms default when not provided, or (b) set the CLI flag default for
evnode.node.block_time to 100ms and remove the unconditional assignment to
cfg.Node.BlockTime.
In `@apps/testapp/kv/bench/main.go`:
- Around line 173-181: The fetchStats function currently swallows HTTP/JSON
errors and returns an all-zero serverStats which can cause underflow when
callers subtract uint64 deltas; change fetchStats to return (serverStats, error)
instead of serverStats, propagate errors from http.Get and json.Decode
(including non-200 responses), and have callers check the error and skip
server-side delta calculations/reporting when a snapshot cannot be trusted (use
the presence of an error to avoid subtracting snapshots or to re-sample). Ensure
you update the signature and all call sites accordingly (references: fetchStats,
serverStats).
- Line 28: The flag targetRPS (variable targetRPS) is only reported and not
enforced; either implement pacing or rename it to avoid confusion — to implement
pacing, add a token-bucket or ticker driven rate limiter in the request send
path (e.g., inside the send loop function used to launch workers / sendRequests)
that consumes tokens at the rate specified by *targetRPS and blocks or sleeps
when tokens are exhausted, or alternatively rename the flag (and all uses in
banner/logging) to something like target-rps-goal or desired-rps to make it
clear it’s only informational; update the worker launch/send loop (the function
that iterates and issues requests referenced at lines 56-57 and 103-108) to
either use the limiter or to remove/rename the misleading references so behavior
matches the flag name.
- Around line 121-149: The worker currently uses net.DialTimeout and then
performs conn.Write and http.ReadResponse without timeouts, which can block
forever; replace the dial with a context-aware dial (use net.Dialer.DialContext
with ctx) and before each network operation set per-operation deadlines on conn
(via conn.SetDeadline(time.Now().Add(timeout))) so the conn.Write(rawReq) and
http.ReadResponse(br, nil) calls will time out and return errors that you handle
by calling failures.Add(1), closing conn, and breaking the loop; ensure the
ctx.Done() check remains and choose a sensible timeout value (e.g., a short
per-op timeout) so workers can exit promptly on server stalls or context
cancellation.
In `@apps/testapp/kv/bench/README.md`:
- Around line 16-20: The README uses an incorrect startup command: it shows
./build/testapp but the build artifacts are ./stress-test and ./testapp; update
the Terminal 1 command to start the built testapp binary (use ./testapp start
--kv-endpoint localhost:9090) so it matches the documented build output and the
Terminal 2 stress-test command; ensure the symbols shown (./build/testapp,
./testapp, ./stress-test, --kv-endpoint) are updated consistently in the README
entry.
In `@apps/testapp/kv/http_server.go`:
- Line 112: The injectedTxs counter is incremented unconditionally at
hs.injectedTxs.Add(1) even when KVExecutor.InjectTx drops transactions; change
the flow so InjectTx (in apps/testapp/kv/kvexecutor.go) returns a success
boolean or error indicating whether the tx was enqueued, then in the HTTP
handler only call hs.injectedTxs.Add(1) and return 202 when InjectTx reports
success—otherwise return an error status (e.g., 429/500) and do not increment
injectedTxs so dropped transactions are not counted.
In `@apps/testapp/kv/kvexecutor.go`:
- Line 25: The txChannelBufferSize constant is far too large and will
preallocate gigabytes when used in make(chan []byte, txChannelBufferSize) inside
NewKVExecutor; change txChannelBufferSize to a much smaller default (e.g., 1_000
or similar) and make it configurable (via constructor param or config/env) so
callers can tune it, or replace the large buffered channel with a bounded/ring
buffer implementation or an unbuffered channel plus worker backpressure; update
NewKVExecutor to accept the buffer size parameter (or config) and use it instead
of the hardcoded 100_000_000, and add a comment about memory implications.
---
Nitpick comments:
In `@apps/testapp/kv/kvexecutor.go`:
- Around line 45-54: Rename the exported types and methods to unexported
identifiers: change ExecutorStats to executorStats and GetStats() to getStats()
in KVExecutor, update the return type and constructor usages accordingly, and
update all internal references (e.g., in http_server.go) to use executorStats
and getStats(); ensure method receiver signature remains the same (func (k
*KVExecutor) getStats() executorStats) and run `go build`/tests to catch any
missed references.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5360d9f5-19ba-4c64-bdd9-9121413b1041
📒 Files selected for processing (8)

- apps/testapp/README.md
- apps/testapp/cmd/init.go
- apps/testapp/examples/passphrase.txt
- apps/testapp/kv/bench/README.md
- apps/testapp/kv/bench/main.go
- apps/testapp/kv/http_server.go
- apps/testapp/kv/http_server_test.go
- apps/testapp/kv/kvexecutor.go
```go
// we use load in order to parse all the flags
cfg, _ := rollconf.Load(cmd)
cfg.Node.Aggregator = aggregator
cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 100 * time.Millisecond}
```
Don't overwrite the parsed block-time value.
Line 38 clobbers whatever rollconf.Load(cmd) just read, so testapp init --evnode.node.block_time=1s still writes 100ms. If the goal is a faster default for this command, set the flag default or only apply 100ms when the user did not pass the flag; otherwise this also drifts from apps/testapp/cmd/init_test.go, which still expects block_time: 1s.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/cmd/init.go` at line 38, The init command currently
unconditionally sets cfg.Node.BlockTime = rollconf.DurationWrapper{Duration: 100
* time.Millisecond}, overwriting any value rollconf.Load(cmd) parsed (e.g.,
--evnode.node.block_time=1s); change this so you only apply 100ms when the user
did not supply the flag or instead set the flag's default to 100ms. Locate the
init command logic around rollconf.Load(cmd) and cfg.Node.BlockTime and either
(a) detect whether the flag was provided (use the command/flag API to check if
evnode.node.block_time was set) and only assign the 100ms default when not
provided, or (b) set the CLI flag default for evnode.node.block_time to 100ms
and remove the unconditional assignment to cfg.Node.BlockTime.
```go
addr := flag.String("addr", "localhost:9090", "server host:port")
duration := flag.Duration("duration", 10*time.Second, "test duration")
workers := flag.Int("workers", 1000, "concurrent workers")
targetRPS := flag.Uint64("target-rps", 10_000_000, "target requests per second")
```
-target-rps is only reported, not enforced.
The new flag looks like a rate-control knob, but nothing in the worker launch or send loop uses it to pace requests. Right now it only changes the pass/fail banner, so runs with -target-rps=100 and -target-rps=10000000 generate the same load. Either add pacing/token-bucket logic or rename the flag so it is clearly just a goal.
Also applies to: 56-57, 103-108
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/bench/main.go` at line 28, The flag targetRPS (variable
targetRPS) is only reported and not enforced; either implement pacing or rename
it to avoid confusion — to implement pacing, add a token-bucket or ticker driven
rate limiter in the request send path (e.g., inside the send loop function used
to launch workers / sendRequests) that consumes tokens at the rate specified by
*targetRPS and blocks or sleeps when tokens are exhausted, or alternatively
rename the flag (and all uses in banner/logging) to something like
target-rps-goal or desired-rps to make it clear it’s only informational; update
the worker launch/send loop (the function that iterates and issues requests
referenced at lines 56-57 and 103-108) to either use the limiter or to
remove/rename the misleading references so behavior matches the flag name.
```diff
-		case <-ticker.C:
-			// Send a batch of transactions
-			for i := 0; i < txPerInterval; i++ {
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					var currentTxData string
-					// Generate random key-value pair
-					key := randomString(8)
-					value := randomString(16)
-					currentTxData = fmt.Sprintf("%s=%s", key, value)
-					success := sendTransaction(url, currentTxData)
-					if success {
-						atomic.AddUint64(&successCount, 1)
-					} else {
-						atomic.AddUint64(&failureCount, 1)
-					}
-				}()
-			}
-		return
+		conn, err := net.DialTimeout("tcp", addr, time.Second)
+		if err != nil {
+			failures.Add(1)
+			continue
+		}
+
+		br := bufio.NewReaderSize(conn, 512)
+
+		for {
+			select {
+			case <-ctx.Done():
+				conn.Close()
+				return
+			default:
+			}
+
+			if _, err := conn.Write(rawReq); err != nil {
+				failures.Add(1)
+				conn.Close()
+				break
+			}
+
+			resp, err := http.ReadResponse(br, nil)
+			if err != nil {
+				failures.Add(1)
+				conn.Close()
+				break
+			}
+			io.Copy(io.Discard, resp.Body)
```
🧩 Analysis chain

🏁 Script executed:

```shell
cat -n apps/testapp/kv/bench/main.go | head -200
```

Repository: evstack/ev-node — length of output: 5905
Set deadlines on TCP operations to prevent workers from blocking indefinitely.
Line 121 only limits the connection establishment time. After that, conn.Write and http.ReadResponse have no timeouts and can block forever. If the server is slow or stalled, workers get stuck in these operations, never checking ctx.Done(), and the main goroutine hangs waiting for them to drain the done channel.
Use DialContext to respect context cancellation during connection, and set per-operation deadlines with SetDeadline so Write and ReadResponse calls are bounded:
Suggested direction:

```diff
- conn, err := net.DialTimeout("tcp", addr, time.Second)
+ d := net.Dialer{Timeout: time.Second}
+ conn, err := d.DialContext(ctx, "tcp", addr)
  if err != nil {
  	failures.Add(1)
  	continue
  }
@@
+ if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
+ 	failures.Add(1)
+ 	conn.Close()
+ 	break
+ }
  if _, err := conn.Write(rawReq); err != nil {
  	failures.Add(1)
  	conn.Close()
  	break
  }
  resp, err := http.ReadResponse(br, nil)
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
d := net.Dialer{Timeout: time.Second}
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
	failures.Add(1)
	continue
}

br := bufio.NewReaderSize(conn, 512)

for {
	select {
	case <-ctx.Done():
		conn.Close()
		return
	default:
	}

	if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
		failures.Add(1)
		conn.Close()
		break
	}

	if _, err := conn.Write(rawReq); err != nil {
		failures.Add(1)
		conn.Close()
		break
	}

	resp, err := http.ReadResponse(br, nil)
	if err != nil {
		failures.Add(1)
		conn.Close()
		break
	}
	io.Copy(io.Discard, resp.Body)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/bench/main.go` around lines 121 - 149, The worker currently
uses net.DialTimeout and then performs conn.Write and http.ReadResponse without
timeouts, which can block forever; replace the dial with a context-aware dial
(use net.Dialer.DialContext with ctx) and before each network operation set
per-operation deadlines on conn (via conn.SetDeadline(time.Now().Add(timeout)))
so the conn.Write(rawReq) and http.ReadResponse(br, nil) calls will time out and
return errors that you handle by calling failures.Add(1), closing conn, and
breaking the loop; ensure the ctx.Done() check remains and choose a sensible
timeout value (e.g., a short per-op timeout) so workers can exit promptly on
server stalls or context cancellation.
```go
func fetchStats(addr string) serverStats {
	resp, err := http.Get("http://" + addr + "/stats")
	if err != nil {
		return serverStats{}
	}
	defer resp.Body.Close()
	var s serverStats
	json.NewDecoder(resp.Body).Decode(&s)
	return s
}
```
Don't collapse /stats failures into zero counters.
Line 176 turns any fetch failure into an all-zero snapshot, and the caller later subtracts those snapshots as uint64. If the initial sample succeeds and the final sample fails, or the server restarts between samples, the delta math underflows and the summary prints enormous bogus block/tx counts. Return an error from fetchStats and skip server-side delta reporting when a snapshot cannot be trusted.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/bench/main.go` around lines 173 - 181, The fetchStats
function currently swallows HTTP/JSON errors and returns an all-zero serverStats
which can cause underflow when callers subtract uint64 deltas; change fetchStats
to return (serverStats, error) instead of serverStats, propagate errors from
http.Get and json.Decode (including non-200 responses), and have callers check
the error and skip server-side delta calculations/reporting when a snapshot
cannot be trusted (use the presence of an error to avoid subtracting snapshots
or to re-sample). Ensure you update the signature and all call sites accordingly
(references: fetchStats, serverStats).
```bash
# Terminal 1: start the testapp with KV endpoint
./build/testapp start --kv-endpoint localhost:9090

# Terminal 2: run the stress test
./stress-test --addr localhost:9090 --duration 10s --workers 1000
```
The testapp startup command doesn't match the documented build output.
Line 17 runs ./build/testapp, but the documented build commands produce ./stress-test here and ./testapp in apps/testapp/README.md. Following the steps as written will not create ./build/testapp.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/bench/README.md` around lines 16 - 20, The README uses an
incorrect startup command: it shows ./build/testapp but the build artifacts are
./stress-test and ./testapp; update the Terminal 1 command to start the built
testapp binary (use ./testapp start --kv-endpoint localhost:9090) so it matches
the documented build output and the Terminal 2 stress-test command; ensure the
symbols shown (./build/testapp, ./testapp, ./stress-test, --kv-endpoint) are
updated consistently in the README entry.
```go
	}

	hs.executor.InjectTx(body)
	hs.injectedTxs.Add(1)
```
Don't count dropped transactions as injected.
KVExecutor.InjectTx in apps/testapp/kv/kvexecutor.go still drops when txChan is full, but Line 112 increments injectedTxs unconditionally and Line 113 still returns 202. Under load, /stats will over-report injections and callers will see success for work that was discarded. Have InjectTx return a bool/error and only increment/respond 202 on a successful enqueue.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/http_server.go` at line 112, The injectedTxs counter is
incremented unconditionally at hs.injectedTxs.Add(1) even when
KVExecutor.InjectTx drops transactions; change the flow so InjectTx (in
apps/testapp/kv/kvexecutor.go) returns a success boolean or error indicating
whether the tx was enqueued, then in the HTTP handler only call
hs.injectedTxs.Add(1) and return 202 when InjectTx reports success—otherwise
return an error status (e.g., 429/500) and do not increment injectedTxs so
dropped transactions are not counted.
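A non-blocking send that reports the outcome is enough for this. The sketch below is a simplified stand-in for `KVExecutor` (the real struct has more fields), showing the `select`/`default` idiom the fix would use:

```go
package main

import "fmt"

// KVExecutor with a bounded tx queue; simplified stand-in for the real type.
type KVExecutor struct {
	txChan chan []byte
}

func NewKVExecutor(bufferSize int) *KVExecutor {
	return &KVExecutor{txChan: make(chan []byte, bufferSize)}
}

// InjectTx does a non-blocking send: true means the tx was enqueued, false
// means the queue is full. The HTTP handler should only increment
// injectedTxs and return 202 on true, and return 429 on false.
func (k *KVExecutor) InjectTx(tx []byte) bool {
	select {
	case k.txChan <- tx:
		return true
	default:
		return false
	}
}

func main() {
	k := NewKVExecutor(1)
	fmt.Println(k.InjectTx([]byte("a=1"))) // true: fits in the buffer
	fmt.Println(k.InjectTx([]byte("b=2"))) // false: buffer full, dropped
}
```

Returning an `error` instead of a `bool` works just as well and lets the handler distinguish "queue full" from future failure modes.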
```diff
 	finalizedHeightKey = ds.NewKey("/finalizedHeight")
 	// Define a buffer size for the transaction channel
-	txChannelBufferSize = 10000
+	txChannelBufferSize = 100_000_000
```
This mempool size will allocate gigabytes at startup.
Line 25 drives make(chan []byte, txChannelBufferSize) in NewKVExecutor. With 100_000_000 slots, a buffered chan []byte needs roughly 24 bytes per slot just for slice headers on 64-bit Go, so the queue alone is already well over 2 GB before any payload bytes are stored. That is likely to OOM the process or starve everything else of memory. As per coding guidelines, "Consider memory allocation in hot paths of Go code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/testapp/kv/kvexecutor.go` at line 25, The txChannelBufferSize constant
is far too large and will preallocate gigabytes when used in make(chan []byte,
txChannelBufferSize) inside NewKVExecutor; change txChannelBufferSize to a much
smaller default (e.g., 1_000 or similar) and make it configurable (via
constructor param or config/env) so callers can tune it, or replace the large
buffered channel with a bounded/ring buffer implementation or an unbuffered
channel plus worker backpressure; update NewKVExecutor to accept the buffer size
parameter (or config) and use it instead of the hardcoded 100_000_000, and add a
comment about memory implications.
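The "over 2 GB before any payload" claim comes straight from the slice-header arithmetic, which this small calculation spells out (24 bytes is the pointer + len + cap header size on 64-bit Go; the "new" size is an illustrative bounded default, not a recommendation):

```go
package main

import "fmt"

func main() {
	const sliceHeaderBytes = 24 // ptr + len + cap per []byte slot on 64-bit Go
	const oldBufferSize = 100_000_000
	const newBufferSize = 10_000 // illustrative bounded default

	// Channel backing storage alone, before a single payload byte is stored.
	fmt.Printf("old queue headers alone: %.1f GB\n",
		float64(oldBufferSize*sliceHeaderBytes)/(1<<30))
	fmt.Printf("new queue headers alone: %.1f MB\n",
		float64(newBufferSize*sliceHeaderBytes)/(1<<20))
}
```

With average 24-byte payloads (`key=value` from the bench tool) the real footprint at full queue roughly doubles again.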
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/testapp/cmd/run.go (1)
140-149: ⚠️ Potential issue | 🟠 Major — Avoid hardcoding a 1,000,000 sequencer queue cap.
Line 146 sets an extremely high `maxQueueSize` with no local validation/clamping. Given that `BatchQueue` grows via `append` and `Prepend` bypasses the cap, this can cause severe memory pressure/OOM during stress or recovery paths. Please make this value configurable and enforce a bounded limit before passing it to `single.NewSequencer`.
Suggested direction:

```diff
-	sequencer, err := single.NewSequencer(
+	// TODO: source from config/flag with validation instead of hardcoding.
+	maxQueueSize := 10_000
+	sequencer, err := single.NewSequencer(
 		logger,
 		datastore,
 		daClient,
 		nodeConfig,
 		[]byte(genesis.ChainID),
-		1_000_000,
+		maxQueueSize,
 		genesis,
 		executor,
 	)
```

As per coding guidelines, "Consider memory allocation in hot paths of Go code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/testapp/cmd/run.go` around lines 140 - 149, The call to single.NewSequencer currently passes a hardcoded maxQueueSize (1_000_000) which risks unbounded memory use; make this value configurable (e.g., add a CLI flag or a field on nodeConfig such as MaxSequencerQueueSize) and validate/clamp it to a safe upper bound (e.g., MAX_ALLOWED_QUEUE_SIZE constant) before calling single.NewSequencer in run.go; ensure the clamping logic is applied where sequencer is constructed so both normal append and any Prepend/BatchQueue paths cannot exceed the enforced limit.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@apps/testapp/cmd/run.go`:
- Around line 140-149: The call to single.NewSequencer currently passes a
hardcoded maxQueueSize (1_000_000) which risks unbounded memory use; make this
value configurable (e.g., add a CLI flag or a field on nodeConfig such as
MaxSequencerQueueSize) and validate/clamp it to a safe upper bound (e.g.,
MAX_ALLOWED_QUEUE_SIZE constant) before calling single.NewSequencer in run.go;
ensure the clamping logic is applied where sequencer is constructed so both
normal append and any Prepend/BatchQueue paths cannot exceed the enforced limit.
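The validate/clamp step described above is a few lines; the cap and default below are illustrative placeholders, and the real bound should come from profiling rather than this sketch:

```go
package main

import "fmt"

// maxAllowedQueueSize is an illustrative hard cap for the sequencer queue.
const maxAllowedQueueSize = 100_000

// clampQueueSize validates a configured queue size before it is handed to
// single.NewSequencer, as the review comment suggests.
func clampQueueSize(configured int) int {
	if configured <= 0 {
		return 10_000 // sane default when unset or invalid
	}
	if configured > maxAllowedQueueSize {
		return maxAllowedQueueSize
	}
	return configured
}

func main() {
	fmt.Println(clampQueueSize(1_000_000)) // 100000: hardcoded value gets clamped
	fmt.Println(clampQueueSize(0))         // 10000: unset falls back to default
	fmt.Println(clampQueueSize(50_000))    // 50000: in-range value passes through
}
```

The clamp alone does not close the `Prepend` bypass; that path would also need to respect the same bound inside `BatchQueue`.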
Improve the command to stress-test evnode via testapp.
Summary by CodeRabbit
New Features
Documentation