Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/monitor/monitorapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ const (
AnnotationPriority AnnotationKey = "priority"
AnnotationPreviousPriority AnnotationKey = "prev-priority"
AnnotationVIP AnnotationKey = "vip"
// AnnotationBatchable marks intervals that can be batched together when there are
// many duplicates. Used by etcd log intervals that can explode in volume.
AnnotationBatchable AnnotationKey = "batchable"
)

// ConstructionOwner was originally meant to signify that an interval was derived from other intervals.
Expand Down
121 changes: 121 additions & 0 deletions pkg/monitortests/etcd/etcdloganalyzer/batch_intervals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package etcdloganalyzer

import (
"fmt"
"sort"
"time"

"github.com/openshift/origin/pkg/monitor/monitorapi"
)

// BatchEtcdLogIntervals collapses high-volume EtcdLog intervals into one
// interval per (locator, humanMessage, one-minute bucket) group.
//
// Only intervals whose Source is SourceEtcdLog and whose batchable annotation
// equals "true" are considered. Every other interval (for example the
// leadership-related ones, which are not marked batchable) is ignored and is
// NOT part of the returned slice.
//
// Each returned interval:
//   - starts at the beginning of the minute that contains its members' From,
//   - ends at the end of that minute, or at the latest member's To if later,
//   - otherwise copies the first member of the group (in sort.Sort order),
//   - has the batchable annotation removed and a count annotation holding the
//     number of members that were merged into it.
//
// The result is ordered with sort.Sort. An empty, non-nil slice is returned
// when no interval is batchable.
func BatchEtcdLogIntervals(intervals monitorapi.Intervals) monitorapi.Intervals {
	batchable := intervals.Filter(func(i monitorapi.Interval) bool {
		return i.Source == monitorapi.SourceEtcdLog &&
			i.Message.Annotations[monitorapi.AnnotationBatchable] == "true"
	})
	if len(batchable) == 0 {
		return monitorapi.Intervals{}
	}

	// Process in sorted order so the representative of each batch is chosen
	// deterministically.
	sort.Sort(batchable)

	// batchKey identifies a group. The minute is stored as a Unix timestamp
	// rather than a time.Time: comparing time.Time values with == (which is
	// what a map key does) also compares their Location, so the same instant
	// carried with two different Location values would otherwise produce two
	// separate batches.
	type batchKey struct {
		locator      string
		humanMessage string
		minuteUnix   int64
	}

	// batch is the representative interval of a group plus its member count.
	type batch struct {
		interval monitorapi.Interval
		count    int
	}

	// batches keeps first-seen order; indexByKey maps a key to its position.
	var batches []batch
	indexByKey := make(map[batchKey]int)

	for _, interval := range batchable {
		minuteStart := interval.From.Truncate(time.Minute)
		key := batchKey{
			locator:      interval.Locator.OldLocator(),
			humanMessage: interval.Message.HumanMessage,
			minuteUnix:   minuteStart.Unix(),
		}

		if idx, ok := indexByKey[key]; ok {
			existing := &batches[idx]
			existing.count++
			// Stretch the batch if this member ends later than anything seen so far.
			if interval.To.After(existing.interval.To) {
				existing.interval.To = interval.To
			}
			continue
		}

		// First member of a new batch: cover the whole minute bucket, or run
		// to interval.To when that is later than the end of the bucket.
		minuteEnd := minuteStart.Add(time.Minute)
		if interval.To.After(minuteEnd) {
			minuteEnd = interval.To
		}

		representative := interval
		representative.From = minuteStart
		representative.To = minuteEnd

		indexByKey[key] = len(batches)
		batches = append(batches, batch{interval: representative, count: 1})
	}

	result := make(monitorapi.Intervals, 0, len(batches))
	for _, b := range batches {
		// Build a fresh annotation map so the input interval's map is never
		// mutated: drop the batchable marker and record the member count.
		annotations := make(map[monitorapi.AnnotationKey]string, len(b.interval.Message.Annotations)+1)
		for k, v := range b.interval.Message.Annotations {
			if k == monitorapi.AnnotationBatchable {
				continue
			}
			annotations[k] = v
		}
		annotations[monitorapi.AnnotationCount] = fmt.Sprintf("%d", b.count)

		batched := b.interval
		batched.Message.Annotations = annotations
		result = append(result, batched)
	}

	// Sort for consistent output.
	sort.Sort(result)

	return result
}
190 changes: 190 additions & 0 deletions pkg/monitortests/etcd/etcdloganalyzer/batch_intervals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package etcdloganalyzer

import (
_ "embed"
"strconv"
"testing"

"github.com/openshift/origin/pkg/monitor/monitorapi"
monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"
)

// testIntervalsJSON holds the raw bytes of the interval fixture file
// testdata/etcdlog_test_intervals.json, embedded into the test binary at
// build time via the go:embed directive below.
//
//go:embed testdata/etcdlog_test_intervals.json
var testIntervalsJSON []byte

