feat: Event-Driven DA Follower with WebSocket Subscriptions #3131
Conversation
…ollower and introduce DA client subscription.
…w mode: when the DA subscription delivers blobs at the current local DA height, the followLoop now processes them inline via ProcessBlobs, avoiding a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path), falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged; sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to the DA client Subscribe adapter
- Export ProcessBlobs on the DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
When header and data use different DA namespaces, the DAFollower now subscribes to both and merges events via a fan-in goroutine. This ensures inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with a mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on the DummyDA test helper with subscriber notification
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
📝 Walkthrough

Adds WebSocket-based DA subscriptions and a DAFollower that subscribes to DA events, drives inline blob processing via ProcessBlobs, and performs prioritized sequential catch-up; replaces blob client constructors with NewWSClient and propagates Subscribe through DA client interfaces and mocks.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Syncer
    participant DAFollower
    participant DAClient as "DA Client (WS)"
    participant Retriever
    participant Pipeline as "Event Pipeline"
    Syncer->>DAFollower: Start(ctx)
    activate DAFollower
    DAFollower->>DAClient: Subscribe(ctx, namespace)
    DAClient-->>DAFollower: subscription channel (SubscriptionEvent)
    loop on subscription event
        DAFollower->>DAFollower: updateHighest(height)
        alt inline (localNext == height)
            DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>DAFollower: []DAHeightEvent
            DAFollower->>Pipeline: pipe events
        else backlog
            DAFollower->>DAFollower: signal catchup
        end
    end
    loop catch-up sequence
        DAFollower->>Retriever: RetrieveFromDA(ctx, height)
        Retriever-->>DAFollower: blobs / Err
        DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
        Retriever-->>DAFollower: []DAHeightEvent
        DAFollower->>Pipeline: pipe events
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
Claude finished @alpe's task in 3m 33s

Code Review: Event-Driven DA Follower with WebSocket Subscriptions
Summary

This is a well-architected change. Replacing the polling DA worker with an event-driven …

Critical

1.
If no subscription events arrive within 3× the DA block time (default 30s), the watchdog triggers and returns an error. The followLoop then reconnects the subscription with the standard backoff. This prevents the node from silently stopping sync when the DA subscription stalls (e.g., network partition, DA node freeze).
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

@@           Coverage Diff           @@
## main #3131 +/- ##
==========================================
- Coverage 60.87% 60.14% -0.73%
==========================================
Files 113 114 +1
Lines 11695 11909 +214
==========================================
+ Hits 7119 7163 +44
- Misses 3773 3932 +159
- Partials 803 814 +11
…ient to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval.
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.
| Benchmark suite | Current: dd7e0cd | Previous: 2c75e9e | Ratio |
|---|---|---|---|
| SpamoorSmoke - Batch.SetHeight (avg) | 15.020408163265307 us | 8.824561403508772 us | 1.70 |
| SpamoorSmoke - Store.GetBlockData (avg) | 13.59433962264151 us | 7.780701754385965 us | 1.75 |
This comment was automatically generated by workflow using github-action-benchmark.
Actionable comments posted: 7
🧹 Nitpick comments (4)
block/internal/da/interface.go (1)
20-23: Clarify the post-cancellation drain contract in the comment.

“Callers MUST drain the channel after cancellation” can be interpreted differently; consider explicitly stating the expected sequence (cancel context, keep receiving until channel closes).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/interface.go` around lines 20 - 23: Update the comment for Subscribe to explicitly describe the post-cancellation drain sequence: state that callers should cancel the provided ctx, then continue receiving from the returned <-chan datypes.SubscriptionEvent (e.g., range over the channel) until it is closed by the implementation; clarify that the channel will be closed once cancellation is observed and that draining (receiving until close) prevents goroutine/resource leaks. Reference the Subscribe(ctx context.Context, namespace []byte) signature and datypes.SubscriptionEvent in the comment.

block/internal/syncing/da_retriever_tracing.go (1)
67-69: Consider instrumenting `ProcessBlobs` in the tracing wrapper.

This new hot path currently emits no span/attributes, which limits visibility into inline blob processing latency and output volume.
Suggested refactor
```diff
 func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
-	return t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	ctx, span := t.tracer.Start(ctx, "DARetriever.ProcessBlobs",
+		trace.WithAttributes(
+			attribute.Int64("da.height", int64(daHeight)),
+			attribute.Int("blob.count", len(blobs)),
+		),
+	)
+	defer span.End()
+
+	events := t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	span.SetAttributes(attribute.Int("event.count", len(events)))
+	return events
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_retriever_tracing.go` around lines 67 - 69: The tracing wrapper tracedDARetriever.ProcessBlobs is not creating any span or attributes, so add instrumentation around the call to t.inner.ProcessBlobs: start a span (using the same tracer used elsewhere in this file), set attributes for blob_count (len(blobs)), da_height (daHeight) and optionally total_blob_bytes (sum of len for each blob), call t.inner.ProcessBlobs(ctx, blobs, daHeight), record the resulting events count as an attribute (e.g., da_events_count) and any error/status if applicable, then end the span; keep the wrapper behavior identical except for adding the span and attributes to improve visibility into latency and output volume.

apps/evm/server/force_inclusion_test.go (1)
53-58: Prefer a mockery-generated DA mock over extending the hand-written stub.

This local `Subscribe` stub works, but it increases interface-drift maintenance as DA APIs evolve. Using the generated DA mock keeps contract changes centralized.

As per coding guidelines, "Mock external dependencies using mockery".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/evm/server/force_inclusion_test.go` around lines 53 - 58: Replace the hand-written Subscribe implementation on mockDA with the mockery-generated DA mock: remove or stop using the local mockDA.Subscribe stub and instead import and instantiate the mockery-created mock (e.g., MockDA) and use its EXPECT/On setup to return a closed (<-chan da.SubscriptionEvent) or desired channel for the test; update test setup where mockDA is constructed to use the mockery mock and set the Subscribe return behavior via the mock's EXPECT/On methods so the external DA contract is maintained centrally.

block/internal/da/tracing.go (1)
148-150: Consider tracing subscription setup for consistency.

`Subscribe` currently bypasses the tracing pattern used by the rest of this wrapper, so subscribe failures won't be visible in DA spans.

Suggested refactor

```diff
 func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
-	return t.inner.Subscribe(ctx, namespace)
+	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
+		trace.WithAttributes(
+			attribute.Int("ns.length", len(namespace)),
+			attribute.String("da.namespace", hex.EncodeToString(namespace)),
+		),
+	)
+	defer span.End()
+
+	ch, err := t.inner.Subscribe(ctx, namespace)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return nil, err
+	}
+	return ch, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/da/tracing.go` around lines 148 - 150, tracedClient.Subscribe bypasses the wrapper's tracing; update the method to create a DA span (matching the tracing pattern used elsewhere in this file), start the span with the incoming ctx, add relevant attributes (e.g., namespace), call t.inner.Subscribe(ctx, namespace), record any returned error on the span, and end the span before returning the channel and error so subscription setup failures are captured in DA spans; reference the tracedClient.Subscribe method and t.inner.Subscribe call when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS dial is using context.Background() when calling
blobrpc.NewWSClient which prevents CLI cancellation from propagating; replace
the background context with the command-scoped context (use cmd.Context() or a
derived context) when calling blobrpc.NewWSClient so that the DA WebSocket
handshake is canceled on CLI shutdown and respects timeouts; update the call
site where blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.
In `@block/internal/syncing/da_follower.go`:
- Around line 265-267: The unconditional f.localDAHeight.Store(ev.Height) can
regress localDAHeight if runCatchup advanced it concurrently; change the
rollback to a CAS so we only set it back when the current value equals the value
we expect to overwrite. Replace the Store(ev.Height) rollback with a
compare-and-swap using the atomic type/method (e.g.,
f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.
- Around line 271-273: The code sets f.headReached.Store(true) whenever inline
processing yields events, which can be incorrect if highestSeenDAHeight is still
ahead; modify the condition around the len(events) branch in da_follower.go (the
block using ev.Height and f.headReached) to only set f.headReached when the
event height indicates we are actually caught up (e.g., ev.Height >=
f.highestSeenDAHeight or when highestSeenDAHeight is unset/zero), otherwise do
not flip headReached; update the conditional that logs with f.logger.Debug() so
the headReached.Store(true) call is guarded by this explicit comparison to
highestSeenDAHeight.
In `@block/internal/syncing/syncer_benchmark_test.go`:
- Around line 46-61: The test starts goroutines with b.Context() which doesn't
get cancelled by the fixture teardown (fixt.s.cancel()), causing potential
goroutine leaks; change all uses of b.Context() when launching long-lived worker
goroutines to use the fixture's cancelable context (fixt.s.ctx) instead —
specifically pass fixt.s.ctx into fixt.s.processLoop, follower.runCatchup (and
the NewDAFollower call site that spawns it), and fixt.s.startSyncWorkers; apply
the same replacement for the other occurrences mentioned (lines near 64-69 and
78-79) so teardown via fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down
workers.
In `@block/internal/syncing/syncer.go`:
- Around line 198-213: processLoop is started with s.wg.Go before starting
s.daFollower, so if s.daFollower.Start(ctx) fails the goroutine can leak; wrap
the work with a cancellable child context (e.g., childCtx, cancel :=
context.WithCancel(ctx)) and pass childCtx to s.processLoop and to
daFollower.Start, then on Start failure call cancel() and wait for in-flight
goroutines to finish (s.wg.Wait or equivalent) before returning the error;
update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.
In `@test/testda/dummy.go`:
- Around line 66-77: The goroutine unconditionally closes ch on ctx.Done(),
causing a double-close if Reset() already closed it; modify the removal logic in
the ctx.Done() goroutine (which locks d.mu and iterates d.subscribers comparing
s to sub) to only close ch when this goroutine actually removed the subscriber
(e.g., set a removed bool when s==sub, perform d.subscribers = append(...),
break, and then close ch only if removed is true), so Reset() and this goroutine
cannot both close the same channel; reference d.mu, d.subscribers, sub, ch and
Reset() when making the change.
In `@tools/local-da/local.go`:
- Around line 230-231: The subscription events lose blob payloads because
notifySubscribers reads from d.blobData[height] while SubmitWithOptions and
other submit paths only write to d.data and never populate d.blobData; update
the submit code paths (e.g., SubmitWithOptions and the other submit handlers
around the noted regions) to also populate d.blobData[height] with the
corresponding blob list when they set d.data[height] (or move notifySubscribers
to read from d.data if that is the intended single source); ensure you update
the same symbol names (d.data, d.blobData) so notifySubscribers(d.height) will
see the blobs and subscribers receive non-empty blob lists.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 24171a7e-5193-4ec4-9f32-030b0acc2d20
📒 Files selected for processing (26)
- apps/evm/cmd/run.go
- apps/evm/server/force_inclusion_test.go
- apps/grpc/cmd/run.go
- apps/testapp/cmd/run.go
- block/internal/da/client.go
- block/internal/da/interface.go
- block/internal/da/tracing.go
- block/internal/da/tracing_test.go
- block/internal/syncing/da_follower.go
- block/internal/syncing/da_retriever.go
- block/internal/syncing/da_retriever_mock.go
- block/internal/syncing/da_retriever_tracing.go
- block/internal/syncing/da_retriever_tracing_test.go
- block/internal/syncing/syncer.go
- block/internal/syncing/syncer_backoff_test.go
- block/internal/syncing/syncer_benchmark_test.go
- block/internal/syncing/syncer_test.go
- pkg/cmd/run_node.go
- pkg/da/jsonrpc/client.go
- pkg/da/types/types.go
- test/e2e/evm_force_inclusion_e2e_test.go
- test/e2e/evm_test_common.go
- test/mocks/da.go
- test/testda/dummy.go
- tools/local-da/local.go
- tools/local-da/rpc.go
Are we able to use that in the async block retriever?
as discussed in the standup this will be done in a follow-up
julienrbrt
left a comment
I think we can improve a bit the client constructor so we only need one and can handle if websockets are for some reasons not available.
```go
	return binary.LittleEndian.Uint64(id[:8]), commitment, nil
}
```

```go
// SubscriptionEvent is a namespace-agnostic signal that a blob was finalized at
```
Nit, move this type to internal/da/ or internal/common. It doesn't need to be public.
The type is used by dummyDA as well which does not have access to the internal packages.
```go
// NewClient connects to the DA RPC endpoint using the address as-is.
// Uses HTTP by default (lazy connection — only connects on first RPC call).
// Does NOT support channel-based subscriptions (e.g. Subscribe).
```
I don't get it; the clients are effectively the same, just the address changes.
Additionally, websockets can be disabled I believe, so shouldn't we keep this one constructor, check the websocket connection, use it if available and otherwise fallback to http?
The websocket connection was causing issues with tests initially. Without the second constructor, we would upgrade everything to WS automatically. Let me check the tests again how much effort is left to upgrade all.
With the event subscription, we make websocket connections mandatory.
as discussed in the standup this will be done in a follow-up
```go
local := f.localDAHeight.Load()
highest := f.highestSeenDAHeight.Load()

if local > highest {
```
The local value is actually the next DA height to fetch. I will rename this to localNextDAHeight.
```go
f.logger.Debug().
	Uint64("da_height", priorityHeight).
	Msg("fetching priority DA height from P2P hint")
if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
```
I believe we're missing this bit of logic: https://github.com/evstack/ev-node/pull/3131/changes#diff-5a67b1436f0151a88ef358629f6676db468237a65817d2c9ec68883bcc1940b0L421-L426, which piggybacks on minimum DA inclusion and is useful for restarts.
Good eyes. The component maintains its state in localNextDAHeight (new name), which is updated in runCatchup or handleSubscriptionEvent (when caught up).
The syncer initializes the DAFollower with the correct height, based on its loaded state.
L348: s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
* main: fix(node): race on caught up (#3133)
Actionable comments posted: 5
♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)
265-279: ⚠️ Potential issue | 🔴 Critical

Use CAS-based rollback to avoid regressing localNextDAHeight.

Lines 265, 279, and 362 unconditionally Store(...) during rollback. If the other loop advanced localNextDAHeight meanwhile, this can move it backward and reprocess heights out of order.

As per coding guidelines: "Be careful with concurrent access to shared state".

🛠️ Proposed fix

```diff
 for _, event := range events {
 	if err := f.pipeEvent(ctx, event); err != nil {
 		// Roll back so catchupLoop can retry this height.
-		f.localNextDAHeight.Store(ev.Height)
+		if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+			f.logger.Debug().
+				Uint64("da_height", ev.Height).
+				Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+				Msg("skip rollback: localNextDAHeight already advanced")
+		}
 		f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 			Msg("failed to pipe inline event, catchup will retry")
 		return
 	}
 }
@@
 } else {
 	// No complete events (split namespace, waiting for other half).
-	f.localNextDAHeight.Store(ev.Height)
+	if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+		f.logger.Debug().
+			Uint64("da_height", ev.Height).
+			Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+			Msg("skip rollback: localNextDAHeight already advanced")
+	}
 }
@@
 if err := f.fetchAndPipeHeight(ctx, local); err != nil {
 	// Roll back so we can retry after backoff.
-	f.localNextDAHeight.Store(local)
+	if !f.localNextDAHeight.CompareAndSwap(local+1, local) {
+		f.logger.Debug().
+			Uint64("da_height", local).
+			Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+			Msg("skip rollback: localNextDAHeight already advanced")
+	}
 	if !f.waitOnCatchupError(ctx, err, local) {
 		return
 	}
 	continue
 }
```

Also applies to: 360-363
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 265 - 279, The rollback unconditionally calls f.localNextDAHeight.Store(...) which can regress the counter if another goroutine advanced it; change these stores (the ones near f.localNextDAHeight.Store in the rollback path and the other occurrences noted around where events are handled and at the other block ~360-363) to a CAS update: read cur := f.localNextDAHeight.Load(), and only attempt to set via a CompareAndSwap (or a small CAS loop) to f.localNextDAHeight.CompareAndSwap(cur, ev.Height) (or loop while cur < ev.Height and CAS fails: reload cur and retry) so you only advance the value and never move it backward; apply this pattern to each place that currently calls f.localNextDAHeight.Store(...) during rollback.
🧹 Nitpick comments (1)
tools/local-da/local.go (1)
204-243: Consider extracting shared submit-to-rpc conversion logic.

`SubmitWithOptions` and `Submit` now duplicate the same namespace conversion, blob conversion, storage, and notify flow. A small shared helper would reduce drift risk between the two code paths.

Also applies to: 245-284
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/local-da/local.go` around lines 204 - 243, SubmitWithOptions and Submit duplicate namespace conversion, blob->rpc conversion, ID generation, storage into d.data/d.blobData, timestamp/height bumping and d.notifySubscribers; extract that shared logic into a single helper (e.g., processAndStoreBlobs or storeBlobsAtNextHeight) that takes (ctx, ns []byte, blobs []datypes.Blob) and performs: validate/convert namespace (libshare.NewNamespaceFromBytes), allocate new height (increment d.height, set d.timestamps[d.height]=d.monotonicTime()), generate IDs using d.nextID() and d.getHash(), append kvp to d.data[d.height], build []*blobrpc.Blob via blobrpc.NewBlobV0 and set d.blobData[d.height], then call d.notifySubscribers(d.height) and return the generated IDs (and error). Replace the duplicated sections in SubmitWithOptions and Submit to call this helper and keep logging/validation outside as appropriate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS blob client created via
blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is never closed; after verifying err == nil,
immediately schedule closing the client (e.g., defer blobClient.Close()) so the
WebSocket is cleaned up on shutdown and prevents goroutine/resource leaks—place
the defer directly after the successful assignment to blobClient and before
returning from run-related functions.
In `@block/internal/syncing/da_follower.go`:
- Around line 112-115: The catchupLoop is only triggered by catchupSignal via
signalCatchup (called from updateHighest on subscription events), so if the node
starts behind and no new events arrive catchup never runs; modify startup to
initiate catchup immediately: when launching the goroutine(s) (the code that
calls f.wg.Add and starts followLoop and catchupLoop) ensure you either signal
catchup once at startup or start catchupLoop in a mode that performs an initial
bootstrap catchup before waiting on catchupSignal (adjust catchupLoop or call
signalCatchup directly). Apply the same change to the other goroutine startup
sites that spawn catchupLoop/followLoop to guarantee an initial catchup run even
without incoming subscription events.
In `@block/internal/syncing/syncer_backoff_test.go`:
- Around line 241-245: The test is closing over the loop variable h causing all
Run callbacks to see the final value (6); fix by capturing h per-iteration into
a new local (e.g., localH := h) before calling daRetriever.On("RetrieveFromDA",
mock.Anything, h).Run(...), and use that local (localH) inside the Run callback
so fetchedHeights receives the correct values 3,4,5 when runCatchup() executes.
In `@block/internal/syncing/syncer.go`:
- Around line 210-214: On DA follower startup failure you must stop any
previously-started retrievers to avoid goroutine leaks: after
s.daFollower.Start(ctx) returns an error, call the Stop/Close method on any
components started earlier (e.g., s.raftRetriever.Stop(ctx) or the appropriate
shutdown method on other retrievers), handle/ignore its error as needed, then
proceed with s.cancel() and s.wg.Wait(); update the failure branch that
currently only invokes s.cancel() and s.wg.Wait() to also explicitly shut down
s.raftRetriever (and any other started retrievers) before returning the error.
In `@tools/local-da/local.go`:
- Around line 226-236: The code silently ignores errors from
libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 in SubmitWithOptions and
Submit, which leads to nil rpc blobs and inconsistent state; update both
functions to check and handle those errors by returning a wrapped error (use
fmt.Errorf with context) instead of continuing, ensure you do not append ids or
store kvp entries when NewNamespaceFromBytes fails, and only append to
rpcBlobs/d.data[d.height] after successful blobrpc.NewBlobV0 conversion (refer
to functions SubmitWithOptions and Submit and the NewNamespaceFromBytes and
NewBlobV0 calls).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7a45cdf6-63a2-47d8-9960-b9ec8716bbc7
📒 Files selected for processing (8)
- apps/evm/cmd/run.go
- block/internal/syncing/da_follower.go
- block/internal/syncing/da_retriever.go
- block/internal/syncing/syncer.go
- block/internal/syncing/syncer_backoff_test.go
- block/internal/syncing/syncer_benchmark_test.go
- block/internal/syncing/syncer_test.go
- tools/local-da/local.go
```go
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
	return fmt.Errorf("failed to create blob client: %w", err)
}
```
Close the WebSocket DA client on shutdown.
Line 63 correctly uses cmd.Context() for dial cancellation, but the created WS client is never explicitly closed. Add a deferred blobClient.Close() after successful creation to avoid lingering WS resources during graceful shutdown paths.
Suggested fix

```diff
 blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
 if err != nil {
 	return fmt.Errorf("failed to create blob client: %w", err)
 }
+defer blobClient.Close()
 daClient := block.NewDAClient(blobClient, nodeConfig, logger)
```

As per coding guidelines, "Be mindful of goroutine leaks".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
	return fmt.Errorf("failed to create blob client: %w", err)
}
defer blobClient.Close()
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/evm/cmd/run.go` around lines 63 - 66, The WS blob client created via
blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is never closed; after verifying err == nil,
immediately schedule closing the client (e.g., defer blobClient.Close()) so the
WebSocket is cleaned up on shutdown and prevents goroutine/resource leaks—place
the defer directly after the successful assignment to blobClient and before
returning from run-related functions.
f.wg.Add(2)
go f.followLoop(ctx)
go f.catchupLoop(ctx)
Bootstrap catchup at startup instead of waiting for the first subscription event.
catchupLoop only runs after catchupSignal, and signalCatchup is only triggered by updateHighest from subscription events. If the node starts behind and no new blob event arrives, historical catchup never begins.
🛠️ Proposed fix
func (f *daFollower) Start(ctx context.Context) error {
ctx, f.cancel = context.WithCancel(ctx)
+ // Seed highest DA height on startup so catchup can begin immediately
+ // even if no fresh subscription event arrives right away.
+ if f.client != nil {
+ latest, err := f.client.GetLatestDAHeight(ctx)
+ if err != nil {
+ f.logger.Debug().Err(err).Msg("failed to seed latest DA height at startup")
+ } else {
+ f.updateHighest(latest)
+ }
+ }
+
f.wg.Add(2)
go f.followLoop(ctx)
go f.catchupLoop(ctx)

Also applies to: 287-296, 309-315
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/da_follower.go` around lines 112 - 115, The
catchupLoop is only triggered by catchupSignal via signalCatchup (called from
updateHighest on subscription events), so if the node starts behind and no new
events arrive catchup never runs; modify startup to initiate catchup
immediately: when launching the goroutine(s) (the code that calls f.wg.Add and
starts followLoop and catchupLoop) ensure you either signal catchup once at
startup or start catchupLoop in a mode that performs an initial bootstrap
catchup before waiting on catchupSignal (adjust catchupLoop or call
signalCatchup directly). Apply the same change to the other goroutine startup
sites that spawn catchupLoop/followLoop to guarantee an initial catchup run even
without incoming subscription events.
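The "signal once at startup, coalesce later signals" behavior the prompt describes can be sketched with a capacity-1 channel. The helper names here are illustrative, not the actual da_follower.go code:

```go
package main

import "fmt"

// newSignaler returns a capacity-1 signal channel and a coalescing sender:
// sending into a full channel is a no-op, so repeat signals collapse into one.
func newSignaler() (chan struct{}, func()) {
	ch := make(chan struct{}, 1)
	send := func() {
		select {
		case ch <- struct{}{}:
		default: // a signal is already pending; coalesce
		}
	}
	return ch, send
}

// drain counts how many signals are currently pending.
func drain(ch chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	catchupSignal, signalCatchup := newSignaler()
	signalCatchup() // bootstrap once at startup, before any DA event arrives
	signalCatchup() // a later subscription event coalesces with the pending one
	fmt.Println(drain(catchupSignal)) // 1
}
```

Because the channel is buffered, the startup signal is not lost even if the catchup goroutine has not reached its `select` yet.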
for h := uint64(3); h <= 5; h++ {
	daRetriever.On("RetrieveFromDA", mock.Anything, h).
		Run(func(args mock.Arguments) {
			fetchedHeights = append(fetchedHeights, h)
		}).
🧩 Analysis chain
🏁 Script executed:
cat -n block/internal/syncing/syncer_backoff_test.go | sed -n '235,260p'

Repository: evstack/ev-node
Length of output: 1128
Capture h per-iteration before using it in the Run callback.
Line 244 closes over the loop variable h; under pre-Go 1.22 loop-variable semantics, by the time the callbacks execute during runCatchup(), h is 6 (the post-loop value), so every callback appends the same incorrect height instead of 3, 4, 5. This causes the test assertion to fail.
🛠️ Proposed fix
for h := uint64(3); h <= 5; h++ {
+ h := h
daRetriever.On("RetrieveFromDA", mock.Anything, h).
Run(func(args mock.Arguments) {
fetchedHeights = append(fetchedHeights, h)
}).
Return(nil, datypes.ErrBlobNotFound).Once()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/syncer_backoff_test.go` around lines 241 - 245, The
test is closing over the loop variable h causing all Run callbacks to see the
final value (6); fix by capturing h per-iteration into a new local (e.g., localH
:= h) before calling daRetriever.On("RetrieveFromDA", mock.Anything,
h).Run(...), and use that local (localH) inside the Run callback so
fetchedHeights receives the correct values 3,4,5 when runCatchup() executes.
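The pitfall can be reproduced in isolation. This standalone sketch (`collect` is illustrative) mirrors the test's pattern of registering callbacks in a loop and running them afterwards; with the per-iteration copy, the callbacks record 3, 4, 5 on any Go version, and before Go 1.22 the copy is what prevents all of them from seeing the post-loop value:

```go
package main

import "fmt"

// collect registers callbacks inside a loop and runs them after the loop
// finishes, mirroring mock .Run callbacks that fire later during the test.
func collect() []uint64 {
	var callbacks []func()
	var got []uint64
	for h := uint64(3); h <= 5; h++ {
		h := h // per-iteration copy; required before Go 1.22
		callbacks = append(callbacks, func() { got = append(got, h) })
	}
	for _, cb := range callbacks {
		cb()
	}
	return got
}

func main() {
	fmt.Println(collect()) // [3 4 5]
}
```

In Go 1.22 and later the loop variable is per-iteration by default, so the shadowing copy is redundant but harmless; adding it keeps the test correct regardless of toolchain version.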
if err := s.daFollower.Start(ctx); err != nil {
	s.cancel()
	s.wg.Wait()
	return fmt.Errorf("failed to start DA follower: %w", err)
}
Stop already-started retrievers on follower startup failure.
If s.daFollower.Start(ctx) fails, s.cancel() + s.wg.Wait() only handles syncer-owned workers. Components started earlier (notably s.raftRetriever) are not explicitly stopped here.
🛠️ Proposed fix
if err := s.daFollower.Start(ctx); err != nil {
s.cancel()
+ if s.daFollower != nil {
+ s.daFollower.Stop()
+ }
+ if s.raftRetriever != nil {
+ s.raftRetriever.Stop()
+ }
s.wg.Wait()
return fmt.Errorf("failed to start DA follower: %w", err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/syncing/syncer.go` around lines 210 - 214, On DA follower
startup failure you must stop any previously-started retrievers to avoid
goroutine leaks: after s.daFollower.Start(ctx) returns an error, call the
Stop/Close method on any components started earlier (e.g.,
s.raftRetriever.Stop(ctx) or the appropriate shutdown method on other
retrievers), handle/ignore its error as needed, then proceed with s.cancel() and
s.wg.Wait(); update the failure branch that currently only invokes s.cancel()
and s.wg.Wait() to also explicitly shut down s.raftRetriever (and any other
started retrievers) before returning the error.
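The general shape of the fix is "unwind already-started components in reverse order when a later Start fails." A minimal sketch, with hypothetical component names standing in for raftRetriever and daFollower:

```go
package main

import (
	"errors"
	"fmt"
)

type component struct {
	name string
	fail bool
}

func (c *component) Start() error {
	if c.fail {
		return errors.New("boom")
	}
	return nil
}

func (c *component) Stop() { fmt.Println("stopped", c.name) }

// startAll stops every already-started component, in reverse order, when a
// later Start fails, so earlier components never leak goroutines.
func startAll(cs []*component) error {
	var started []*component
	for _, c := range cs {
		if err := c.Start(); err != nil {
			for i := len(started) - 1; i >= 0; i-- {
				started[i].Stop()
			}
			return fmt.Errorf("failed to start %s: %w", c.name, err)
		}
		started = append(started, c)
	}
	return nil
}

func main() {
	err := startAll([]*component{
		{name: "raftRetriever"},
		{name: "daFollower", fail: true},
	})
	fmt.Println(err)
}
```

Reverse order matters when later components depend on earlier ones; dependents stop before their dependencies.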
nspace, _ := libshare.NewNamespaceFromBytes(ns)
rpcBlobs := make([]*blobrpc.Blob, len(blobs))

for i, blob := range blobs {
	ids[i] = append(d.nextID(), d.getHash(blob)...)

	d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob})

	if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {
		rpcBlobs[i] = b
	}
🧩 Analysis chain
🏁 Script executed:
# Read the file to inspect lines 220-245 and 260-285
head -n 245 tools/local-da/local.go | tail -n 26 | cat -n

Repository: evstack/ev-node
Length of output: 953
🏁 Script executed:
# Also check lines 260-285
head -n 285 tools/local-da/local.go | tail -n 26 | cat -n

Repository: evstack/ev-node
Length of output: 885
🏁 Script executed:
# Search for all NewNamespaceFromBytes and NewBlobV0 calls in the file
rg -n '(NewNamespaceFromBytes|NewBlobV0)' tools/local-da/local.go -B 2 -A 4

Repository: evstack/ev-node
Length of output: 995
🏁 Script executed:
# Check the entire file size to understand context
wc -l tools/local-da/local.go

Repository: evstack/ev-node
Length of output: 86
Handle namespace and blob conversion errors instead of silently discarding failed conversions.
Both SubmitWithOptions and Submit ignore errors from libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 (lines 226, 234, 267, 275). This causes the methods to return success with valid IDs while d.blobData[d.height] contains nil entries for failed conversions. Subscribers then receive incomplete payloads, creating inconsistency between stored data and streamed data.
Proposed fix
- nspace, _ := libshare.NewNamespaceFromBytes(ns)
- rpcBlobs := make([]*blobrpc.Blob, len(blobs))
+ nspace, err := libshare.NewNamespaceFromBytes(ns)
+ if err != nil {
+ return nil, fmt.Errorf("submit: invalid namespace bytes: %w", err)
+ }
+ rpcBlobs := make([]*blobrpc.Blob, len(blobs))
for i, blob := range blobs {
- if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {
- rpcBlobs[i] = b
- }
+ b, err := blobrpc.NewBlobV0(nspace, blob)
+ if err != nil {
+ return nil, fmt.Errorf("submit: build rpc blob at index %d: %w", i, err)
+ }
+ rpcBlobs[i] = b
}

Per coding guidelines: "Validate all inputs from external sources" and "Wrap errors with context using fmt.Errorf".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/local-da/local.go` around lines 226 - 236, The code silently ignores
errors from libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 in
SubmitWithOptions and Submit, which leads to nil rpc blobs and inconsistent
state; update both functions to check and handle those errors by returning a
wrapped error (use fmt.Errorf with context) instead of continuing, ensure you do
not append ids or store kvp entries when NewNamespaceFromBytes fails, and only
append to rpcBlobs/d.data[d.height] after successful blobrpc.NewBlobV0
conversion (refer to functions SubmitWithOptions and Submit and the
NewNamespaceFromBytes and NewBlobV0 calls).
♻️ Duplicate comments (3)
block/internal/syncing/da_follower.go (3)
109-120: ⚠️ Potential issue | 🟠 Major

Catchup may never start if no subscription events arrive.

If the node starts behind and no new DA blob event arrives, catchupLoop will block indefinitely on catchupSignal. The signal is only sent by updateHighest, which is only called when subscription events arrive.

Consider seeding highestSeenDAHeight at startup by querying the latest DA height:

🐛 Proposed bootstrap fix

func (f *daFollower) Start(ctx context.Context) error {
	ctx, f.cancel = context.WithCancel(ctx)
+	// Seed highest DA height on startup so catchup can begin immediately
+	// even if no fresh subscription event arrives right away.
+	if f.client != nil {
+		if latest, err := f.client.GetLatestDAHeight(ctx); err == nil {
+			f.updateHighest(latest)
+		} else {
+			f.logger.Debug().Err(err).Msg("failed to seed latest DA height at startup")
+		}
+	}
+
	f.wg.Add(2)
	go f.followLoop(ctx)
	go f.catchupLoop(ctx)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 109 - 120, The catchupLoop may never run if no subscription events arrive because highestSeenDAHeight is only advanced via updateHighest; to fix, during daFollower.Start seed highestSeenDAHeight (or call updateHighest) by querying the latest DA height from the DA client before launching followLoop/catchupLoop so catchupSignal can be triggered even with no incoming events; locate Start (method daFollower.Start), highestSeenDAHeight, updateHighest, catchupSignal and ensure Start obtains the current DA head and updates/sets highestSeenDAHeight (or invokes updateHighest) and emits the catchupSignal if needed before starting the goroutines.
260-281: ⚠️ Potential issue | 🟠 Major

Use CAS-based rollback to avoid regressing localNextDAHeight.

The unconditional Store(ev.Height) at Lines 265 and 279 can regress localNextDAHeight if catchupLoop has advanced it concurrently:

- followLoop: CAS(N, N+1) succeeds
- catchupLoop: CAS(N+1, N+2) succeeds
- followLoop: pipeEvent fails → Store(N) regresses the value from N+2 to N

This could cause duplicate processing of heights N and N+1.

🐛 Proposed CAS-based rollback

for _, event := range events {
	if err := f.pipeEvent(ctx, event); err != nil {
-		// Roll back so catchupLoop can retry this height.
-		f.localNextDAHeight.Store(ev.Height)
+		// Roll back only if catchupLoop hasn't advanced past us.
+		f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
		f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
			Msg("failed to pipe inline event, catchup will retry")
		return
	}
}
if len(events) != 0 {
	if !f.headReached.Load() && f.localNextDAHeight.Load() > f.highestSeenDAHeight.Load() {
		f.headReached.Store(true)
	}
	f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
		Msg("processed subscription blobs inline (fast path)")
} else {
-	// No complete events (split namespace, waiting for other half).
-	f.localNextDAHeight.Store(ev.Height)
+	// No complete events (split namespace, waiting for other half).
+	// Roll back only if catchupLoop hasn't advanced past us.
+	f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 260 - 281, The rollback after a failed pipeEvent (and the fallback when no complete events) unconditionally calls f.localNextDAHeight.Store(ev.Height), which can regress progress if catchupLoop advanced it; change both rollbacks to a CAS that only sets ev.Height when the current value is ev.Height+1 (i.e., call f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)) so you only revert if this goroutine still owns the +1 advance; do the same replacement for the other Store(ev.Height) path, and optionally log or ignore the CAS failure instead of forcing the store; locate these changes around the pipeEvent error handling and the "No complete events" branch in the followLoop code that manipulates localNextDAHeight.
362-369: ⚠️ Potential issue | 🟠 Major

Same CAS-based rollback issue in catchupLoop.

Line 364 has the same race condition as the inline processing path. If followLoop advances localNextDAHeight via inline processing between the CAS at Line 357 and the error rollback at Line 364, the Store will regress the value.

🐛 Proposed fix

if err := f.fetchAndPipeHeight(ctx, local); err != nil {
-	// Roll back so we can retry after backoff.
-	f.localNextDAHeight.Store(local)
+	// Roll back only if followLoop hasn't advanced past us.
+	f.localNextDAHeight.CompareAndSwap(local+1, local)
	if !f.waitOnCatchupError(ctx, err, local) {
		return
	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 362 - 369, The rollback in catchupLoop uses f.localNextDAHeight.Store(local) which can regress progress if followLoop advanced the value; change the rollback to only set localNextDAHeight back when the current value still equals the expected local (use an atomic CompareAndSwap or conditional load+store): e.g., check f.localNextDAHeight.Load() == local (or use f.localNextDAHeight.CompareAndSwap(expectedLocal, local)) before performing the Store, so fetchAndPipeHeight/followLoop races won't regress the counter.
🧹 Nitpick comments (2)
test/testda/dummy.go (1)
298-311: Consider moving the height increment inside the lock for consistency.

In Submit (line 149), d.height.Add(1) occurs inside the mutex-protected section. Here, it occurs before acquiring d.mu, creating a window where GetLatestDAHeight() returns the new height before the subscription event is sent. This ordering difference could cause subtle timing issues in tests.

♻️ Optional fix for consistency

select {
case <-ticker.C:
	now := time.Now()
-	height := d.height.Add(1)
	d.mu.Lock()
+	height := d.height.Add(1)
	if d.headers[height] == nil {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/testda/dummy.go` around lines 298 - 311, The height increment d.height.Add(1) should be moved inside the mutex to match Submit's ordering and avoid racing GetLatestDAHeight(): acquire d.mu before computing/assigning height (inside the ticker handler), then update d.headers, call d.notifySubscribers with the new height, and finally release the lock; ensure you still compute Timestamp now := time.Now() as needed and keep notifySubscribers invoked while the lock is held to preserve ordering consistency with Submit and avoid the window where GetLatestDAHeight() could observe the new height before the subscription event is emitted.

block/internal/syncing/da_follower.go (1)
88-106: Consider validating required config fields.

NewDAFollower doesn't validate that cfg.Client and cfg.Retriever are non-nil. If either is nil, subsequent calls in runSubscription or fetchAndPipeHeight will panic.

🛡️ Proposed defensive validation

func NewDAFollower(cfg DAFollowerConfig) DAFollower {
+	if cfg.Client == nil {
+		panic("DAFollowerConfig.Client is required")
+	}
+	if cfg.Retriever == nil {
+		panic("DAFollowerConfig.Retriever is required")
+	}
+	if cfg.PipeEvent == nil {
+		panic("DAFollowerConfig.PipeEvent is required")
+	}
	dataNs := cfg.DataNamespace
	if len(dataNs) == 0 {
		dataNs = cfg.Namespace
	}

🤖 Prompt for AI Agents
func NewDAFollower(cfg DAFollowerConfig) DAFollower { + if cfg.Client == nil { + panic("DAFollowerConfig.Client is required") + } + if cfg.Retriever == nil { + panic("DAFollowerConfig.Retriever is required") + } + if cfg.PipeEvent == nil { + panic("DAFollowerConfig.PipeEvent is required") + } dataNs := cfg.DataNamespace if len(dataNs) == 0 { dataNs = cfg.Namespace }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/syncing/da_follower.go` around lines 88 - 106, NewDAFollower should validate required config fields: check that cfg.Client and cfg.Retriever are non-nil before constructing the daFollower, returning an error or panicking early as your codebase convention dictates; update NewDAFollower (and its caller contract if needed) to validate DAFollowerConfig.Client and DAFollowerConfig.Retriever and avoid creating a daFollower that will panic later in runSubscription or fetchAndPipeHeight, ensuring any error message clearly names the missing field(s) and references the constructor NewDAFollower and the daFollower struct.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 109-120: The catchupLoop may never run if no subscription events
arrive because highestSeenDAHeight is only advanced via updateHighest; to fix,
during daFollower.Start seed highestSeenDAHeight (or call updateHighest) by
querying the latest DA height from the DA client before launching
followLoop/catchupLoop so catchupSignal can be triggered even with no incoming
events; locate Start (method daFollower.Start), highestSeenDAHeight,
updateHighest, catchupSignal and ensure Start obtains the current DA head and
updates/sets highestSeenDAHeight (or invokes updateHighest) and emits the
catchupSignal if needed before starting the goroutines.
- Around line 260-281: The rollback after a failed pipeEvent (and the fallback
when no complete events) unconditionally calls
f.localNextDAHeight.Store(ev.Height), which can regress progress if catchupLoop
advanced it; change both rollbacks to a CAS that only sets ev.Height when the
current value is ev.Height+1 (i.e., call
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)) so you only revert
if this goroutine still owns the +1 advance; do the same replacement for the
other Store(ev.Height) path, and optionally log or ignore the CAS failure
instead of forcing the store; locate these changes around the pipeEvent error
handling and the "No complete events" branch in the followLoop code that
manipulates localNextDAHeight.
- Around line 362-369: The rollback in catchupLoop uses
f.localNextDAHeight.Store(local) which can regress progress if followLoop
advanced the value; change the rollback to only set localNextDAHeight back when
the current value still equals the expected local (use an atomic CompareAndSwap
or conditional load+store): e.g., check f.localNextDAHeight.Load() == local (or
use f.localNextDAHeight.CompareAndSwap(expectedLocal, local)) before performing
the Store, so fetchAndPipeHeight/followLoop races won't regress the counter.
---
Nitpick comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 88-106: NewDAFollower should validate required config fields:
check that cfg.Client and cfg.Retriever are non-nil before constructing the
daFollower, returning an error or panicking early as your codebase convention
dictates; update NewDAFollower (and its caller contract if needed) to validate
DAFollowerConfig.Client and DAFollowerConfig.Retriever and avoid creating a
daFollower that will panic later in runSubscription or fetchAndPipeHeight,
ensuring any error message clearly names the missing field(s) and references the
constructor NewDAFollower and the daFollower struct.
In `@test/testda/dummy.go`:
- Around line 298-311: The height increment d.height.Add(1) should be moved
inside the mutex to match Submit's ordering and avoid racing
GetLatestDAHeight(): acquire d.mu before computing/assigning height (inside the
ticker handler), then update d.headers, call d.notifySubscribers with the new
height, and finally release the lock; ensure you still compute Timestamp now :=
time.Now() as needed and keep notifySubscribers invoked while the lock is held
to preserve ordering consistency with Submit and avoid the window where
GetLatestDAHeight() could observe the new height before the subscription event
is emitted.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d6918e55-cf7b-4700-9ded-4f821cec7349
📒 Files selected for processing (3)
block/internal/syncing/da_follower.goblock/internal/syncing/da_retriever_strict_test.gotest/testda/dummy.go
Summary
Replaces the Syncer's polling-based DA worker with an event-driven DAFollower that subscribes to DA header and data events in real time. This eliminates unnecessary polling latency in follow mode and brings zero-latency block processing when the node is caught up with the DA layer.

Changes
Core: Event-Driven DAFollower (block/internal/syncing/)

- DAFollower, a new component that subscribes to both the header and data namespaces of the DA layer and processes them inline, achieving zero-latency follow mode.
- Syncer refactored to delegate event-driven follow logic to DAFollower; the old polling worker is removed.
- DARetriever interface and its tracing/mock implementations updated to support the new subscription flow.

DA Client: WebSocket-Based JSON-RPC (pkg/da/, block/internal/da/)

Local DA Tooling (tools/local-da/)

- local.go extended with subscription broadcasting and rpc.go with the corresponding RPC endpoints.

Test Infrastructure

- da_retriever_mock.go and updates to test/mocks/da.go and test/testda/dummy.go to cover subscription interfaces.
- syncer_backoff_test.go and syncer_test.go updated and significantly extended to cover the new event-driven flows.
- evnode flags and P2P address retrieval patterns.

Summary by CodeRabbit
New Features
Bug Fixes
Tests