Compare commits

...

3 Commits

Author SHA1 Message Date
Serge Zaitsev
3205914a05 implement tags in kv 2025-12-03 17:14:50 +01:00
Serge Zaitsev
3db55d5135 rename storage 2025-12-03 10:06:35 +01:00
Serge Zaitsev
268622abd4 start kv store for annotations 2025-12-02 16:08:44 +01:00
6 changed files with 325 additions and 48 deletions

View File

@@ -0,0 +1,246 @@
package annotation
import (
"bytes"
"context"
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"
"github.com/dgraph-io/badger/v4"
"github.com/google/uuid"
annotationV0 "github.com/grafana/grafana/apps/annotation/pkg/apis/annotation/v0alpha1"
)
type kvStore struct {
db *badger.DB
}
func NewKVStore(dbdir string) (Store, error) {
opts := badger.DefaultOptions(dbdir)
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &kvStore{db: db}, nil
}
func (kv *kvStore) Close() error { return kv.db.Close() }
// TODO: namespace!!!!
func keyUUID(id string) []byte { return []byte("a:uuid:" + id) }
func keyTime(t int64, id string) []byte { return []byte(fmt.Sprintf("a:time:%10d:%s", t/1000, id)) }
func keyDash(d string, id string) []byte { return []byte("a:dash:" + d + ":" + id) }
func keyTag(tag, id string) []byte { return []byte("a:tag:" + tag + ":" + id) }
func (kv *kvStore) Get(ctx context.Context, namespace, name string) (*annotationV0.Annotation, error) {
var result *annotationV0.Annotation
err := kv.db.View(func(txn *badger.Txn) error {
// TODO: namespace
a, err := kv.load(txn, name)
if err != nil {
return err
}
result = a
return nil
})
return result, err
}
func (kv *kvStore) List(ctx context.Context, namespace string, opts ListOptions) (*AnnotationList, error) {
result := []annotationV0.Annotation{}
// if *tag != "" {
// prefix := []byte("a:tag:" + *tag + ":")
// db.View(func(txn *badger.Txn) error {
// it := txn.NewIterator(badger.DefaultIteratorOptions)
// for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// parts := bytes.Split(it.Item().Key(), []byte(":"))
// id := string(parts[len(parts)-1])
// a, err := loadUUID(txn, id)
// if err == nil && a.TimeEnd >= from && a.Time <= to {
// if *dash == "" || a.DashboardUID == *dash {
// result = append(result, a)
// }
// }
// }
// it.Close()
// return nil
// })
// } else {
prefix := []byte("a:time:")
fromKey := []byte(fmt.Sprintf("a:time:%020d:", opts.From))
kv.db.View(func(txn *badger.Txn) error {
// TODO: limit
it := txn.NewIterator(badger.DefaultIteratorOptions)
for it.Seek(fromKey); it.ValidForPrefix(prefix); it.Next() {
k := it.Item().Key()
fmt.Println("key", string(k))
parts := bytes.Split(k, []byte(":"))
t, _ := strconv.ParseInt(string(parts[2]), 10, 64)
if t > opts.To {
break
}
id := string(parts[3])
a, err := kv.load(txn, id)
if err == nil {
result = append(result, *a)
}
}
it.Close()
return nil
})
return &AnnotationList{Items: result}, nil
}
func (kv *kvStore) load(txn *badger.Txn, id string) (*annotationV0.Annotation, error) {
var a annotationV0.Annotation
item, e := txn.Get(keyUUID(id))
if e != nil {
return nil, e
}
err := item.Value(func(v []byte) error {
return json.Unmarshal(v, &a.Spec)
})
if err != nil {
return nil, err
}
a.Name = id
return &a, nil
}
func (kv *kvStore) put(k, v []byte) error {
return kv.db.Update(func(txn *badger.Txn) error { return txn.Set(k, v) })
}
func (kv *kvStore) Create(ctx context.Context, a *annotationV0.Annotation) (*annotationV0.Annotation, error) {
b, err := json.Marshal(a.Spec)
if err != nil {
return nil, err
}
// TODO: name, namespace
if a.Name == "" {
a.Name = uuid.New().String()
}
fmt.Println("name", a.Name)
if err := kv.put(keyUUID(a.Name), b); err != nil {
return nil, err
}
if err := kv.put(keyTime(a.Spec.Time, a.Name), []byte{}); err != nil {
return nil, err
}
if a.Spec.DashboardUID != nil {
if err := kv.put(keyDash(*a.Spec.DashboardUID, a.Name), []byte{}); err != nil {
return nil, err
}
}
for _, t := range a.Spec.Tags {
if err := kv.put(keyTag(t, a.Name), []byte{}); err != nil {
return nil, err
}
}
return a, nil
}
func (kv *kvStore) Update(ctx context.Context, a *annotationV0.Annotation) error {
b, err := json.Marshal(a.Spec)
if err != nil {
return err
}
return kv.db.Update(func(txn *badger.Txn) error {
old, err := kv.load(txn, a.Name)
if err != nil {
return err
}
slices.Sort(a.Spec.Tags)
slices.Sort(old.Spec.Tags)
tagsChanged := false
if len(a.Spec.Tags) != len(old.Spec.Tags) {
tagsChanged = true
} else {
for i := range a.Spec.Tags {
if a.Spec.Tags[i] != old.Spec.Tags[i] {
tagsChanged = true
break
}
}
}
if tagsChanged {
for _, t := range old.Spec.Tags {
txn.Delete(keyTag(t, a.Name))
}
for _, t := range a.Spec.Tags {
txn.Set(keyTag(t, a.Name), []byte{})
}
}
return txn.Set(keyUUID(a.Name), b)
})
}
func (kv *kvStore) Delete(ctx context.Context, namespace, name string) error {
a, err := kv.Get(ctx, namespace, name)
if err != nil {
return err
}
return kv.db.Update(func(txn *badger.Txn) error {
if err := txn.Delete(keyUUID(a.Name)); err != nil {
return err
}
if err := txn.Delete(keyTime(a.Spec.Time, a.Name)); err != nil {
return err
}
if a.Spec.DashboardUID != nil {
if err := txn.Delete(keyDash(*a.Spec.DashboardUID, a.Name)); err != nil {
return err
}
}
for _, t := range a.Spec.Tags {
txn.Delete(keyTag(t, a.Name))
}
return nil
})
}
func (kv *kvStore) Tags(ctx context.Context, namespace string, opts TagListOptions) ([]Tag, error) {
tagCounts := make(map[string]int64)
prefix := []byte("a:tag:")
err := kv.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
k := it.Item().Key()
fmt.Println("tag key", string(k))
parts := bytes.Split(k, []byte(":"))
if len(parts) < 4 {
continue
}
tag := string(parts[2])
if opts.Prefix == "" || strings.HasPrefix(tag, opts.Prefix) {
tagCounts[tag]++
}
}
it.Close()
return nil
})
if err != nil {
return nil, err
}
tags := make([]Tag, 0, len(tagCounts))
for name, count := range tagCounts {
tags = append(tags, Tag{Name: name, Count: count})
}
// TODO: sort tags by count or name?
if opts.Limit > 0 && len(tags) > opts.Limit {
tags = tags[:opts.Limit]
}
return tags, nil
}