// TestBatchEtcdLogIntervals runs BatchEtcdLogIntervals over the embedded
// fixture and verifies that:
//   - the count annotations of the batches add up to the number of batchable
//     input intervals,
//   - every batch is an EtcdLog interval without the batchable annotation,
//   - every batch starts on a minute boundary,
//   - batching actually reduced the number of intervals,
//   - exactly the expected (node, message, minute) batches are produced, each
//     with the expected count, and none of them is missing.
func TestBatchEtcdLogIntervals(t *testing.T) {
	// Parse the embedded test data
	allIntervals, err := monitorserialization.IntervalsFromJSON(testIntervalsJSON)
	if err != nil {
		t.Fatalf("Failed to parse embedded test intervals: %v", err)
	}

	// Filter to only batchable EtcdLog intervals and count them
	batchableIntervals := allIntervals.Filter(func(i monitorapi.Interval) bool {
		return i.Source == monitorapi.SourceEtcdLog &&
			i.Message.Annotations[monitorapi.AnnotationBatchable] == "true"
	})
	originalCount := len(batchableIntervals)
	t.Logf("Original batchable EtcdLog interval count: %d", originalCount)

	// Count non-batchable EtcdLog intervals (like leadership events)
	nonBatchableEtcdIntervals := allIntervals.Filter(func(i monitorapi.Interval) bool {
		return i.Source == monitorapi.SourceEtcdLog &&
			i.Message.Annotations[monitorapi.AnnotationBatchable] != "true"
	})
	t.Logf("Non-batchable EtcdLog intervals (should be unchanged): %d", len(nonBatchableEtcdIntervals))

	if originalCount == 0 {
		t.Fatal("No batchable EtcdLog intervals found in test data")
	}

	// Apply the batching algorithm
	batchedIntervals := BatchEtcdLogIntervals(allIntervals)
	batchedCount := len(batchedIntervals)
	t.Logf("Batched interval count: %d", batchedCount)

	// Tally up the counts from the batched intervals. A batch with a missing
	// or unparsable count is reported and counted as a single interval.
	var talliedCount int
	for _, interval := range batchedIntervals {
		countStr, ok := interval.Message.Annotations[monitorapi.AnnotationCount]
		if !ok {
			t.Errorf("Batched interval missing count annotation: %v", interval)
			talliedCount++
			continue
		}
		count, err := strconv.Atoi(countStr)
		if err != nil {
			t.Errorf("Invalid count annotation %q: %v", countStr, err)
			talliedCount++
			continue
		}
		talliedCount += count
	}

	t.Logf("Tallied count from batched intervals: %d", talliedCount)

	// Verify the tallied count matches the original batchable count
	if talliedCount != originalCount {
		t.Errorf("Count mismatch: original batchable=%d, tallied=%d", originalCount, talliedCount)
	}

	// Verify all batched intervals are EtcdLog source
	for _, interval := range batchedIntervals {
		if interval.Source != monitorapi.SourceEtcdLog {
			t.Errorf("Batched interval has wrong source: got %s, want EtcdLog", interval.Source)
		}
	}

	// Verify the batchable annotation is removed from all batched intervals
	for _, interval := range batchedIntervals {
		if _, hasBatchable := interval.Message.Annotations[monitorapi.AnnotationBatchable]; hasBatchable {
			t.Errorf("Batched interval still has batchable annotation: %v", interval)
		}
	}

	// Verify batched intervals have proper 1-minute aligned times
	for _, interval := range batchedIntervals {
		if interval.From.Second() != 0 || interval.From.Nanosecond() != 0 {
			t.Errorf("Batched interval From time not aligned to minute boundary: %v", interval.From)
		}
	}

	// Verify compression happened (we should have fewer batched intervals than original)
	if batchedCount >= originalCount {
		t.Errorf("Expected compression: batchedCount=%d should be less than originalCount=%d", batchedCount, originalCount)
	}

	// Test the expected batching based on our test data (only batchable intervals):
	// - node-1, "apply request took too long", minute 10:51 -> 3 intervals batched
	// - node-1, "apply request took too long", minute 10:52 -> 2 intervals batched
	// - node-1, "slow fdatasync", minute 10:51 -> 2 intervals batched
	// - node-2, "apply request took too long", minute 10:51 -> 2 intervals batched
	// - node-2, "waiting for ReadIndex...", minute 10:51 -> 1 interval
	// - node-3, "apply request took too long", minute 10:51 -> 1 interval
	// Note: "restarting local member" is NOT batchable (no batchable annotation)
	// Total: 6 batched intervals from 11 original batchable EtcdLog intervals

	expectedBatchedCount := 6
	if batchedCount != expectedBatchedCount {
		t.Errorf("Expected %d batched intervals, got %d", expectedBatchedCount, batchedCount)
	}

	// Verify specific batch counts. The expectations are kept in a slice so
	// that missing batches are reported in a stable order, and mirrored into
	// a map for lookup.
	type batchKey struct {
		node         string
		humanMessage string
		minute       int
	}
	type expectedBatch struct {
		key   batchKey
		count int
	}
	expectedBatches := []expectedBatch{
		{batchKey{"node-1", "apply request took too long", 51}, 3},
		{batchKey{"node-1", "apply request took too long", 52}, 2},
		{batchKey{"node-1", "slow fdatasync", 51}, 2},
		{batchKey{"node-2", "apply request took too long", 51}, 2},
		{batchKey{"node-2", "waiting for ReadIndex response took too long, retrying", 51}, 1},
		{batchKey{"node-3", "apply request took too long", 51}, 1},
	}
	expectedCounts := make(map[batchKey]int, len(expectedBatches))
	for _, e := range expectedBatches {
		expectedCounts[e.key] = e.count
	}

	seen := make(map[batchKey]bool, len(expectedBatches))
	for _, interval := range batchedIntervals {
		node := interval.Locator.Keys[monitorapi.LocatorNodeKey]
		minute := interval.From.Minute()
		key := batchKey{node, interval.Message.HumanMessage, minute}

		expectedCount, ok := expectedCounts[key]
		if !ok {
			t.Errorf("Unexpected batch: node=%s, message=%s, minute=%d", node, interval.Message.HumanMessage, minute)
			continue
		}
		seen[key] = true

		// The parse error is deliberately ignored here: a missing or invalid
		// count annotation was already reported by the tally loop above, and
		// the resulting zero fails the comparison below anyway.
		actualCount, _ := strconv.Atoi(interval.Message.Annotations[monitorapi.AnnotationCount])
		if actualCount != expectedCount {
			t.Errorf("Wrong count for batch (node=%s, message=%s, minute=%d): got %d, want %d",
				node, interval.Message.HumanMessage, minute, actualCount, expectedCount)
		}
	}

	// Matching the total number of batches is not enough on its own: two
	// batches could share one test key while another expected one is absent.
	for _, e := range expectedBatches {
		if !seen[e.key] {
			t.Errorf("Missing expected batch: node=%s, message=%s, minute=%d",
				e.key.node, e.key.humanMessage, e.key.minute)
		}
	}
}

