Compare commits

...

1 Commits

Author SHA1 Message Date
Georges Chaudy
95ecb304ad Enhancement: Add watch poll interval to KV storage backend
* Introduced a new `WatchPollInterval` option in `KVBackendOptions` to configure the polling frequency for new events.
* Updated the `NewKVStorageBackend` function to utilize the specified `WatchPollInterval`, defaulting to a predefined value if not set.
* Adjusted the event watching mechanism to respect the configured polling interval.
2025-11-25 14:53:59 +01:00
2 changed files with 12 additions and 1 deletions

View File

@@ -63,6 +63,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{ backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
KvStore: kv, KvStore: kv,
WithExperimentalClusterScope: true, WithExperimentalClusterScope: true,
WatchPollInterval: 5 * time.Minute,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -62,6 +62,7 @@ type kvStorageBackend struct {
withPruner bool withPruner bool
eventRetentionPeriod time.Duration eventRetentionPeriod time.Duration
eventPruningInterval time.Duration eventPruningInterval time.Duration
watchPollInterval time.Duration
historyPruner Pruner historyPruner Pruner
withExperimentalClusterScope bool withExperimentalClusterScope bool
//tracer trace.Tracer //tracer trace.Tracer
@@ -76,6 +77,7 @@ type KVBackendOptions struct {
WithExperimentalClusterScope bool // Allow empty namespace to be used for cluster-scoped resources. WithExperimentalClusterScope bool // Allow empty namespace to be used for cluster-scoped resources.
EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour) EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour)
EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes) EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes)
WatchPollInterval time.Duration // How often to poll for new events in watch (default: 100ms)
Tracer trace.Tracer // TODO add tracing Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics Reg prometheus.Registerer // TODO add metrics
} }
@@ -100,6 +102,11 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
eventPruningInterval = defaultEventPruningInterval eventPruningInterval = defaultEventPruningInterval
} }
watchPollInterval := opts.WatchPollInterval
if watchPollInterval <= 0 {
watchPollInterval = defaultPollInterval
}
backend := &kvStorageBackend{ backend := &kvStorageBackend{
kv: kv, kv: kv,
dataStore: newDataStore(kv), dataStore: newDataStore(kv),
@@ -110,6 +117,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
log: &logging.NoOpLogger{}, // Make this configurable log: &logging.NoOpLogger{}, // Make this configurable
eventRetentionPeriod: eventRetentionPeriod, eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval, eventPruningInterval: eventPruningInterval,
watchPollInterval: watchPollInterval,
withExperimentalClusterScope: opts.WithExperimentalClusterScope, withExperimentalClusterScope: opts.WithExperimentalClusterScope,
} }
err = backend.initPruner(ctx) err = backend.initPruner(ctx)
@@ -1170,7 +1178,9 @@ func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *Writte
// Create a channel to receive events // Create a channel to receive events
events := make(chan *WrittenEvent, 10000) // TODO: make this configurable events := make(chan *WrittenEvent, 10000) // TODO: make this configurable
notifierEvents := k.notifier.Watch(ctx, defaultWatchOptions()) watchOpts := defaultWatchOptions()
watchOpts.PollInterval = k.watchPollInterval
notifierEvents := k.notifier.Watch(ctx, watchOpts)
go func() { go func() {
for event := range notifierEvents { for event := range notifierEvents {
// fetch the data // fetch the data