
feat: Event-Driven DA Follower with WebSocket Subscriptions#3131

Open
alpe wants to merge 13 commits into main from alex/2803_best_2worlds
Conversation

@alpe
Contributor

@alpe alpe commented Mar 3, 2026

Summary

Replaces the Syncer's polling-based DA worker with an event-driven DAFollower that subscribes to DA header and data events in real time. This removes the polling latency from follow mode and lets blocks be processed as soon as they arrive once the node is caught up with the DA layer.

Changes

Core: Event-Driven DAFollower (block/internal/syncing/)

  • Introduces DAFollower, a new component that subscribes to both the header and data namespaces of the DA layer and processes them inline, achieving zero-latency follow mode.
  • Adds an inline blob processing path so blobs arriving with their header are handled immediately without a separate retrieval round-trip.
  • Adds a subscription watchdog that detects stalled DA subscriptions and triggers recovery.
  • Syncer refactored to delegate event-driven follow logic to DAFollower; the old polling worker is removed.
  • Extends the DARetriever interface and its tracing/mock implementations to support the new subscription flow.

DA Client: WebSocket-Based JSON-RPC (pkg/da/, block/internal/da/)

  • Updates the JSON-RPC DA client to use WebSockets for subscriptions, replacing the previous HTTP-polling approach.
  • Adds a proper WebSocket constructor to the client so callers can opt into streaming.
  • Extends DA types with subscription-relevant fields.
  • Security hardening on the DA subscription path.

Local DA Tooling (tools/local-da/)

  • Adds blob subscription support to the local dummy DA, enabling full E2E testing of the event-driven path without a live Celestia node.
  • Extends local.go with subscription broadcasting and rpc.go with the corresponding RPC endpoints.

Test Infrastructure

  • New da_retriever_mock.go and updates to test/mocks/da.go and test/testda/dummy.go to cover subscription interfaces.
  • syncer_backoff_test.go and syncer_test.go updated and significantly extended to cover the new event-driven flows.
  • E2E tests updated for new evnode flags and P2P address retrieval patterns.

Summary by CodeRabbit

  • New Features

    • WebSocket-based blob client for real-time DA connectivity and channel subscriptions
    • Blob subscription API for streaming DA events
    • DA follower for continuous catch-up and inline blob processing
    • Local/test DA now supports publish/subscribe notifications
  • Bug Fixes

    • Corrected configuration flag names for full node startup
  • Tests

    • Expanded tests for catch-up, backoff, inline processing, and subscription flows

alpe added 3 commits March 3, 2026 13:02
…ollower and introduce DA client subscription.
…w mode

When the DA subscription delivers blobs at the current local DA height,
the followLoop now processes them inline via ProcessBlobs — avoiding
a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path),
  falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to DA client Subscribe adapter
- Export ProcessBlobs on DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
When header and data use different DA namespaces, the DAFollower now
subscribes to both and merges events via a fan-in goroutine. This ensures
inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when
  ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on DummyDA test helper with subscriber notification
@github-actions
Contributor

github-actions bot commented Mar 3, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Mar 5, 2026, 12:47 PM

@coderabbitai
Contributor

coderabbitai bot commented Mar 3, 2026

📝 Walkthrough

Walkthrough

Adds WebSocket-based DA subscriptions and a DAFollower that subscribes to DA events, drives inline blob processing via ProcessBlobs, and performs prioritized sequential catch‑up; replaces blob client constructors with NewWSClient and propagates Subscribe through DA client interfaces and mocks.

Changes

Cohort / File(s) Summary
Blob client constructor updates
apps/evm/cmd/run.go, apps/grpc/cmd/run.go, apps/testapp/cmd/run.go, pkg/cmd/run_node.go
Replaced blobrpc.NewClient usages with blobrpc.NewWSClient (WS URL conversion) and adjusted context usage where applicable to enable WebSocket subscriptions.
DA client interface & JSON-RPC WS client
block/internal/da/interface.go, block/internal/da/client.go, block/internal/da/tracing.go, pkg/da/jsonrpc/client.go
Added Subscribe(ctx, namespace) to DA client interface and implemented it in internal client; added NewWSClient(ctx, addr, token, authHeaderName) constructor performing HTTP→WS conversion.
DA subscription event type
pkg/da/types/types.go
Added SubscriptionEvent struct (Height uint64, Blobs [][]byte) used by Subscribe producers/consumers.
DAFollower implementation
block/internal/syncing/da_follower.go
New DAFollower (interface, config, constructor) implementing subscription-driven followLoop and sequential catchupLoop with backoff, inline processing, signaling and lifecycle methods.
DARetriever API & internals
block/internal/syncing/da_retriever.go, block/internal/syncing/da_retriever_tracing.go, block/internal/syncing/da_retriever_mock.go, block/internal/syncing/da_retriever_tracing_test.go
Added ProcessBlobs(ctx, blobs, daHeight) []common.DAHeightEvent to DARetriever API and tracing/mocks; refactored internal processing (mutex/atomic, dedupe) and added wrapper forwarding methods.
Syncer integration & refactor
block/internal/syncing/syncer.go, block/internal/syncing/syncer_backoff_test.go, block/internal/syncing/syncer_benchmark_test.go, block/internal/syncing/syncer_test.go
Replaced internal daWorkerLoop with a DAFollower field; Start/Stop now initialize/stop DAFollower; tests and benchmarks updated to use DAFollower flows.
Mocks & test DA implementations
test/mocks/da.go, apps/evm/server/force_inclusion_test.go, block/internal/da/tracing_test.go, test/testda/dummy.go, tools/local-da/local.go, tools/local-da/rpc.go
Added Subscribe to mocks and test DA implementations; added subscriber registries and publish/notify semantics; updated mock scaffolding for ProcessBlobs and tracing tests.
E2E and test CLI flag updates
test/e2e/evm_force_inclusion_e2e_test.go, test/e2e/evm_test_common.go
Full-node connect target switched to sequencer full P2P address; several test CLI flags replaced from rollkit.* to evnode.* equivalents.
Subscription plumbing in local/test DAs
tools/local-da/rpc.go, test/testda/dummy.go
Converted Subscribe to return streaming channels, added lifecycle handling and event notifications on Submit/height ticks, and integrated namespace filtering and backpressure handling.

Sequence Diagram

sequenceDiagram
    participant Syncer
    participant DAFollower
    participant DAClient as "DA Client (WS)"
    participant Retriever
    participant Pipeline as "Event Pipeline"

    Syncer->>DAFollower: Start(ctx)
    activate DAFollower

    DAFollower->>DAClient: Subscribe(ctx, namespace)
    DAClient-->>DAFollower: subscription channel (SubscriptionEvent)

    loop on subscription event
        DAFollower->>DAFollower: updateHighest(height)
        alt inline (localNext == height)
            DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>DAFollower: []DAHeightEvent
            DAFollower->>Pipeline: pipe events
        else backlog
            DAFollower->>DAFollower: signal catchup
        end
    end

    loop catch-up sequence
        DAFollower->>Retriever: RetrieveFromDA(ctx, height)
        Retriever-->>DAFollower: blobs / Err
        DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
        Retriever-->>DAFollower: []DAHeightEvent
        DAFollower->>Pipeline: pipe events
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

T:testing

Suggested reviewers

  • tuxcanfly
  • tzdybal
  • julienrbrt
  • tac0turtle

Poem

🐇 I tuned my ears to WebSocket streams,
Blobs hopped in lines, fulfilling dreams.
I follow heights by moon and sun,
Pipe events swift — hop, fetch, and run. 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage: ⚠️ Warning. Docstring coverage is 79.17%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
✅ Passed checks (2 passed)
Title check: ✅ Passed. The PR title "feat: Event-Driven DA Follower with WebSocket Subscriptions" accurately and concisely summarizes the main change: introducing event-driven DA following with WebSocket subscriptions instead of polling.
Description check: ✅ Passed. The PR description is comprehensive and well structured, covering the overview and the detailed changes across multiple components (DAFollower, DA Client, Local DA Tooling, and Test Infrastructure), and aligns with the provided template requirements.


@claude
Contributor

claude bot commented Mar 3, 2026

Claude finished @alpe's task in 3m 33s


Code Review: Event-Driven DA Follower with WebSocket Subscriptions

  • Read changed files and gather context
  • Review core DAFollower implementation
  • Review Syncer refactor
  • Review DA client / WebSocket changes
  • Review local-da tooling
  • Review test infrastructure
  • Post comprehensive feedback

Summary

This is a well-architected change. Replacing the polling DA worker with an event-driven DAFollower is a sound design. The CAS-based coordination between followLoop and catchupLoop is clever. Below are findings from most to least critical.


Critical

1. DAFollower never starts catch-up if no subscription events arrive on boot

block/internal/syncing/da_follower.go:109-119

catchupLoop only runs when catchupSignal fires, and signalCatchup is only called from updateHighest, which is only called from handleSubscriptionEvent. If a node restarts far behind and the DA chain is quiet (no new blobs), catchupLoop will block forever waiting for a signal that never comes.

func (f *daFollower) Start(ctx context.Context) error {
    ctx, f.cancel = context.WithCancel(ctx)
    f.wg.Add(2)
    go f.followLoop(ctx)
    go f.catchupLoop(ctx)   // blocked on catchupSignal until a subscription event arrives
    ...
}

Fix: Seed highestSeenDAHeight from GetLatestDAHeight at startup, or unconditionally call f.signalCatchup() after starting the goroutines:

// At the end of Start():
f.signalCatchup() // ensure catchup runs even if no subscription event arrives

2. Unconditional Store on rollback can move localNextDAHeight backward

block/internal/syncing/da_follower.go:265, 279, 364

After a successful CAS claim (localNextDAHeight advanced from N to N+1), rollback on error uses Store(ev.Height) / Store(local), which unconditionally moves the value backward. This is safe today because the two goroutines are coordinated via CAS, but the correctness argument is non-obvious and fragile — particularly at line 364 where runCatchup could get preempted between the CAS and the rollback store, and followLoop could advance past the value being rolled back.

// Lines 265, 279 — prefer:
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)

// Line 364 — prefer:
f.localNextDAHeight.CompareAndSwap(local+1, local)



Major

3. WS blob client is never closed in apps/evm/cmd/run.go

apps/evm/cmd/run.go:63-66

blobrpc.NewWSClient returns a *Client that holds open WebSocket connections. There is no defer blobClient.Close() after creation. This leaks WS resources on shutdown paths (e.g., when RunE returns after an error, or during normal process exit).

blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
    return fmt.Errorf("failed to create blob client: %w", err)
}
defer blobClient.Close()  // ← missing

The same pattern should be verified for apps/grpc/cmd/run.go and apps/testapp/cmd/run.go.


4. Silent namespace/blob conversion errors in tools/local-da/local.go

tools/local-da/local.go:226, 234, 267, 275

Both SubmitWithOptions and Submit silently discard errors from libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0. If NewNamespaceFromBytes fails, nspace is zero-valued; all subsequent NewBlobV0 calls will silently fail, setting rpcBlobs[i] = nil. notifySubscribers then sends events with nil blob entries to subscribers, causing downstream ProcessBlobs to receive nil blobs.

nspace, _ := libshare.NewNamespaceFromBytes(ns)  // ← error silently discarded
rpcBlobs := make([]*blobrpc.Blob, len(blobs))
for i, blob := range blobs {
    if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {  // ← silent nil on fail
        rpcBlobs[i] = b
    }
}

This should propagate the namespace error and the blob construction error (or at least log it prominently).


5. raftRetriever not stopped on DAFollower startup failure

block/internal/syncing/syncer.go:210-213

When s.daFollower.Start(ctx) fails, the code calls s.cancel() then s.wg.Wait(). This properly cleans up processLoop and startSyncWorkers-launched goroutines. However, s.raftRetriever.Start(ctx) (line 186-190) is not explicitly stopped before returning the error — it relies on s.cancel() propagating via context. Depending on raftRetriever's lifecycle contract this may or may not be safe; explicit s.raftRetriever.Stop(ctx) would be clearer.

6. Dead code: errCaughtUp sentinel

block/internal/syncing/da_follower.go:401-402, 408

errCaughtUp is defined and handled in waitOnCatchupError, but nothing in the codebase ever returns it. It's dead code today. Either wire it up (have fetchAndPipeHeight return it when ErrHeightFromFuture is detected) or remove it to avoid confusion.

var errCaughtUp = errors.New("caught up with DA head")  // never returned by anyone

Moderate

7. DARetriever.ProcessBlobs thread-safety contract is load-bearing

block/internal/syncing/da_retriever.go:198-205

The comment correctly states this is "NOT thread-safe". The safety guarantee relies entirely on the CAS coordination in handleSubscriptionEvent and runCatchup. This is a fragile invariant that lives in two widely-separated files. Consider making the contract more explicit or moving it into the interface doc comment for ProcessBlobs, since any future caller that misses this contract will silently corrupt pendingHeaders/pendingData.

8. Subscribe not traced in tracedClient

block/internal/da/tracing.go:148-150

func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
    return t.inner.Subscribe(ctx, namespace)  // ← no span
}

Subscription setup failures will be invisible in traces. Consistent with the other traced methods, this should at minimum capture subscribe errors.

9. ProcessBlobs not traced in tracedDARetriever

block/internal/syncing/da_retriever_tracing.go:67-69

func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
    return t.inner.ProcessBlobs(ctx, blobs, daHeight)  // ← no span
}

This is the new inline hot path. Missing trace spans here make it impossible to distinguish inline processing latency from catchup latency.


Nitpick / Minor

10. Duplicate Submit/SubmitWithOptions logic

tools/local-da/local.go:204-284

The two submit paths share ~30 lines of identical logic: namespace conversion, blob construction, ID generation, d.data/d.blobData population, and notifySubscribers. A shared helper would prevent the two paths from diverging silently again in the future.

11. Subscribe interface comment drain semantics unclear

block/internal/da/interface.go:22

"Callers MUST drain the channel after cancellation" is ambiguous. Clarify that callers should range over the channel (or keep receiving) until it is closed by the implementation after context cancellation.

12. NewWSClient is a one-liner wrapping NewClient

pkg/da/jsonrpc/client.go:76-78

NewWSClient simply calls httpToWS(addr) and delegates to NewClient. The doc comment mentions "eager connection" vs "lazy connection" distinction, but both ultimately call the same jsonrpc.NewClient. The two constructors exist for clarity and forced opt-in to WS. This is a reasonable design, but the comment "lazy connection" for NewClient is misleading if the underlying library connects eagerly too — worth verifying.

13. Loop variable in syncer_backoff_test.go:241-244

This is Go 1.25 (confirmed from go.mod), where loop variables are scoped per iteration, so this is not actually a bug. The existing CodeRabbit comment can be dismissed.


Positive Observations

  • The CAS-based two-goroutine coordination between followLoop and catchupLoop is well thought out — the design document in the struct comment is clear and accurate.
  • Watchdog timer in runSubscription is a good defensive measure against stalled subscriptions.
  • Security hardening in extractBlobData (size filtering) is a nice addition.
  • The DAFollowerConfig struct is clean and avoids a long constructor parameter list.
  • mergeSubscriptions handles the split-namespace case correctly with proper nil-channel fanin pattern.
  • processBlobs correctly handles the split-namespace "waiting for other half" case by returning empty events and rolling back localNextDAHeight.

If no subscription events arrive within 3× the DA block time (default
30s), the watchdog triggers and returns an error. The followLoop then
reconnects the subscription with the standard backoff. This prevents
the node from silently stopping sync when the DA subscription stalls
(e.g., network partition, DA node freeze).
@codecov

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 31.35889% with 197 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.14%. Comparing base (c0bc141) to head (896c6ad).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
block/internal/syncing/da_follower.go 39.17% 111 Missing and 7 partials ⚠️
block/internal/da/client.go 0.00% 36 Missing ⚠️
block/internal/syncing/syncer.go 5.00% 18 Missing and 1 partial ⚠️
block/internal/syncing/da_retriever.go 59.09% 6 Missing and 3 partials ⚠️
pkg/da/jsonrpc/client.go 0.00% 6 Missing ⚠️
pkg/cmd/run_node.go 0.00% 5 Missing ⚠️
block/internal/da/tracing.go 0.00% 2 Missing ⚠️
block/internal/syncing/da_retriever_tracing.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3131      +/-   ##
==========================================
- Coverage   60.87%   60.14%   -0.73%     
==========================================
  Files         113      114       +1     
  Lines       11695    11909     +214     
==========================================
+ Hits         7119     7163      +44     
- Misses       3773     3932     +159     
- Partials      803      814      +11     
Flag Coverage Δ
combined 60.14% <31.35%> (-0.73%) ⬇️

Flags with carried forward coverage won't be shown.


alpe added 2 commits March 3, 2026 15:24
…ient to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval.
Contributor

@github-actions github-actions bot left a comment


⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: dd7e0cd Previous: 2c75e9e Ratio
SpamoorSmoke - Batch.SetHeight (avg) 15.020408163265307 us 8.824561403508772 us 1.70
SpamoorSmoke - Store.GetBlockData (avg) 13.59433962264151 us 7.780701754385965 us 1.75

This comment was automatically generated by workflow using github-action-benchmark.

alpe added 4 commits March 4, 2026 14:13
* main:
  chore: add stricter linting (#3132)
  feat(benchmarking): adding ERC20 benchmarking test (#3114)
  feat: ensure p2p DAHint within limits (#3128)
@alpe alpe changed the title from "[WIP] adding sync mode" to "feat: Event-Driven DA Follower with WebSocket Subscriptions" on Mar 4, 2026
@alpe alpe marked this pull request as ready for review March 4, 2026 17:23
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (4)
block/internal/da/interface.go (1)

20-23: Clarify the post-cancellation drain contract in the comment.

“Callers MUST drain the channel after cancellation” can be interpreted differently; consider explicitly stating the expected sequence (cancel context, keep receiving until channel closes).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/interface.go` around lines 20 - 23, Update the comment for
Subscribe to explicitly describe the post-cancellation drain sequence: state
that callers should cancel the provided ctx, then continue receiving from the
returned <-chan datypes.SubscriptionEvent (e.g., range over the channel) until
it is closed by the implementation; clarify that the channel will be closed once
cancellation is observed and that draining (receiving until close) prevents
goroutine/resource leaks. Reference the Subscribe(ctx context.Context, namespace
[]byte) signature and datypes.SubscriptionEvent in the comment.
block/internal/syncing/da_retriever_tracing.go (1)

67-69: Consider instrumenting ProcessBlobs in the tracing wrapper.

This new hot path currently emits no span/attributes, which limits visibility into inline blob processing latency and output volume.

Suggested refactor
 func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
-	return t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	ctx, span := t.tracer.Start(ctx, "DARetriever.ProcessBlobs",
+		trace.WithAttributes(
+			attribute.Int64("da.height", int64(daHeight)),
+			attribute.Int("blob.count", len(blobs)),
+		),
+	)
+	defer span.End()
+
+	events := t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	span.SetAttributes(attribute.Int("event.count", len(events)))
+	return events
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_retriever_tracing.go` around lines 67 - 69, The
tracing wrapper tracedDARetriever.ProcessBlobs is not creating any span or
attributes, so add instrumentation around the call to t.inner.ProcessBlobs:
start a span (using the same tracer used elsewhere in this file), set attributes
for blob_count (len(blobs)), da_height (daHeight) and optionally
total_blob_bytes (sum of len for each blob), call t.inner.ProcessBlobs(ctx,
blobs, daHeight), record the resulting events count as an attribute (e.g.,
da_events_count) and any error/status if applicable, then end the span; keep the
wrapper behavior identical except for adding the span and attributes to improve
visibility into latency and output volume.
apps/evm/server/force_inclusion_test.go (1)

53-58: Prefer a mockery-generated DA mock over extending the hand-written stub.

This local Subscribe stub works, but it increases interface-drift maintenance as DA APIs evolve. Using the generated DA mock keeps contract changes centralized.

As per coding guidelines, "Mock external dependencies using mockery".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/server/force_inclusion_test.go` around lines 53 - 58, Replace the
hand-written Subscribe implementation on mockDA with the mockery-generated DA
mock: remove or stop using the local mockDA.Subscribe stub and instead import
and instantiate the mockery-created mock (e.g., MockDA) and use its EXPECT/On
setup to return a closed (<-chan da.SubscriptionEvent) or desired channel for
the test; update test setup where mockDA is constructed to use the mockery mock
and set the Subscribe return behavior via the mock's EXPECT/On methods so the
external DA contract is maintained centrally.
block/internal/da/tracing.go (1)

148-150: Consider tracing subscription setup for consistency.

Subscribe currently bypasses the tracing pattern used by the rest of this wrapper, so subscribe failures won’t be visible in DA spans.

Suggested refactor
 func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
-	return t.inner.Subscribe(ctx, namespace)
+	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
+		trace.WithAttributes(
+			attribute.Int("ns.length", len(namespace)),
+			attribute.String("da.namespace", hex.EncodeToString(namespace)),
+		),
+	)
+	defer span.End()
+
+	ch, err := t.inner.Subscribe(ctx, namespace)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return nil, err
+	}
+	return ch, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/tracing.go` around lines 148 - 150, tracedClient.Subscribe
bypasses the wrapper's tracing; update the method to create a DA span (matching
the tracing pattern used elsewhere in this file), start the span with the
incoming ctx, add relevant attributes (e.g., namespace), call
t.inner.Subscribe(ctx, namespace), record any returned error on the span, and
end the span before returning the channel and error so subscription setup
failures are captured in DA spans; reference the tracedClient.Subscribe method
and t.inner.Subscribe call when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS dial is using context.Background() when calling
blobrpc.NewWSClient which prevents CLI cancellation from propagating; replace
the background context with the command-scoped context (use cmd.Context() or a
derived context) when calling blobrpc.NewWSClient so that the DA WebSocket
handshake is canceled on CLI shutdown and respects timeouts; update the call
site where blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.

In `@block/internal/syncing/da_follower.go`:
- Around line 265-267: The unconditional f.localDAHeight.Store(ev.Height) can
regress localDAHeight if runCatchup advanced it concurrently; change the
rollback to a CAS so we only set it back when the current value equals the value
we expect to overwrite. Replace the Store(ev.Height) rollback with a
compare-and-swap using the atomic type/method (e.g.,
f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.
- Around line 271-273: The code sets f.headReached.Store(true) whenever inline
processing yields events, which can be incorrect if highestSeenDAHeight is still
ahead; modify the condition around the len(events) branch in da_follower.go (the
block using ev.Height and f.headReached) to only set f.headReached when the
event height indicates we are actually caught up (e.g., ev.Height >=
f.highestSeenDAHeight or when highestSeenDAHeight is unset/zero), otherwise do
not flip headReached; update the conditional that logs with f.logger.Debug() so
the headReached.Store(true) call is guarded by this explicit comparison to
highestSeenDAHeight.

In `@block/internal/syncing/syncer_benchmark_test.go`:
- Around line 46-61: The test starts goroutines with b.Context() which doesn't
get cancelled by the fixture teardown (fixt.s.cancel()), causing potential
goroutine leaks; change all uses of b.Context() when launching long-lived worker
goroutines to use the fixture's cancelable context (fixt.s.ctx) instead —
specifically pass fixt.s.ctx into fixt.s.processLoop, follower.runCatchup (and
the NewDAFollower call site that spawns it), and fixt.s.startSyncWorkers; apply
the same replacement for the other occurrences mentioned (lines near 64-69 and
78-79) so teardown via fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down
workers.

In `@block/internal/syncing/syncer.go`:
- Around line 198-213: processLoop is started with s.wg.Go before starting
s.daFollower, so if s.daFollower.Start(ctx) fails the goroutine can leak; wrap
the work with a cancellable child context (e.g., childCtx, cancel :=
context.WithCancel(ctx)) and pass childCtx to s.processLoop and to
daFollower.Start, then on Start failure call cancel() and wait for in-flight
goroutines to finish (s.wg.Wait or equivalent) before returning the error;
update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.

In `@test/testda/dummy.go`:
- Around line 66-77: The goroutine unconditionally closes ch on ctx.Done(),
causing a double-close if Reset() already closed it; modify the removal logic in
the ctx.Done() goroutine (which locks d.mu and iterates d.subscribers comparing
s to sub) to only close ch when this goroutine actually removed the subscriber
(e.g., set a removed bool when s==sub, perform d.subscribers = append(...),
break, and then close ch only if removed is true), so Reset() and this goroutine
cannot both close the same channel; reference d.mu, d.subscribers, sub, ch and
Reset() when making the change.

In `@tools/local-da/local.go`:
- Around line 230-231: The subscription events lose blob payloads because
notifySubscribers reads from d.blobData[height] while SubmitWithOptions and
other submit paths only write to d.data and never populate d.blobData; update
the submit code paths (e.g., SubmitWithOptions and the other submit handlers
around the noted regions) to also populate d.blobData[height] with the
corresponding blob list when they set d.data[height] (or move notifySubscribers
to read from d.data if that is the intended single source); ensure you update
the same symbol names (d.data, d.blobData) so notifySubscribers(d.height) will
see the blobs and subscribers receive non-empty blob lists.

---

Nitpick comments:
In `@apps/evm/server/force_inclusion_test.go`:
- Around line 53-58: Replace the hand-written Subscribe implementation on mockDA
with the mockery-generated DA mock: remove or stop using the local
mockDA.Subscribe stub and instead import and instantiate the mockery-created
mock (e.g., MockDA) and use its EXPECT/On setup to return a closed (<-chan
da.SubscriptionEvent) or desired channel for the test; update test setup where
mockDA is constructed to use the mockery mock and set the Subscribe return
behavior via the mock's EXPECT/On methods so the external DA contract is
maintained centrally.

In `@block/internal/da/interface.go`:
- Around line 20-23: Update the comment for Subscribe to explicitly describe the
post-cancellation drain sequence: state that callers should cancel the provided
ctx, then continue receiving from the returned <-chan datypes.SubscriptionEvent
(e.g., range over the channel) until it is closed by the implementation; clarify
that the channel will be closed once cancellation is observed and that draining
(receiving until close) prevents goroutine/resource leaks. Reference the
Subscribe(ctx context.Context, namespace []byte) signature and
datypes.SubscriptionEvent in the comment.

In `@block/internal/da/tracing.go`:
- Around line 148-150: tracedClient.Subscribe bypasses the wrapper's tracing;
update the method to create a DA span (matching the tracing pattern used
elsewhere in this file), start the span with the incoming ctx, add relevant
attributes (e.g., namespace), call t.inner.Subscribe(ctx, namespace), record any
returned error on the span, and end the span before returning the channel and
error so subscription setup failures are captured in DA spans; reference the
tracedClient.Subscribe method and t.inner.Subscribe call when making this
change.

In `@block/internal/syncing/da_retriever_tracing.go`:
- Around line 67-69: The tracing wrapper tracedDARetriever.ProcessBlobs is not
creating any span or attributes, so add instrumentation around the call to
t.inner.ProcessBlobs: start a span (using the same tracer used elsewhere in this
file), set attributes for blob_count (len(blobs)), da_height (daHeight) and
optionally total_blob_bytes (sum of len for each blob), call
t.inner.ProcessBlobs(ctx, blobs, daHeight), record the resulting events count as
an attribute (e.g., da_events_count) and any error/status if applicable, then
end the span; keep the wrapper behavior identical except for adding the span and
attributes to improve visibility into latency and output volume.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 24171a7e-5193-4ec4-9f32-030b0acc2d20

📥 Commits

Reviewing files that changed from the base of the PR and between 2c75e9e and dd7e0cd.

📒 Files selected for processing (26)
  • apps/evm/cmd/run.go
  • apps/evm/server/force_inclusion_test.go
  • apps/grpc/cmd/run.go
  • apps/testapp/cmd/run.go
  • block/internal/da/client.go
  • block/internal/da/interface.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/da_retriever_mock.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_test.go
  • pkg/cmd/run_node.go
  • pkg/da/jsonrpc/client.go
  • pkg/da/types/types.go
  • test/e2e/evm_force_inclusion_e2e_test.go
  • test/e2e/evm_test_common.go
  • test/mocks/da.go
  • test/testda/dummy.go
  • tools/local-da/local.go
  • tools/local-da/rpc.go

@julienrbrt julienrbrt self-requested a review March 4, 2026 17:50
Member

Are we able to use that in the async block retriever?

Member

as discussed in the standup this will be done in a follow-up

@julienrbrt julienrbrt (Member) left a comment

I think we can improve the client constructor a bit so we only need one, and can handle the case where websockets are for some reason not available.

return binary.LittleEndian.Uint64(id[:8]), commitment, nil
}

// SubscriptionEvent is a namespace-agnostic signal that a blob was finalized at
Member

Nit, move this type to internal/da/ or internal/common. It doesn't need to be public.

Contributor Author

The type is used by dummyDA as well, which does not have access to the internal packages.


// NewClient connects to the DA RPC endpoint using the address as-is.
// Uses HTTP by default (lazy connection — only connects on first RPC call).
// Does NOT support channel-based subscriptions (e.g. Subscribe).
Member

I don't get it; the clients are effectively the same, just the address changes.
Additionally, websockets can be disabled I believe, so shouldn't we keep this one constructor, check the websocket connection, use it if available, and otherwise fall back to HTTP?

Contributor Author

The websocket connection was causing issues with tests initially. Without the second constructor, we would upgrade everything to WS automatically. Let me check the tests again to see how much effort is left to upgrade them all.

Contributor Author

With the event subscription, we make websocket connections mandatory.

Member

as discussed in the standup this will be done in a follow-up

local := f.localDAHeight.Load()
highest := f.highestSeenDAHeight.Load()

if local > highest {
Member

Shouldn't this be `>=`?

Contributor Author

The local value is actually the next DA height to fetch; I will rename it to localNextDAHeight.

f.logger.Debug().
Uint64("da_height", priorityHeight).
Msg("fetching priority DA height from P2P hint")
if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
Member

We're missing this bit of logic, I believe: https://github.com/evstack/ev-node/pull/3131/changes#diff-5a67b1436f0151a88ef358629f6676db468237a65817d2c9ec68883bcc1940b0L421-L426 which piggybacks on minimum DA inclusion and is useful for restarts

Contributor Author

Good eyes. The component maintains its state in localNextDAHeight (new name), which is updated in runCatchup or handleSubscriptionEvent (when caught up).
The syncer initializes the DAFollower with the correct height based on its loaded state.

L348: s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)

265-279: ⚠️ Potential issue | 🔴 Critical

Use CAS-based rollback to avoid regressing localNextDAHeight.

Lines 265, 279, and 362 unconditionally Store(...) during rollback. If the other loop advanced localNextDAHeight meanwhile, this can move it backward and reprocess heights out of order.

🛠️ Proposed fix
 		for _, event := range events {
 			if err := f.pipeEvent(ctx, event); err != nil {
 				// Roll back so catchupLoop can retry this height.
-				f.localNextDAHeight.Store(ev.Height)
+				if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+					f.logger.Debug().
+						Uint64("da_height", ev.Height).
+						Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+						Msg("skip rollback: localNextDAHeight already advanced")
+				}
 				f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 					Msg("failed to pipe inline event, catchup will retry")
 				return
 			}
 		}
@@
 		} else {
 			// No complete events (split namespace, waiting for other half).
-			f.localNextDAHeight.Store(ev.Height)
+			if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+				f.logger.Debug().
+					Uint64("da_height", ev.Height).
+					Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+					Msg("skip rollback: localNextDAHeight already advanced")
+			}
 		}
@@
 		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
 			// Roll back so we can retry after backoff.
-			f.localNextDAHeight.Store(local)
+			if !f.localNextDAHeight.CompareAndSwap(local+1, local) {
+				f.logger.Debug().
+					Uint64("da_height", local).
+					Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+					Msg("skip rollback: localNextDAHeight already advanced")
+			}
 			if !f.waitOnCatchupError(ctx, err, local) {
 				return
 			}
 			continue
 		}
As per coding guidelines: "Be careful with concurrent access to shared state".

Also applies to: 360-363

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 265 - 279, The rollback
unconditionally calls f.localNextDAHeight.Store(...) which can regress the
counter if another goroutine advanced it; change these stores (the ones near
f.localNextDAHeight.Store in the rollback path and the other occurrences noted
around where events are handled and at the other block ~360-363) to a CAS
update: read cur := f.localNextDAHeight.Load(), and only attempt to set via a
CompareAndSwap (or a small CAS loop) to f.localNextDAHeight.CompareAndSwap(cur,
ev.Height) (or loop while cur < ev.Height and CAS fails: reload cur and retry)
so you only advance the value and never move it backward; apply this pattern to
each place that currently calls f.localNextDAHeight.Store(...) during rollback.
🧹 Nitpick comments (1)
tools/local-da/local.go (1)

204-243: Consider extracting shared submit-to-rpc conversion logic.

SubmitWithOptions and Submit now duplicate the same namespace conversion, blob conversion, storage, and notify flow. A small shared helper would reduce drift risk between the two code paths.

Also applies to: 245-284

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tools/local-da/local.go` around lines 204 - 243, SubmitWithOptions and Submit
duplicate namespace conversion, blob->rpc conversion, ID generation, storage
into d.data/d.blobData, timestamp/height bumping and d.notifySubscribers;
extract that shared logic into a single helper (e.g., processAndStoreBlobs or
storeBlobsAtNextHeight) that takes (ctx, ns []byte, blobs []datypes.Blob) and
performs: validate/convert namespace (libshare.NewNamespaceFromBytes), allocate
new height (increment d.height, set d.timestamps[d.height]=d.monotonicTime()),
generate IDs using d.nextID() and d.getHash(), append kvp to d.data[d.height],
build []*blobrpc.Blob via blobrpc.NewBlobV0 and set d.blobData[d.height], then
call d.notifySubscribers(d.height) and return the generated IDs (and error).
Replace the duplicated sections in SubmitWithOptions and Submit to call this
helper and keep logging/validation outside as appropriate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS blob client created via
blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is never closed; after verifying err == nil,
immediately schedule closing the client (e.g., defer blobClient.Close()) so the
WebSocket is cleaned up on shutdown and prevents goroutine/resource leaks—place
the defer directly after the successful assignment to blobClient and before
returning from run-related functions.

In `@block/internal/syncing/da_follower.go`:
- Around line 112-115: The catchupLoop is only triggered by catchupSignal via
signalCatchup (called from updateHighest on subscription events), so if the node
starts behind and no new events arrive catchup never runs; modify startup to
initiate catchup immediately: when launching the goroutine(s) (the code that
calls f.wg.Add and starts followLoop and catchupLoop) ensure you either signal
catchup once at startup or start catchupLoop in a mode that performs an initial
bootstrap catchup before waiting on catchupSignal (adjust catchupLoop or call
signalCatchup directly). Apply the same change to the other goroutine startup
sites that spawn catchupLoop/followLoop to guarantee an initial catchup run even
without incoming subscription events.

In `@block/internal/syncing/syncer_backoff_test.go`:
- Around line 241-245: The test is closing over the loop variable h causing all
Run callbacks to see the final value (6); fix by capturing h per-iteration into
a new local (e.g., localH := h) before calling daRetriever.On("RetrieveFromDA",
mock.Anything, h).Run(...), and use that local (localH) inside the Run callback
so fetchedHeights receives the correct values 3,4,5 when runCatchup() executes.

In `@block/internal/syncing/syncer.go`:
- Around line 210-214: On DA follower startup failure you must stop any
previously-started retrievers to avoid goroutine leaks: after
s.daFollower.Start(ctx) returns an error, call the Stop/Close method on any
components started earlier (e.g., s.raftRetriever.Stop(ctx) or the appropriate
shutdown method on other retrievers), handle/ignore its error as needed, then
proceed with s.cancel() and s.wg.Wait(); update the failure branch that
currently only invokes s.cancel() and s.wg.Wait() to also explicitly shut down
s.raftRetriever (and any other started retrievers) before returning the error.

In `@tools/local-da/local.go`:
- Around line 226-236: The code silently ignores errors from
libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 in SubmitWithOptions and
Submit, which leads to nil rpc blobs and inconsistent state; update both
functions to check and handle those errors by returning a wrapped error (use
fmt.Errorf with context) instead of continuing, ensure you do not append ids or
store kvp entries when NewNamespaceFromBytes fails, and only append to
rpcBlobs/d.data[d.height] after successful blobrpc.NewBlobV0 conversion (refer
to functions SubmitWithOptions and Submit and the NewNamespaceFromBytes and
NewBlobV0 calls).

---

Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 265-279: The rollback unconditionally calls
f.localNextDAHeight.Store(...) which can regress the counter if another
goroutine advanced it; change these stores (the ones near
f.localNextDAHeight.Store in the rollback path and the other occurrences noted
around where events are handled and at the other block ~360-363) to a CAS
update: read cur := f.localNextDAHeight.Load(), and only attempt to set via a
CompareAndSwap (or a small CAS loop) to f.localNextDAHeight.CompareAndSwap(cur,
ev.Height) (or loop while cur < ev.Height and CAS fails: reload cur and retry)
so you only advance the value and never move it backward; apply this pattern to
each place that currently calls f.localNextDAHeight.Store(...) during rollback.

---

Nitpick comments:
In `@tools/local-da/local.go`:
- Around line 204-243: SubmitWithOptions and Submit duplicate namespace
conversion, blob->rpc conversion, ID generation, storage into d.data/d.blobData,
timestamp/height bumping and d.notifySubscribers; extract that shared logic into
a single helper (e.g., processAndStoreBlobs or storeBlobsAtNextHeight) that
takes (ctx, ns []byte, blobs []datypes.Blob) and performs: validate/convert
namespace (libshare.NewNamespaceFromBytes), allocate new height (increment
d.height, set d.timestamps[d.height]=d.monotonicTime()), generate IDs using
d.nextID() and d.getHash(), append kvp to d.data[d.height], build
[]*blobrpc.Blob via blobrpc.NewBlobV0 and set d.blobData[d.height], then call
d.notifySubscribers(d.height) and return the generated IDs (and error). Replace
the duplicated sections in SubmitWithOptions and Submit to call this helper and
keep logging/validation outside as appropriate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7a45cdf6-63a2-47d8-9960-b9ec8716bbc7

📥 Commits

Reviewing files that changed from the base of the PR and between dd7e0cd and 6c1e630.

📒 Files selected for processing (8)
  • apps/evm/cmd/run.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_test.go
  • tools/local-da/local.go

Comment on lines +63 to 66
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return fmt.Errorf("failed to create blob client: %w", err)
}
Contributor

⚠️ Potential issue | 🟠 Major

Close the WebSocket DA client on shutdown.

Line 63 correctly uses cmd.Context() for dial cancellation, but the created WS client is never explicitly closed. Add a deferred blobClient.Close() after successful creation to avoid lingering WS resources during graceful shutdown paths.

Suggested fix
 		blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
 		if err != nil {
 			return fmt.Errorf("failed to create blob client: %w", err)
 		}
+		defer blobClient.Close()
 
 		daClient := block.NewDAClient(blobClient, nodeConfig, logger)

As per coding guidelines, "Be mindful of goroutine leaks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/cmd/run.go` around lines 63 - 66, The WS blob client created via
blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is never closed; after verifying err == nil,
immediately schedule closing the client (e.g., defer blobClient.Close()) so the
WebSocket is cleaned up on shutdown and prevents goroutine/resource leaks—place
the defer directly after the successful assignment to blobClient and before
returning from run-related functions.

Comment on lines +112 to +115
f.wg.Add(2)
go f.followLoop(ctx)
go f.catchupLoop(ctx)

Contributor

⚠️ Potential issue | 🟠 Major

Bootstrap catchup at startup instead of waiting for the first subscription event.

catchupLoop only runs after catchupSignal, and signalCatchup is only triggered by updateHighest from subscription events. If the node starts behind and no new blob event arrives, historical catchup never begins.

🛠️ Proposed fix
 func (f *daFollower) Start(ctx context.Context) error {
 	ctx, f.cancel = context.WithCancel(ctx)
 
+	// Seed highest DA height on startup so catchup can begin immediately
+	// even if no fresh subscription event arrives right away.
+	if f.client != nil {
+		latest, err := f.client.GetLatestDAHeight(ctx)
+		if err != nil {
+			f.logger.Debug().Err(err).Msg("failed to seed latest DA height at startup")
+		} else {
+			f.updateHighest(latest)
+		}
+	}
+
 	f.wg.Add(2)
 	go f.followLoop(ctx)
 	go f.catchupLoop(ctx)

Also applies to: 287-296, 309-315

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 112 - 115, The
catchupLoop is only triggered by catchupSignal via signalCatchup (called from
updateHighest on subscription events), so if the node starts behind and no new
events arrive catchup never runs; modify startup to initiate catchup
immediately: when launching the goroutine(s) (the code that calls f.wg.Add and
starts followLoop and catchupLoop) ensure you either signal catchup once at
startup or start catchupLoop in a mode that performs an initial bootstrap
catchup before waiting on catchupSignal (adjust catchupLoop or call
signalCatchup directly). Apply the same change to the other goroutine startup
sites that spawn catchupLoop/followLoop to guarantee an initial catchup run even
without incoming subscription events.

Comment on lines +241 to +245
for h := uint64(3); h <= 5; h++ {
daRetriever.On("RetrieveFromDA", mock.Anything, h).
Run(func(args mock.Arguments) {
fetchedHeights = append(fetchedHeights, h)
}).
Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cat -n block/internal/syncing/syncer_backoff_test.go | sed -n '235,260p'

Repository: evstack/ev-node

Length of output: 1128


Capture h per-iteration before using it in Run callback.

Line 244 closes over loop variable h; when the callbacks execute during runCatchup(), h will be 6 (the post-loop value), so all callbacks append the same incorrect height instead of 3, 4, 5, causing the test assertion to fail.

🛠️ Proposed fix
 		for h := uint64(3); h <= 5; h++ {
+			h := h
 			daRetriever.On("RetrieveFromDA", mock.Anything, h).
 				Run(func(args mock.Arguments) {
 					fetchedHeights = append(fetchedHeights, h)
 				}).
 				Return(nil, datypes.ErrBlobNotFound).Once()
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer_backoff_test.go` around lines 241 - 245, The
test is closing over the loop variable h causing all Run callbacks to see the
final value (6); fix by capturing h per-iteration into a new local (e.g., localH
:= h) before calling daRetriever.On("RetrieveFromDA", mock.Anything,
h).Run(...), and use that local (localH) inside the Run callback so
fetchedHeights receives the correct values 3,4,5 when runCatchup() executes.
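
The underlying gotcha generalizes beyond the mock setup: before Go 1.22, a `for` loop reused one variable across all iterations, so closures created in the loop all observed the final value; re-declaring the variable per iteration fixes it on every Go version. A minimal sketch (names are illustrative):

```go
package main

import "fmt"

// makeCallbacks registers one callback per height, mirroring the mock setup
// loop in the test; each callback must report the height it was created for.
func makeCallbacks() []func() uint64 {
	var callbacks []func() uint64
	for h := uint64(3); h <= 5; h++ {
		h := h // per-iteration copy; required before Go 1.22, harmless after
		callbacks = append(callbacks, func() uint64 { return h })
	}
	return callbacks
}

func main() {
	// Callbacks run later, like mock .Run hooks firing during runCatchup().
	for _, cb := range makeCallbacks() {
		fmt.Println(cb()) // prints 3, 4, 5 — not three copies of the final value
	}
}
```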

Comment on lines +210 to +214
if err := s.daFollower.Start(ctx); err != nil {
s.cancel()
s.wg.Wait()
return fmt.Errorf("failed to start DA follower: %w", err)
}
Contributor

⚠️ Potential issue | 🟠 Major

Stop already-started retrievers on follower startup failure.

If s.daFollower.Start(ctx) fails, s.cancel() + s.wg.Wait() only handles syncer-owned workers. Components started earlier (notably s.raftRetriever) are not explicitly stopped here.

🛠️ Proposed fix
 	if err := s.daFollower.Start(ctx); err != nil {
 		s.cancel()
+		if s.daFollower != nil {
+			s.daFollower.Stop()
+		}
+		if s.raftRetriever != nil {
+			s.raftRetriever.Stop()
+		}
 		s.wg.Wait()
 		return fmt.Errorf("failed to start DA follower: %w", err)
 	}
As per coding guidelines: "Be mindful of goroutine leaks".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer.go` around lines 210 - 214, On DA follower
startup failure you must stop any previously-started retrievers to avoid
goroutine leaks: after s.daFollower.Start(ctx) returns an error, call the
Stop/Close method on any components started earlier (e.g.,
s.raftRetriever.Stop(ctx) or the appropriate shutdown method on other
retrievers), handle/ignore its error as needed, then proceed with s.cancel() and
s.wg.Wait(); update the failure branch that currently only invokes s.cancel()
and s.wg.Wait() to also explicitly shut down s.raftRetriever (and any other
started retrievers) before returning the error.

Comment on lines +226 to +236
nspace, _ := libshare.NewNamespaceFromBytes(ns)
rpcBlobs := make([]*blobrpc.Blob, len(blobs))

for i, blob := range blobs {
ids[i] = append(d.nextID(), d.getHash(blob)...)

d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob})

if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {
rpcBlobs[i] = b
}
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Read the file to inspect lines 220-245 and 260-285
head -n 245 tools/local-da/local.go | tail -n 26 | cat -n

Repository: evstack/ev-node

Length of output: 953


🏁 Script executed:

# Also check lines 260-285
head -n 285 tools/local-da/local.go | tail -n 26 | cat -n

Repository: evstack/ev-node

Length of output: 885


🏁 Script executed:

# Search for all NewNamespaceFromBytes and NewBlobV0 calls in the file
rg -n '(NewNamespaceFromBytes|NewBlobV0)' tools/local-da/local.go -B 2 -A 4

Repository: evstack/ev-node

Length of output: 995


🏁 Script executed:

# Check the entire file size to understand context
wc -l tools/local-da/local.go

Repository: evstack/ev-node

Length of output: 86


Handle namespace and blob conversion errors instead of silently discarding failed conversions.

Both SubmitWithOptions and Submit ignore errors from libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 (lines 226, 234, 267, 275). This causes the methods to return success with valid IDs while d.blobData[d.height] contains nil entries for failed conversions. Subscribers then receive incomplete payloads, creating inconsistency between stored data and streamed data.

Proposed fix
- nspace, _ := libshare.NewNamespaceFromBytes(ns)
- rpcBlobs := make([]*blobrpc.Blob, len(blobs))
+ nspace, err := libshare.NewNamespaceFromBytes(ns)
+ if err != nil {
+   return nil, fmt.Errorf("submit: invalid namespace bytes: %w", err)
+ }
+ rpcBlobs := make([]*blobrpc.Blob, len(blobs))

  for i, blob := range blobs {
-   if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {
-     rpcBlobs[i] = b
-   }
+   b, err := blobrpc.NewBlobV0(nspace, blob)
+   if err != nil {
+     return nil, fmt.Errorf("submit: build rpc blob at index %d: %w", i, err)
+   }
+   rpcBlobs[i] = b
  }

Per coding guidelines: "Validate all inputs from external sources" and "Wrap errors with context using fmt.Errorf".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tools/local-da/local.go` around lines 226 - 236, The code silently ignores
errors from libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 in
SubmitWithOptions and Submit, which leads to nil rpc blobs and inconsistent
state; update both functions to check and handle those errors by returning a
wrapped error (use fmt.Errorf with context) instead of continuing, ensure you do
not append ids or store kvp entries when NewNamespaceFromBytes fails, and only
append to rpcBlobs/d.data[d.height] after successful blobrpc.NewBlobV0
conversion (refer to functions SubmitWithOptions and Submit and the
NewNamespaceFromBytes and NewBlobV0 calls).

@coderabbitai coderabbitai bot (Contributor) left a comment

♻️ Duplicate comments (3)
block/internal/syncing/da_follower.go (3)

109-120: ⚠️ Potential issue | 🟠 Major

Catchup may never start if no subscription events arrive.

If the node starts behind and no new DA blob event arrives, catchupLoop will block indefinitely on catchupSignal. The signal is only sent by updateHighest, which is only called when subscription events arrive.

Consider seeding highestSeenDAHeight at startup by querying the latest DA height:

🐛 Proposed bootstrap fix
 func (f *daFollower) Start(ctx context.Context) error {
 	ctx, f.cancel = context.WithCancel(ctx)
 
+	// Seed highest DA height on startup so catchup can begin immediately
+	// even if no fresh subscription event arrives right away.
+	if f.client != nil {
+		if latest, err := f.client.GetLatestDAHeight(ctx); err == nil {
+			f.updateHighest(latest)
+		} else {
+			f.logger.Debug().Err(err).Msg("failed to seed latest DA height at startup")
+		}
+	}
+
 	f.wg.Add(2)
 	go f.followLoop(ctx)
 	go f.catchupLoop(ctx)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 109 - 120, The
catchupLoop may never run if no subscription events arrive because
highestSeenDAHeight is only advanced via updateHighest; to fix, during
daFollower.Start seed highestSeenDAHeight (or call updateHighest) by querying
the latest DA height from the DA client before launching followLoop/catchupLoop
so catchupSignal can be triggered even with no incoming events; locate Start
(method daFollower.Start), highestSeenDAHeight, updateHighest, catchupSignal and
ensure Start obtains the current DA head and updates/sets highestSeenDAHeight
(or invokes updateHighest) and emits the catchupSignal if needed before starting
the goroutines.

260-281: ⚠️ Potential issue | 🟠 Major

Use CAS-based rollback to avoid regressing localNextDAHeight.

The unconditional Store(ev.Height) at Lines 265 and 279 can regress localNextDAHeight if catchupLoop has advanced it concurrently:

  1. followLoop: CAS(N, N+1) succeeds
  2. catchupLoop: CAS(N+1, N+2) succeeds
  3. followLoop: pipeEvent fails → Store(N) regresses value from N+2 to N

This could cause duplicate processing of heights N and N+1.

🐛 Proposed CAS-based rollback
 		for _, event := range events {
 			if err := f.pipeEvent(ctx, event); err != nil {
-				// Roll back so catchupLoop can retry this height.
-				f.localNextDAHeight.Store(ev.Height)
+				// Roll back only if catchupLoop hasn't advanced past us.
+				f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 				f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 					Msg("failed to pipe inline event, catchup will retry")
 				return
 			}
 		}
 		if len(events) != 0 {
 			if !f.headReached.Load() && f.localNextDAHeight.Load() > f.highestSeenDAHeight.Load() {
 				f.headReached.Store(true)
 			}
 			f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
 				Msg("processed subscription blobs inline (fast path)")
 		} else {
-			// No complete events (split namespace, waiting for other half).
-			f.localNextDAHeight.Store(ev.Height)
+			// No complete events (split namespace, waiting for other half).
+			// Roll back only if catchupLoop hasn't advanced past us.
+			f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 260 - 281, The rollback
after a failed pipeEvent (and the fallback when no complete events)
unconditionally calls f.localNextDAHeight.Store(ev.Height), which can regress
progress if catchupLoop advanced it; change both rollbacks to a CAS that only
sets ev.Height when the current value is ev.Height+1 (i.e., call
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)) so you only revert
if this goroutine still owns the +1 advance; do the same replacement for the
other Store(ev.Height) path, and optionally log or ignore the CAS failure
instead of forcing the store; locate these changes around the pipeEvent error
handling and the "No complete events" branch in the followLoop code that
manipulates localNextDAHeight.

362-369: ⚠️ Potential issue | 🟠 Major

Same CAS-based rollback issue in catchupLoop.

Line 364 has the same race condition as the inline processing path. If followLoop advances localNextDAHeight via inline processing between the CAS at Line 357 and the error rollback at Line 364, the Store will regress the value.

🐛 Proposed fix
 		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
-			// Roll back so we can retry after backoff.
-			f.localNextDAHeight.Store(local)
+			// Roll back only if followLoop hasn't advanced past us.
+			f.localNextDAHeight.CompareAndSwap(local+1, local)
 			if !f.waitOnCatchupError(ctx, err, local) {
 				return
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 362 - 369, The rollback
in catchupLoop uses f.localNextDAHeight.Store(local) which can regress progress
if followLoop advanced the value; change the rollback to only set
localNextDAHeight back when the current value still equals the expected local
(use an atomic CompareAndSwap or conditional load+store): e.g., check
f.localNextDAHeight.Load() == local (or use
f.localNextDAHeight.CompareAndSwap(expectedLocal, local)) before performing the
Store, so fetchAndPipeHeight/followLoop races won't regress the counter.
🧹 Nitpick comments (2)
test/testda/dummy.go (1)

298-311: Consider moving height increment inside the lock for consistency.

In Submit (line 149), d.height.Add(1) occurs inside the mutex-protected section. Here, it occurs before acquiring d.mu, creating a window where GetLatestDAHeight() returns the new height before the subscription event is sent. This ordering difference could cause subtle timing issues in tests.

♻️ Optional fix for consistency
 		select {
 		case <-ticker.C:
 			now := time.Now()
-			height := d.height.Add(1)
 			d.mu.Lock()
+			height := d.height.Add(1)
 			if d.headers[height] == nil {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/testda/dummy.go` around lines 298 - 311, The height increment
d.height.Add(1) should be moved inside the mutex to match Submit's ordering and
avoid racing GetLatestDAHeight(): acquire d.mu before computing/assigning height
(inside the ticker handler), then update d.headers, call d.notifySubscribers
with the new height, and finally release the lock; ensure you still compute
Timestamp now := time.Now() as needed and keep notifySubscribers invoked while
the lock is held to preserve ordering consistency with Submit and avoid the
window where GetLatestDAHeight() could observe the new height before the
subscription event is emitted.
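The corrected ordering can be sketched as follows. This is a reduced, hypothetical stand-in for the dummy DA's ticker handler (the `dummyDA`, `tick`, and `subs` names are illustrative): the height increment happens while the mutex is held, so no reader can observe the new height before subscribers are notified.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type dummyDA struct {
	mu      sync.Mutex
	height  atomic.Uint64
	headers map[uint64][]byte
	subs    []chan uint64
}

// tick increments the height inside the lock, matching Submit's
// ordering, then notifies subscribers while the lock is still held.
func (d *dummyDA) tick() {
	d.mu.Lock()
	defer d.mu.Unlock()
	h := d.height.Add(1) // inside the lock: no pre-notify window
	if d.headers[h] == nil {
		d.headers[h] = []byte("header")
	}
	for _, ch := range d.subs {
		ch <- h // buffered channels assumed, so this does not block
	}
}

func main() {
	d := &dummyDA{headers: map[uint64][]byte{}}
	ch := make(chan uint64, 1)
	d.subs = append(d.subs, ch)
	d.tick()
	fmt.Println(<-ch, d.height.Load()) // 1 1
}
```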
block/internal/syncing/da_follower.go (1)

88-106: Consider validating required config fields.

NewDAFollower doesn't validate that cfg.Client and cfg.Retriever are non-nil. If either is nil, subsequent calls in runSubscription or fetchAndPipeHeight will panic.

🛡️ Proposed defensive validation
 func NewDAFollower(cfg DAFollowerConfig) DAFollower {
+	if cfg.Client == nil {
+		panic("DAFollowerConfig.Client is required")
+	}
+	if cfg.Retriever == nil {
+		panic("DAFollowerConfig.Retriever is required")
+	}
+	if cfg.PipeEvent == nil {
+		panic("DAFollowerConfig.PipeEvent is required")
+	}
 	dataNs := cfg.DataNamespace
 	if len(dataNs) == 0 {
 		dataNs = cfg.Namespace
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 88 - 106, NewDAFollower
should validate required config fields: check that cfg.Client and cfg.Retriever
are non-nil before constructing the daFollower, returning an error or panicking
early as your codebase convention dictates; update NewDAFollower (and its caller
contract if needed) to validate DAFollowerConfig.Client and
DAFollowerConfig.Retriever and avoid creating a daFollower that will panic later
in runSubscription or fetchAndPipeHeight, ensuring any error message clearly
names the missing field(s) and references the constructor NewDAFollower and the
daFollower struct.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d6918e55-cf7b-4700-9ded-4f821cec7349

📥 Commits

Reviewing files that changed from the base of the PR and between 6c1e630 and 896c6ad.

📒 Files selected for processing (3)
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever_strict_test.go
  • test/testda/dummy.go