// TestBatchEtcdLogIntervals_EmptyInput checks that batching an empty interval
// list produces no intervals.
func TestBatchEtcdLogIntervals_EmptyInput(t *testing.T) {
	batched := BatchEtcdLogIntervals(monitorapi.Intervals{})
	if got := len(batched); got != 0 {
		t.Errorf("Expected empty result for empty input, got %d intervals", got)
	}
}

// TestBatchEtcdLogIntervals_NoEtcdLogIntervals checks that an interval coming
// from a source other than EtcdLog never ends up in the batched output.
func TestBatchEtcdLogIntervals_NoEtcdLogIntervals(t *testing.T) {
	// A single interval reported by the pod monitor rather than the etcd log.
	podInterval := monitorapi.Interval{
		Condition: monitorapi.Condition{
			Level: monitorapi.Info,
		},
		Source: monitorapi.SourcePodMonitor,
	}

	batched := BatchEtcdLogIntervals(monitorapi.Intervals{podInterval})
	if got := len(batched); got != 0 {
		t.Errorf("Expected empty result for non-EtcdLog intervals, got %d intervals", got)
	}
}

// TestBatchEtcdLogIntervals_NonBatchableIntervalsIgnored checks that an EtcdLog
// interval lacking the batchable annotation (such as a leadership event) is
// left out of the batched output.
func TestBatchEtcdLogIntervals_NonBatchableIntervalsIgnored(t *testing.T) {
	// EtcdLog source, but with an empty annotation map: not marked batchable.
	leaderMessage := monitorapi.Message{
		Reason:       "LeaderElected",
		HumanMessage: "became leader at term 5",
		Annotations:  map[monitorapi.AnnotationKey]string{},
	}
	leaderInterval := monitorapi.Interval{
		Condition: monitorapi.Condition{
			Level:   monitorapi.Warning,
			Message: leaderMessage,
		},
		Source: monitorapi.SourceEtcdLog,
	}

	batched := BatchEtcdLogIntervals(monitorapi.Intervals{leaderInterval})
	if got := len(batched); got != 0 {
		t.Errorf("Expected empty result for non-batchable EtcdLog intervals, got %d intervals", got)
	}
}
15 changes: 11 additions & 4 deletions pkg/monitortests/etcd/etcdloganalyzer/monitortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"regexp"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/test/e2e/framework"
Expand Down Expand Up @@ -144,6 +145,11 @@ func (w *etcdLogAnalyzer) ConstructComputedIntervals(ctx context.Context, starti
newInterval = nil
}

// Batch the high-volume etcd log intervals (slow fdatasync, apply request took too long, etc.)
// to reduce the number of intervals that need to be processed and displayed.
batchedEtcdLogIntervals := BatchEtcdLogIntervals(startingIntervals)
ret = append(ret, batchedEtcdLogIntervals...)

return ret, nil
}

Expand Down Expand Up @@ -235,6 +241,7 @@ func (g etcdRecorder) HandleLogLine(logLine podaccess.LogLineContent) {
Locator(logLine.Locator).
Message(
monitorapi.NewMessage().
WithAnnotation(monitorapi.AnnotationBatchable, "true").
HumanMessage(parsedLine.Msg),
).
Display().
Expand Down
Loading