View File

@@ -98,20 +98,20 @@ func (m *memoryStore) Create(ctx context.Context, anno *annotationV0.Annotation)
return created, nil
}
func (m *memoryStore) Update(ctx context.Context, anno *annotationV0.Annotation) (*annotationV0.Annotation, error) {
func (m *memoryStore) Update(ctx context.Context, anno *annotationV0.Annotation) error {
m.mu.Lock()
defer m.mu.Unlock()
key := anno.Namespace + "/" + anno.Name
if _, exists := m.data[key]; !exists {
return nil, fmt.Errorf("annotation not found")
return fmt.Errorf("annotation not found")
}
updated := anno.DeepCopy()
m.data[key] = updated
return updated, nil
return nil
}
func (m *memoryStore) Delete(ctx context.Context, namespace, name string) error {
@@ -156,3 +156,7 @@ func (m *memoryStore) ListTags(ctx context.Context, namespace string, opts TagLi
return tags, nil
}
func (m *memoryStore) Tags(ctx context.Context, namespace string, opts TagListOptions) ([]Tag, error) {
return nil, fmt.Errorf("not implemented")
}

View File

@@ -2,11 +2,12 @@ package annotation
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -39,7 +40,7 @@ var (
type AnnotationAppInstaller struct {
appsdkapiserver.AppInstaller
cfg *setting.Cfg
legacy *legacyStorage
legacy *restStorage
}
func RegisterAppInstaller(
@@ -55,13 +56,18 @@ func RegisterAppInstaller(
var tagHandler func(context.Context, app.CustomRouteResponseWriter, *app.CustomRouteRequest) error
if service != nil {
mapper := grafrequest.GetNamespaceMapper(cfg)
sqlAdapter := NewSQLAdapter(service, cleaner, mapper, cfg)
installer.legacy = &legacyStorage{
store: sqlAdapter,
kvAdapter, err := NewKVStore("kv")
if err != nil {
return nil, err
}
// sqlAdapter := NewSQLAdapter(service, cleaner, mapper, cfg)
installer.legacy = &restStorage{
// store: sqlAdapter,
store: kvAdapter,
mapper: mapper,
}
// Create the tags handler using the sqlAdapter as TagProvider
tagHandler = newTagsHandler(sqlAdapter)
tagHandler = newTagsHandler(installer.legacy.store)
}
provider := simple.NewAppProvider(apis.LocalManifest(), nil, annotationapp.New)
@@ -116,47 +122,52 @@ func (a *AnnotationAppInstaller) GetLegacyStorage(requested schema.GroupVersionR
}
var (
_ rest.Scoper = (*legacyStorage)(nil)
_ rest.SingularNameProvider = (*legacyStorage)(nil)
_ rest.Getter = (*legacyStorage)(nil)
_ rest.Storage = (*legacyStorage)(nil)
_ rest.Creater = (*legacyStorage)(nil)
_ rest.Updater = (*legacyStorage)(nil)
_ rest.GracefulDeleter = (*legacyStorage)(nil)
_ rest.Scoper = (*restStorage)(nil)
_ rest.SingularNameProvider = (*restStorage)(nil)
_ rest.Getter = (*restStorage)(nil)
_ rest.Storage = (*restStorage)(nil)
_ rest.Creater = (*restStorage)(nil)
_ rest.Updater = (*restStorage)(nil)
_ rest.GracefulDeleter = (*restStorage)(nil)
)
type legacyStorage struct {
type restStorage struct {
store Store
mapper grafrequest.NamespaceMapper
tableConverter rest.TableConvertor
}
func (s *legacyStorage) New() runtime.Object {
func (s *restStorage) New() runtime.Object {
return annotationV0.AnnotationKind().ZeroValue()
}
func (s *legacyStorage) Destroy() {}
func (s *restStorage) Destroy() {}
func (s *legacyStorage) NamespaceScoped() bool {
func (s *restStorage) NamespaceScoped() bool {
return true // namespace == org
}
func (s *legacyStorage) GetSingularName() string {
func (s *restStorage) GetSingularName() string {
return strings.ToLower(annotationV0.AnnotationKind().Kind())
}
func (s *legacyStorage) NewList() runtime.Object {
func (s *restStorage) NewList() runtime.Object {
return annotationV0.AnnotationKind().ZeroListValue()
}
func (s *legacyStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
func (s *restStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
return s.tableConverter.ConvertToTable(ctx, object, tableOptions)
}
func (s *legacyStorage) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
// TODO: hierarchy of list options (where do tags fit in - seems to be mutually exclusive with dashboard query?)
// 1. No options: list all for namespace
// 2. By dashboard: list all annotations for namespace+dashboard (i.e. remove by dashboard)
// 3. By panel: list all annotations for namespace+dashboard+panel (i.e. remove by dashboard+panel)
// 4. By time range: list all annotations for namespace in time range
func (s *restStorage) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
namespace := request.NamespaceValue(ctx)
opts := ListOptions{}
opts := ListOptions{To: time.Now().UnixMilli()}
if options.FieldSelector != nil {
for _, r := range options.FieldSelector.Requirements() {
switch r.Field {
@@ -232,12 +243,12 @@ func (s *legacyStorage) List(ctx context.Context, options *internalversion.ListO
return &annotationV0.AnnotationList{Items: result.Items}, nil
}
func (s *legacyStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
func (s *restStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
namespace := request.NamespaceValue(ctx)
return s.store.Get(ctx, namespace, name)
}
func (s *legacyStorage) Create(ctx context.Context,
func (s *restStorage) Create(ctx context.Context,
obj runtime.Object,
createValidation rest.ValidateObjectFunc,
options *metav1.CreateOptions,
@@ -249,7 +260,7 @@ func (s *legacyStorage) Create(ctx context.Context,
return s.store.Create(ctx, resource)
}
func (s *legacyStorage) Update(ctx context.Context,
func (s *restStorage) Update(ctx context.Context,
name string,
objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc,
@@ -257,15 +268,34 @@ func (s *legacyStorage) Update(ctx context.Context,
forceAllowCreate bool,
options *metav1.UpdateOptions,
) (runtime.Object, bool, error) {
return nil, false, errors.New("not implemented")
old, err := s.Get(ctx, name, nil)
if err != nil {
return nil, false, err
}
obj, err := objInfo.UpdatedObject(ctx, old)
if err != nil {
return old, false, err
}
newObj, ok := obj.(*annotationV0.Annotation)
if !ok {
return nil, false, k8serrors.NewBadRequest("expected valid annotation")
}
if updateValidation != nil {
if err := updateValidation(ctx, obj, old); err != nil {
return nil, false, err
}
}
// TODO: validate that only name/tags are modified
err = s.store.Update(ctx, newObj)
return obj, false, err
}
func (s *legacyStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
func (s *restStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
namespace := request.NamespaceValue(ctx)
err := s.store.Delete(ctx, namespace, name)
return nil, false, err
}
func (s *legacyStorage) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *internalversion.ListOptions) (runtime.Object, error) {
func (s *restStorage) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *internalversion.ListOptions) (runtime.Object, error) {
return nil, fmt.Errorf("DeleteCollection for annotation is not available")
}

View File

@@ -125,20 +125,20 @@ func (a *sqlAdapter) Create(ctx context.Context, anno *annotationV0.Annotation)
return created, nil
}
func (a *sqlAdapter) Update(ctx context.Context, anno *annotationV0.Annotation) (*annotationV0.Annotation, error) {
func (a *sqlAdapter) Update(ctx context.Context, anno *annotationV0.Annotation) error {
orgID, err := namespaceToOrgID(ctx, anno.Namespace)
if err != nil {
return nil, err
return err
}
item := a.fromK8sResource(anno)
item.OrgID = orgID
if err := a.repo.Update(ctx, item); err != nil {
return nil, err
return err
}
return anno, nil
return nil
}
func (a *sqlAdapter) Delete(ctx context.Context, namespace, name string) error {

View File

@@ -9,9 +9,12 @@ import (
type Store interface {
Get(ctx context.Context, namespace, name string) (*annotationV0.Annotation, error)
List(ctx context.Context, namespace string, opts ListOptions) (*AnnotationList, error)
// TODO: return id only?
Create(ctx context.Context, annotation *annotationV0.Annotation) (*annotationV0.Annotation, error)
Update(ctx context.Context, annotation *annotationV0.Annotation) (*annotationV0.Annotation, error)
Update(ctx context.Context, annotation *annotationV0.Annotation) error
Delete(ctx context.Context, namespace, name string) error
Tags(ctx context.Context, namespace string, opts TagListOptions) ([]Tag, error)
}
type ListOptions struct {
@@ -22,26 +25,20 @@ type ListOptions struct {
Limit int64
Continue string
}
type AnnotationList struct {
Items []annotationV0.Annotation
Continue string
}
type LifecycleManager interface {
Cleanup(ctx context.Context) (int64, error)
}
type TagProvider interface {
ListTags(ctx context.Context, namespace string, opts TagListOptions) ([]Tag, error)
}
type TagListOptions struct {
Prefix string
Limit int
}
type Tag struct {
Name string
Count int64
}
type LifecycleManager interface {
Cleanup(ctx context.Context) (int64, error)
}

View File

@@ -17,14 +17,14 @@ type tagItem struct {
Count int64 `json:"count"`
}
func newTagsHandler(tagProvider TagProvider) func(ctx context.Context, writer app.CustomRouteResponseWriter, request *app.CustomRouteRequest) error {
func newTagsHandler(tagProvider Store) func(ctx context.Context, writer app.CustomRouteResponseWriter, request *app.CustomRouteRequest) error {
return func(ctx context.Context, writer app.CustomRouteResponseWriter, request *app.CustomRouteRequest) error {
fmt.Println("Handling /tags request")
namespace := request.ResourceIdentifier.Namespace
if namespace == "" {
namespace = "default"
}
tags, err := tagProvider.ListTags(ctx, namespace, TagListOptions{})
tags, err := tagProvider.Tags(ctx, namespace, TagListOptions{})
if err != nil {
return err
}