Compare commits

...

1 Commits

Author SHA1 Message Date
Georges Chaudy
95ecb304ad Enhancement: Add watch poll interval to KV storage backend
* Introduced a new `WatchPollInterval` option in `KVBackendOptions` to configure the polling frequency for new events.
* Updated the `NewKVStorageBackend` function to utilize the specified `WatchPollInterval`, defaulting to a predefined value if not set.
* Adjusted the event watching mechanism to respect the configured polling interval.
2025-11-25 14:53:59 +01:00
2 changed files with 12 additions and 1 deletions

View File

@@ -63,6 +63,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
backend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{
KvStore: kv,
WithExperimentalClusterScope: true,
WatchPollInterval: 5 * time.Minute,
})
if err != nil {
return nil, err

View File

@@ -62,6 +62,7 @@ type kvStorageBackend struct {
withPruner bool
eventRetentionPeriod time.Duration
eventPruningInterval time.Duration
watchPollInterval time.Duration
historyPruner Pruner
withExperimentalClusterScope bool
//tracer trace.Tracer
@@ -76,6 +77,7 @@ type KVBackendOptions struct {
WithExperimentalClusterScope bool // Allow empty namespace to be used for cluster-scoped resources.
EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour)
EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes)
WatchPollInterval time.Duration // How often to poll for new events in watch (default: 100ms)
Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics
}
@@ -100,6 +102,11 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
eventPruningInterval = defaultEventPruningInterval
}
watchPollInterval := opts.WatchPollInterval
if watchPollInterval <= 0 {
watchPollInterval = defaultPollInterval
}
backend := &kvStorageBackend{
kv: kv,
dataStore: newDataStore(kv),
@@ -110,6 +117,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
log: &logging.NoOpLogger{}, // Make this configurable
eventRetentionPeriod: eventRetentionPeriod,
eventPruningInterval: eventPruningInterval,
watchPollInterval: watchPollInterval,
withExperimentalClusterScope: opts.WithExperimentalClusterScope,
}
err = backend.initPruner(ctx)
@@ -1170,7 +1178,9 @@ func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *Writte
// Create a channel to receive events
events := make(chan *WrittenEvent, 10000) // TODO: make this configurable
notifierEvents := k.notifier.Watch(ctx, defaultWatchOptions())
watchOpts := defaultWatchOptions()
watchOpts.PollInterval = k.watchPollInterval
notifierEvents := k.notifier.Watch(ctx, watchOpts)
go func() {
for event := range notifierEvents {
// fetch the data