mirror of
https://github.com/grafana/grafana.git
synced 2025-12-21 12:04:45 +08:00
Compare commits
3 Commits
docs/add-t
...
undef1nd/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65f6aa36c7 | ||
|
|
999e4326dc | ||
|
|
acfa9cad43 |
205
pkg/loki-annotations/main.go
Normal file
205
pkg/loki-annotations/main.go
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var (
|
||||||
|
lokiURL = flag.String("loki-url", "http://localhost:3100", "Loki URL")
|
||||||
|
//basicAuthUser = flag.String("basic-auth-user", "", "Basic auth username")
|
||||||
|
//basicAuthPassword = flag.String("basic-auth-password", "", "Basic auth password")
|
||||||
|
|
||||||
|
text = flag.String("text", "Test annotation", "Annotation text")
|
||||||
|
tags = flag.String("tags", "", "Comma-separated tags")
|
||||||
|
orgID = flag.Int64("org-id", 1, "Organization ID")
|
||||||
|
dashboardUID = flag.String("dashboard-uid", "xxx-yyy-zzz", "Dashboard UID")
|
||||||
|
panelID = flag.Int64("panel-id", 1, "Panel ID")
|
||||||
|
annotationID = flag.Int64("annotation-id", 0, "Annotation ID (for get/update/delete)")
|
||||||
|
limit = flag.Int64("limit", 100, "Query limit")
|
||||||
|
)
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
args := flag.Args()
|
||||||
|
if len(args) == 0 {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error: action is required\n")
|
||||||
|
fmt.Fprintf(os.Stderr, "Usage: %s <action> [flags]\n", os.Args[0])
|
||||||
|
fmt.Fprintf(os.Stderr, "Available actions: create, get, list-tags, update, delete\n")
|
||||||
|
flag.Usage()
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
action := args[0]
|
||||||
|
|
||||||
|
cfg := Config{
|
||||||
|
URL: *lokiURL,
|
||||||
|
//BasicAuthUser: *basicAuthUser,
|
||||||
|
//BasicAuthPassword: *basicAuthPassword,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create store
|
||||||
|
store, err := NewStore(cfg)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error creating store: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
switch action {
|
||||||
|
case "create":
|
||||||
|
id, err := addAnnotation(ctx, store, *orgID, *text, *dashboardUID, *panelID, *tags)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error creating annotation: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Printf("✓ Annotation created with ID: %d\n", id)
|
||||||
|
|
||||||
|
case "get":
|
||||||
|
anns, err := getAnnotations(ctx, store, *orgID, *dashboardUID, *panelID, *annotationID, *limit)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error getting annotations: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Printf("✓ Found %d annotation(s):\n", len(anns))
|
||||||
|
for _, ann := range anns {
|
||||||
|
fmt.Printf(" ID: %d, Text: %s, Time: %s, Tags: %v\n",
|
||||||
|
ann.ID, ann.Text, time.Unix(0, ann.Time*1e6).Format(time.RFC3339), ann.Tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
case "list-tags":
|
||||||
|
tagsResult, err := getTags(ctx, store, *orgID, *tags)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error listing tags: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Printf("✓ Found %d tag(s):\n", len(tagsResult.Tags))
|
||||||
|
for _, tag := range tagsResult.Tags {
|
||||||
|
fmt.Printf(" %s: %d\n", tag.Tag, tag.Count)
|
||||||
|
}
|
||||||
|
|
||||||
|
case "update":
|
||||||
|
if err := updateAnnotation(ctx, store, *orgID, *annotationID, *text, *tags); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error updating annotation: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Printf("✓ Annotation %d updated\n", *annotationID)
|
||||||
|
|
||||||
|
case "delete":
|
||||||
|
if err := deleteAnnotation(ctx, store, *orgID, *annotationID); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error deleting annotation: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Printf("✓ Annotation %d deleted\n", *annotationID)
|
||||||
|
|
||||||
|
default:
|
||||||
|
fmt.Fprintf(os.Stderr, "Error: unknown action: %s\n", action)
|
||||||
|
fmt.Fprintf(os.Stderr, "Available actions: create, get, list-tags, update, delete\n")
|
||||||
|
flag.Usage()
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addAnnotation(ctx context.Context, store *Store, orgID int64, text, dashboardUID string, panelID int64, tagsStr string) (int64, error) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
ann := &Annotation{
|
||||||
|
OrgID: orgID,
|
||||||
|
UserID: 1,
|
||||||
|
DashboardUID: dashboardUID,
|
||||||
|
PanelID: panelID,
|
||||||
|
Text: text,
|
||||||
|
Time: now,
|
||||||
|
Created: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagsStr != "" {
|
||||||
|
tags := []string{}
|
||||||
|
for _, tag := range splitTags(tagsStr) {
|
||||||
|
if tag != "" {
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ann.Tags = tags
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.Add(ctx, ann); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ann.ID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAnnotations(ctx context.Context, store *Store, orgID int64, dashboardUID string, panelID, annotationID, limit int64) ([]*Annotation, error) {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
thirtyDaysAgo := now - int64(30*24*time.Hour.Milliseconds())
|
||||||
|
|
||||||
|
query := Query{
|
||||||
|
OrgID: orgID,
|
||||||
|
From: thirtyDaysAgo,
|
||||||
|
To: now,
|
||||||
|
DashboardUID: dashboardUID,
|
||||||
|
PanelID: panelID,
|
||||||
|
AnnotationID: annotationID,
|
||||||
|
Limit: limit,
|
||||||
|
}
|
||||||
|
|
||||||
|
return store.Get(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTags(ctx context.Context, store *Store, orgID int64, tagFilter string) (TagsResult, error) {
|
||||||
|
query := TagsQuery{
|
||||||
|
OrgID: orgID,
|
||||||
|
Tag: tagFilter,
|
||||||
|
Limit: 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
return store.GetTags(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateAnnotation(ctx context.Context, store *Store, orgID, annotationID int64, text, tagsStr string) error {
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
ann := &Annotation{
|
||||||
|
ID: annotationID,
|
||||||
|
OrgID: orgID,
|
||||||
|
Text: text,
|
||||||
|
Time: now,
|
||||||
|
Created: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagsStr != "" {
|
||||||
|
tags := []string{}
|
||||||
|
for _, tag := range splitTags(tagsStr) {
|
||||||
|
if tag != "" {
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ann.Tags = tags
|
||||||
|
}
|
||||||
|
|
||||||
|
return store.Update(ctx, ann)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteAnnotation(ctx context.Context, store *Store, orgID, annotationID int64) error {
|
||||||
|
params := &DeleteParams{
|
||||||
|
OrgID: orgID,
|
||||||
|
ID: annotationID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return store.Delete(ctx, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitTags(tagsStr string) []string {
|
||||||
|
tags := []string{}
|
||||||
|
for _, tag := range strings.Split(tagsStr, ",") {
|
||||||
|
tag = strings.TrimSpace(tag)
|
||||||
|
if tag != "" {
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tags
|
||||||
|
}
|
||||||
126
pkg/loki-annotations/merge.go
Normal file
126
pkg/loki-annotations/merge.go
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Merger handles merging annotations with changes
|
||||||
|
type Merger struct{}
|
||||||
|
|
||||||
|
// NewMerger creates a new merger
|
||||||
|
func NewMerger() *Merger {
|
||||||
|
return &Merger{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge merges annotations with changes, applying the latest change to each annotation
|
||||||
|
func (m *Merger) Merge(
|
||||||
|
annEntries []*AnnotationEntry,
|
||||||
|
changes []*ChangeEntry,
|
||||||
|
query Query,
|
||||||
|
) []*Annotation {
|
||||||
|
// If no changes, convert annotations directly
|
||||||
|
if len(changes) == 0 {
|
||||||
|
return m.annotationsToDTOs(annEntries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group changes by annotation ID, keeping only the latest change for each
|
||||||
|
changesByID := make(map[string]*ChangeEntry)
|
||||||
|
for _, change := range changes {
|
||||||
|
existing, ok := changesByID[change.AnnotationID]
|
||||||
|
if !ok || change.Created > existing.Created {
|
||||||
|
changesByID[change.AnnotationID] = change
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a map of annotations for quick lookup
|
||||||
|
annotationMap := make(map[string]*AnnotationEntry)
|
||||||
|
for _, ann := range annEntries {
|
||||||
|
annotationMap[ann.ID] = ann
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply changes and build result
|
||||||
|
result := make([]*Annotation, 0, len(annEntries))
|
||||||
|
for _, ann := range annEntries {
|
||||||
|
// Check if there's a change for this annotation
|
||||||
|
if change, hasChange := changesByID[ann.ID]; hasChange {
|
||||||
|
if change.Operation == "delete" {
|
||||||
|
// Skip deleted annotations
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if change.Operation == "update" {
|
||||||
|
// Apply changes
|
||||||
|
m.applyChanges(ann, change)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to annotation
|
||||||
|
annotation := m.annotationEntryToAnnotation(ann)
|
||||||
|
result = append(result, annotation)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by time (descending)
|
||||||
|
sort.Slice(result, func(i, j int) bool {
|
||||||
|
if result[i].Time != result[j].Time {
|
||||||
|
return result[i].Time > result[j].Time
|
||||||
|
}
|
||||||
|
return result[i].TimeEnd > result[j].TimeEnd
|
||||||
|
})
|
||||||
|
|
||||||
|
// Apply limit
|
||||||
|
if query.Limit > 0 && int64(len(result)) > query.Limit {
|
||||||
|
result = result[:query.Limit]
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyChanges applies changes to an annotation entry
|
||||||
|
func (m *Merger) applyChanges(ann *AnnotationEntry, change *ChangeEntry) {
|
||||||
|
if change.Text != "" {
|
||||||
|
ann.Text = change.Text
|
||||||
|
}
|
||||||
|
if change.Tags != nil {
|
||||||
|
ann.Tags = change.Tags
|
||||||
|
}
|
||||||
|
// Time and TimeEnd cannot be changed - they remain from the original annotation
|
||||||
|
}
|
||||||
|
|
||||||
|
// annotationsToDTOs converts annotation entries to annotations
|
||||||
|
func (m *Merger) annotationsToDTOs(entries []*AnnotationEntry) []*Annotation {
|
||||||
|
result := make([]*Annotation, 0, len(entries))
|
||||||
|
for _, entry := range entries {
|
||||||
|
result = append(result, m.annotationEntryToAnnotation(entry))
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// annotationEntryToAnnotation converts a single annotation entry to annotation
|
||||||
|
func (m *Merger) annotationEntryToAnnotation(entry *AnnotationEntry) *Annotation {
|
||||||
|
ann := &Annotation{
|
||||||
|
Time: entry.Time,
|
||||||
|
TimeEnd: entry.TimeEnd,
|
||||||
|
Text: entry.Text,
|
||||||
|
Tags: entry.Tags,
|
||||||
|
OrgID: entry.OrgID,
|
||||||
|
UserID: entry.UserID,
|
||||||
|
PanelID: entry.PanelID,
|
||||||
|
Created: entry.Created,
|
||||||
|
DashboardUID: entry.DashboardUID,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse ID to get numeric ID
|
||||||
|
if len(entry.ID) > 4 && entry.ID[:4] == "ann-" {
|
||||||
|
if id, err := parseID(entry.ID[4:]); err == nil {
|
||||||
|
ann.ID = id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ann
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseID parses annotation ID from string
|
||||||
|
func parseID(idStr string) (int64, error) {
|
||||||
|
// Try to parse as int64
|
||||||
|
return strconv.ParseInt(idStr, 10, 64)
|
||||||
|
}
|
||||||
82
pkg/loki-annotations/push_metadata.go
Normal file
82
pkg/loki-annotations/push_metadata.go
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SampleWithMetadata extends lokiclient.Sample to support structured metadata
|
||||||
|
type SampleWithMetadata struct {
|
||||||
|
T time.Time
|
||||||
|
V string
|
||||||
|
StructuredMetadata map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamWithMetadata extends lokiclient.Stream to support structured metadata
|
||||||
|
type StreamWithMetadata struct {
|
||||||
|
Stream map[string]string
|
||||||
|
Values []SampleWithMetadata
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON custom marshaling to include structured metadata in the values array
|
||||||
|
// Format: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||||
|
func (s *SampleWithMetadata) MarshalJSON() ([]byte, error) {
|
||||||
|
if len(s.StructuredMetadata) == 0 {
|
||||||
|
// Standard format: [timestamp, log_line]
|
||||||
|
return json.Marshal([2]string{
|
||||||
|
fmt.Sprintf("%d", s.T.UnixNano()),
|
||||||
|
s.V,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// Format with structured metadata: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||||
|
return json.Marshal([]interface{}{
|
||||||
|
fmt.Sprintf("%d", s.T.UnixNano()),
|
||||||
|
s.V,
|
||||||
|
map[string]interface{}{
|
||||||
|
"structuredMetadata": s.StructuredMetadata,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// pushWithStructuredMetadata pushes a stream with structured metadata to Loki
|
||||||
|
// This method manually constructs the JSON payload to include structured metadata
|
||||||
|
// in the format Loki expects: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||||
|
func (s *Store) pushWithStructuredMetadata(ctx context.Context, stream StreamWithMetadata) error {
|
||||||
|
// Create JSON payload with structured metadata
|
||||||
|
streamsJSON := struct {
|
||||||
|
Streams []StreamWithMetadata `json:"streams"`
|
||||||
|
}{
|
||||||
|
Streams: []StreamWithMetadata{stream},
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(streamsJSON)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal stream with structured metadata: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push to Loki using HTTP directly
|
||||||
|
uri := s.lokiURL.JoinPath("/loki/api/v1/push")
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create Loki request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
// Use a simple HTTP client to send the request
|
||||||
|
httpClient := &http.Client{}
|
||||||
|
resp, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to send request to Loki: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return fmt.Errorf("Loki returned non-200 status code: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
348
pkg/loki-annotations/store.go
Normal file
348
pkg/loki-annotations/store.go
Normal file
@@ -0,0 +1,348 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||||
|
ngmetrics "github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrLokiAnnotationsInternal = errors.New("loki annotations internal error")
|
||||||
|
ErrLokiAnnotationsNotFound = errors.New("loki annotations not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
type lokiAnnotationsClient interface {
|
||||||
|
Push(ctx context.Context, streams []lokiclient.Stream) error
|
||||||
|
RangeQuery(ctx context.Context, query string, start, end, limit int64) (lokiclient.QueryRes, error)
|
||||||
|
MaxQuerySize() int
|
||||||
|
}
|
||||||
|
|
||||||
|
type Store struct {
|
||||||
|
client lokiAnnotationsClient
|
||||||
|
log log.Logger
|
||||||
|
lokiURL *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(cfg Config) (*Store, error) {
|
||||||
|
if cfg.URL == "" {
|
||||||
|
return nil, fmt.Errorf("Loki URL must be provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
lokiURL, err := url.Parse(cfg.URL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse Loki URL: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// From annotations history store
|
||||||
|
// Use JSONEncoder to support structured metadata for Time and TimeEnd
|
||||||
|
// Structured metadata is only supported in JSON format, not protobuf
|
||||||
|
lokiCfg := lokiclient.LokiConfig{
|
||||||
|
ReadPathURL: lokiURL,
|
||||||
|
WritePathURL: lokiURL,
|
||||||
|
//BasicAuthUser: cfg.BasicAuthUser,
|
||||||
|
//BasicAuthPassword: cfg.BasicAuthPassword,
|
||||||
|
ExternalLabels: make(map[string]string),
|
||||||
|
MaxQueryLength: 0,
|
||||||
|
MaxQuerySize: 0,
|
||||||
|
Encoder: lokiclient.JSONEncoder{},
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := log.New("test-loki-annotations")
|
||||||
|
historianMetrics := ngmetrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations")
|
||||||
|
client := lokiclient.NewLokiClient(
|
||||||
|
lokiCfg,
|
||||||
|
lokiclient.NewRequester(),
|
||||||
|
historianMetrics.BytesWritten,
|
||||||
|
historianMetrics.WriteDuration,
|
||||||
|
logger,
|
||||||
|
tracing.NewNoopTracerService(),
|
||||||
|
"annotations.loki",
|
||||||
|
)
|
||||||
|
|
||||||
|
return &Store{
|
||||||
|
client: client,
|
||||||
|
log: logger,
|
||||||
|
lokiURL: lokiURL,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add creates a new annotation in the main stream
|
||||||
|
func (s *Store) Add(ctx context.Context, item *Annotation) error {
|
||||||
|
// Generate ID if not set
|
||||||
|
if item.ID == 0 {
|
||||||
|
// Use timestamp-based ID for simplicity
|
||||||
|
item.ID = time.Now().UnixNano() / int64(time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set defaults for timestamps
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
if item.Time == 0 {
|
||||||
|
item.Time = now
|
||||||
|
}
|
||||||
|
if item.TimeEnd == 0 {
|
||||||
|
item.TimeEnd = now
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := AnnotationEntry{
|
||||||
|
ID: fmt.Sprintf("ann-%d", item.ID),
|
||||||
|
OrgID: item.OrgID,
|
||||||
|
UserID: item.UserID,
|
||||||
|
DashboardUID: item.DashboardUID,
|
||||||
|
PanelID: item.PanelID,
|
||||||
|
Text: item.Text,
|
||||||
|
Tags: item.Tags,
|
||||||
|
Time: item.Time,
|
||||||
|
TimeEnd: item.TimeEnd,
|
||||||
|
Created: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
logLine, err := json.Marshal(entry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal annotation entry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
labels := map[string]string{
|
||||||
|
"stream": annotationsStream,
|
||||||
|
"org_id": strconv.FormatInt(item.OrgID, 10),
|
||||||
|
}
|
||||||
|
if item.DashboardUID != "" {
|
||||||
|
labels["dashboard_uid"] = item.DashboardUID
|
||||||
|
}
|
||||||
|
if item.PanelID > 0 {
|
||||||
|
labels["panel_id"] = strconv.FormatInt(item.PanelID, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create stream with structured metadata for Time and TimeEnd
|
||||||
|
// This allows querying by these fields using LogQL: {stream="..."} | Time="1234567890"
|
||||||
|
streamWithMetadata := StreamWithMetadata{
|
||||||
|
Stream: labels,
|
||||||
|
Values: []SampleWithMetadata{{
|
||||||
|
T: time.Now(),
|
||||||
|
V: string(logLine),
|
||||||
|
StructuredMetadata: map[string]string{
|
||||||
|
"Time": strconv.FormatInt(item.Time, 10),
|
||||||
|
"TimeEnd": strconv.FormatInt(item.TimeEnd, 10),
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push with structured metadata using JSON format
|
||||||
|
return s.pushWithStructuredMetadata(ctx, streamWithMetadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update writes a change entry to the changes stream
|
||||||
|
func (s *Store) Update(ctx context.Context, item *Annotation) error {
|
||||||
|
changeEntry := ChangeEntry{
|
||||||
|
AnnotationID: fmt.Sprintf("ann-%d", item.ID),
|
||||||
|
Operation: "update",
|
||||||
|
Created: time.Now().UnixMilli(),
|
||||||
|
Text: item.Text,
|
||||||
|
Tags: item.Tags,
|
||||||
|
}
|
||||||
|
|
||||||
|
logLine, err := json.Marshal(changeEntry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal change entry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stream := lokiclient.Stream{
|
||||||
|
Stream: map[string]string{
|
||||||
|
"stream": changesStream,
|
||||||
|
"org_id": strconv.FormatInt(item.OrgID, 10),
|
||||||
|
},
|
||||||
|
Values: []lokiclient.Sample{{
|
||||||
|
T: time.Now(),
|
||||||
|
V: string(logLine),
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.client.Push(ctx, []lokiclient.Stream{stream})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete writes a delete entry to the changes stream
|
||||||
|
func (s *Store) Delete(ctx context.Context, params *DeleteParams) error {
|
||||||
|
changeEntry := ChangeEntry{
|
||||||
|
AnnotationID: fmt.Sprintf("ann-%d", params.ID),
|
||||||
|
Operation: "delete",
|
||||||
|
Created: time.Now().UnixMilli(),
|
||||||
|
}
|
||||||
|
|
||||||
|
logLine, err := json.Marshal(changeEntry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal delete entry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stream := lokiclient.Stream{
|
||||||
|
Stream: map[string]string{
|
||||||
|
"stream": changesStream,
|
||||||
|
"org_id": strconv.FormatInt(params.OrgID, 10),
|
||||||
|
},
|
||||||
|
Values: []lokiclient.Sample{{
|
||||||
|
T: time.Now(),
|
||||||
|
V: string(logLine),
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.client.Push(ctx, []lokiclient.Stream{stream})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves annotations with merge from changes stream
|
||||||
|
func (s *Store) Get(ctx context.Context, query Query) ([]*Annotation, error) {
|
||||||
|
// Log query parameters for debugging
|
||||||
|
if query.AnnotationID != 0 {
|
||||||
|
s.log.Debug("Get annotation by ID",
|
||||||
|
"annotation_id", query.AnnotationID,
|
||||||
|
"org_id", query.OrgID,
|
||||||
|
"from", query.From,
|
||||||
|
"to", query.To)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query main stream (from...to)
|
||||||
|
annotations, err := s.queryMainStream(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.log.Debug("Found annotations in main stream", "count", len(annotations))
|
||||||
|
|
||||||
|
// Query entire changes stream and merge with latest change per annotation ID
|
||||||
|
// We need all changes because they're matched to annotations by AnnotationID, not by time.
|
||||||
|
// The merge logic will keep only the latest change per annotation ID.
|
||||||
|
changes, err := s.queryChangesStream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge annotations with changes
|
||||||
|
merger := NewMerger()
|
||||||
|
result := merger.Merge(annotations, changes, query)
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTags returns tags from annotations
|
||||||
|
func (s *Store) GetTags(ctx context.Context, query TagsQuery) (TagsResult, error) {
|
||||||
|
return TagsResult{}, fmt.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// queryMainStream queries the main annotations stream using structured metadata filters
|
||||||
|
func (s *Store) queryMainStream(
|
||||||
|
ctx context.Context,
|
||||||
|
query Query,
|
||||||
|
) ([]*AnnotationEntry, error) {
|
||||||
|
// Build LogQL query with label filters
|
||||||
|
var labels []string
|
||||||
|
labels = append(labels, fmt.Sprintf(`stream="%s"`, annotationsStream))
|
||||||
|
labels = append(labels, fmt.Sprintf(`org_id="%d"`, query.OrgID))
|
||||||
|
|
||||||
|
if query.DashboardUID != "" {
|
||||||
|
labels = append(labels, fmt.Sprintf(`dashboard_uid="%s"`, query.DashboardUID))
|
||||||
|
}
|
||||||
|
if query.PanelID > 0 {
|
||||||
|
labels = append(labels, fmt.Sprintf(`panel_id="%d"`, query.PanelID))
|
||||||
|
}
|
||||||
|
|
||||||
|
logQL := "{" + strings.Join(labels, ",") + "}"
|
||||||
|
|
||||||
|
var filters []string
|
||||||
|
// TODO: Fix filtering by time
|
||||||
|
if query.From > 0 {
|
||||||
|
filters = append(filters, fmt.Sprintf(`Time >= "%d"`, query.From))
|
||||||
|
}
|
||||||
|
if query.To > 0 {
|
||||||
|
filters = append(filters, fmt.Sprintf(`Time <= "%d"`, query.To))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(filters) > 0 {
|
||||||
|
logQL += " | " + strings.Join(filters, " and ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// For AnnotationID queries, we still filter in code after querying
|
||||||
|
if query.AnnotationID != 0 {
|
||||||
|
s.log.Debug("Searching for annotation by ID (will filter in code)", "annotation_id", query.AnnotationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a wide time range for the Loki query itself (last 30 days max)
|
||||||
|
// The structured metadata filters will narrow down the results precisely
|
||||||
|
// Loki's RangeQuery requires time ranges, but we use a fixed window since
|
||||||
|
// structured metadata filters handle the precise filtering
|
||||||
|
now := time.Now().UnixNano()
|
||||||
|
maxRange := 30 * 24 * time.Hour
|
||||||
|
fromNs := now - int64(maxRange.Nanoseconds())
|
||||||
|
toNs := now
|
||||||
|
|
||||||
|
res, err := s.client.RangeQuery(ctx, logQL, fromNs, toNs, query.Limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%w: failed to query main stream: %w", ErrLokiAnnotationsInternal, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse results
|
||||||
|
entries := make([]*AnnotationEntry, 0)
|
||||||
|
targetID := ""
|
||||||
|
if query.AnnotationID != 0 {
|
||||||
|
targetID = fmt.Sprintf("ann-%d", query.AnnotationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, stream := range res.Data.Result {
|
||||||
|
for _, sample := range stream.Values {
|
||||||
|
var entry AnnotationEntry
|
||||||
|
if err := json.Unmarshal([]byte(sample.V), &entry); err != nil {
|
||||||
|
s.log.Debug("Failed to unmarshal annotation entry", "error", err, "entry", sample.V)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter by AnnotationID if specified
|
||||||
|
if query.AnnotationID != 0 && entry.ID != targetID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
entries = append(entries, &entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) queryChangesStream(
|
||||||
|
ctx context.Context,
|
||||||
|
) ([]*ChangeEntry, error) {
|
||||||
|
logQL := fmt.Sprintf(`{stream="%s"}`, changesStream)
|
||||||
|
|
||||||
|
// Use a very old timestamp (epoch 0) to query the entire stream
|
||||||
|
// Loki's MaxQueryLength will clamp this if there's a limit, but we want all available changes
|
||||||
|
|
||||||
|
fromNs := int64(0) // Start from epoch 0 to get all changes
|
||||||
|
toNs := time.Now().UnixNano()
|
||||||
|
|
||||||
|
res, err := s.client.RangeQuery(ctx, logQL, fromNs, toNs, 10000)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%w: failed to query changes stream: %w", ErrLokiAnnotationsInternal, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse results
|
||||||
|
changes := make([]*ChangeEntry, 0)
|
||||||
|
for _, stream := range res.Data.Result {
|
||||||
|
for _, sample := range stream.Values {
|
||||||
|
var change ChangeEntry
|
||||||
|
if err := json.Unmarshal([]byte(sample.V), &change); err != nil {
|
||||||
|
s.log.Debug("Failed to unmarshal change entry", "error", err, "entry", sample.V)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
changes = append(changes, &change)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return changes, nil
|
||||||
|
}
|
||||||
29
pkg/loki-annotations/store_types.go
Normal file
29
pkg/loki-annotations/store_types.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
const (
|
||||||
|
annotationsStream = "grafana_annotations"
|
||||||
|
changesStream = "grafana_annotations_changes"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AnnotationEntry represents an annotation entry in the main stream
|
||||||
|
type AnnotationEntry struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
OrgID int64 `json:"org_id"`
|
||||||
|
UserID int64 `json:"user_id,omitempty"`
|
||||||
|
DashboardUID string `json:"dashboard_uid,omitempty"`
|
||||||
|
PanelID int64 `json:"panel_id,omitempty"`
|
||||||
|
Text string `json:"text"`
|
||||||
|
Tags []string `json:"tags,omitempty"`
|
||||||
|
Time int64 `json:"time"` // event start time, milliseconds
|
||||||
|
TimeEnd int64 `json:"time_end"` // event end time, milliseconds, optional
|
||||||
|
Created int64 `json:"created"` // annotation creation time, milliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChangeEntry represents a change entry in the changes stream
|
||||||
|
type ChangeEntry struct {
|
||||||
|
AnnotationID string `json:"annotation_id"`
|
||||||
|
Operation string `json:"operation"` // "update" or "delete"
|
||||||
|
Created int64 `json:"created"` // milliseconds - time when the change was made
|
||||||
|
Text string `json:"text,omitempty"` // only for "update", empty if unchanged
|
||||||
|
Tags []string `json:"tags,omitempty"` // only for "update", nil if unchanged
|
||||||
|
}
|
||||||
57
pkg/loki-annotations/types.go
Normal file
57
pkg/loki-annotations/types.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// Annotation represents an annotation item (used for both input and output)
|
||||||
|
type Annotation struct {
|
||||||
|
ID int64
|
||||||
|
OrgID int64
|
||||||
|
UserID int64
|
||||||
|
DashboardUID string // empty string means not set
|
||||||
|
PanelID int64
|
||||||
|
Text string
|
||||||
|
Tags []string
|
||||||
|
Time int64 // event start time, milliseconds
|
||||||
|
TimeEnd int64 // event end time, milliseconds, optional
|
||||||
|
Created int64 // annotation creation time, milliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query represents a query for annotations
|
||||||
|
type Query struct {
|
||||||
|
OrgID int64
|
||||||
|
From int64
|
||||||
|
To int64
|
||||||
|
DashboardUID string
|
||||||
|
PanelID int64
|
||||||
|
AnnotationID int64
|
||||||
|
Limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagsQuery represents a query for tags
|
||||||
|
type TagsQuery struct {
|
||||||
|
OrgID int64
|
||||||
|
Tag string
|
||||||
|
Limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tag represents a tag result
|
||||||
|
type Tag struct {
|
||||||
|
Tag string
|
||||||
|
Count int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagsResult represents the result of a tags search
|
||||||
|
type TagsResult struct {
|
||||||
|
Tags []*Tag
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteParams represents parameters for deleting an annotation
|
||||||
|
type DeleteParams struct {
|
||||||
|
OrgID int64
|
||||||
|
ID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config represents configuration for Loki annotations storage
|
||||||
|
type Config struct {
|
||||||
|
URL string
|
||||||
|
//BasicAuthUser string
|
||||||
|
//BasicAuthPassword string
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user