mirror of
https://github.com/grafana/grafana.git
synced 2025-12-20 19:44:55 +08:00
Compare commits
3 Commits
docs/add-t
...
undef1nd/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65f6aa36c7 | ||
|
|
999e4326dc | ||
|
|
acfa9cad43 |
205
pkg/loki-annotations/main.go
Normal file
205
pkg/loki-annotations/main.go
Normal file
@@ -0,0 +1,205 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
lokiURL = flag.String("loki-url", "http://localhost:3100", "Loki URL")
|
||||
//basicAuthUser = flag.String("basic-auth-user", "", "Basic auth username")
|
||||
//basicAuthPassword = flag.String("basic-auth-password", "", "Basic auth password")
|
||||
|
||||
text = flag.String("text", "Test annotation", "Annotation text")
|
||||
tags = flag.String("tags", "", "Comma-separated tags")
|
||||
orgID = flag.Int64("org-id", 1, "Organization ID")
|
||||
dashboardUID = flag.String("dashboard-uid", "xxx-yyy-zzz", "Dashboard UID")
|
||||
panelID = flag.Int64("panel-id", 1, "Panel ID")
|
||||
annotationID = flag.Int64("annotation-id", 0, "Annotation ID (for get/update/delete)")
|
||||
limit = flag.Int64("limit", 100, "Query limit")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
args := flag.Args()
|
||||
if len(args) == 0 {
|
||||
fmt.Fprintf(os.Stderr, "Error: action is required\n")
|
||||
fmt.Fprintf(os.Stderr, "Usage: %s <action> [flags]\n", os.Args[0])
|
||||
fmt.Fprintf(os.Stderr, "Available actions: create, get, list-tags, update, delete\n")
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
action := args[0]
|
||||
|
||||
cfg := Config{
|
||||
URL: *lokiURL,
|
||||
//BasicAuthUser: *basicAuthUser,
|
||||
//BasicAuthPassword: *basicAuthPassword,
|
||||
}
|
||||
|
||||
// Create store
|
||||
store, err := NewStore(cfg)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error creating store: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
switch action {
|
||||
case "create":
|
||||
id, err := addAnnotation(ctx, store, *orgID, *text, *dashboardUID, *panelID, *tags)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error creating annotation: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("✓ Annotation created with ID: %d\n", id)
|
||||
|
||||
case "get":
|
||||
anns, err := getAnnotations(ctx, store, *orgID, *dashboardUID, *panelID, *annotationID, *limit)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error getting annotations: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("✓ Found %d annotation(s):\n", len(anns))
|
||||
for _, ann := range anns {
|
||||
fmt.Printf(" ID: %d, Text: %s, Time: %s, Tags: %v\n",
|
||||
ann.ID, ann.Text, time.Unix(0, ann.Time*1e6).Format(time.RFC3339), ann.Tags)
|
||||
}
|
||||
|
||||
case "list-tags":
|
||||
tagsResult, err := getTags(ctx, store, *orgID, *tags)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error listing tags: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("✓ Found %d tag(s):\n", len(tagsResult.Tags))
|
||||
for _, tag := range tagsResult.Tags {
|
||||
fmt.Printf(" %s: %d\n", tag.Tag, tag.Count)
|
||||
}
|
||||
|
||||
case "update":
|
||||
if err := updateAnnotation(ctx, store, *orgID, *annotationID, *text, *tags); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error updating annotation: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("✓ Annotation %d updated\n", *annotationID)
|
||||
|
||||
case "delete":
|
||||
if err := deleteAnnotation(ctx, store, *orgID, *annotationID); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error deleting annotation: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("✓ Annotation %d deleted\n", *annotationID)
|
||||
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "Error: unknown action: %s\n", action)
|
||||
fmt.Fprintf(os.Stderr, "Available actions: create, get, list-tags, update, delete\n")
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func addAnnotation(ctx context.Context, store *Store, orgID int64, text, dashboardUID string, panelID int64, tagsStr string) (int64, error) {
|
||||
now := time.Now().UnixMilli()
|
||||
ann := &Annotation{
|
||||
OrgID: orgID,
|
||||
UserID: 1,
|
||||
DashboardUID: dashboardUID,
|
||||
PanelID: panelID,
|
||||
Text: text,
|
||||
Time: now,
|
||||
Created: now,
|
||||
}
|
||||
|
||||
if tagsStr != "" {
|
||||
tags := []string{}
|
||||
for _, tag := range splitTags(tagsStr) {
|
||||
if tag != "" {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
}
|
||||
ann.Tags = tags
|
||||
}
|
||||
|
||||
if err := store.Add(ctx, ann); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return ann.ID, nil
|
||||
}
|
||||
|
||||
func getAnnotations(ctx context.Context, store *Store, orgID int64, dashboardUID string, panelID, annotationID, limit int64) ([]*Annotation, error) {
|
||||
now := time.Now().UnixMilli()
|
||||
thirtyDaysAgo := now - int64(30*24*time.Hour.Milliseconds())
|
||||
|
||||
query := Query{
|
||||
OrgID: orgID,
|
||||
From: thirtyDaysAgo,
|
||||
To: now,
|
||||
DashboardUID: dashboardUID,
|
||||
PanelID: panelID,
|
||||
AnnotationID: annotationID,
|
||||
Limit: limit,
|
||||
}
|
||||
|
||||
return store.Get(ctx, query)
|
||||
}
|
||||
|
||||
func getTags(ctx context.Context, store *Store, orgID int64, tagFilter string) (TagsResult, error) {
|
||||
query := TagsQuery{
|
||||
OrgID: orgID,
|
||||
Tag: tagFilter,
|
||||
Limit: 100,
|
||||
}
|
||||
|
||||
return store.GetTags(ctx, query)
|
||||
}
|
||||
|
||||
func updateAnnotation(ctx context.Context, store *Store, orgID, annotationID int64, text, tagsStr string) error {
|
||||
now := time.Now().UnixMilli()
|
||||
ann := &Annotation{
|
||||
ID: annotationID,
|
||||
OrgID: orgID,
|
||||
Text: text,
|
||||
Time: now,
|
||||
Created: now,
|
||||
}
|
||||
|
||||
if tagsStr != "" {
|
||||
tags := []string{}
|
||||
for _, tag := range splitTags(tagsStr) {
|
||||
if tag != "" {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
}
|
||||
ann.Tags = tags
|
||||
}
|
||||
|
||||
return store.Update(ctx, ann)
|
||||
}
|
||||
|
||||
func deleteAnnotation(ctx context.Context, store *Store, orgID, annotationID int64) error {
|
||||
params := &DeleteParams{
|
||||
OrgID: orgID,
|
||||
ID: annotationID,
|
||||
}
|
||||
|
||||
return store.Delete(ctx, params)
|
||||
}
|
||||
|
||||
func splitTags(tagsStr string) []string {
|
||||
tags := []string{}
|
||||
for _, tag := range strings.Split(tagsStr, ",") {
|
||||
tag = strings.TrimSpace(tag)
|
||||
if tag != "" {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
126
pkg/loki-annotations/merge.go
Normal file
126
pkg/loki-annotations/merge.go
Normal file
@@ -0,0 +1,126 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Merger handles merging annotations with changes
|
||||
type Merger struct{}
|
||||
|
||||
// NewMerger creates a new merger
|
||||
func NewMerger() *Merger {
|
||||
return &Merger{}
|
||||
}
|
||||
|
||||
// Merge merges annotations with changes, applying the latest change to each annotation
|
||||
func (m *Merger) Merge(
|
||||
annEntries []*AnnotationEntry,
|
||||
changes []*ChangeEntry,
|
||||
query Query,
|
||||
) []*Annotation {
|
||||
// If no changes, convert annotations directly
|
||||
if len(changes) == 0 {
|
||||
return m.annotationsToDTOs(annEntries)
|
||||
}
|
||||
|
||||
// Group changes by annotation ID, keeping only the latest change for each
|
||||
changesByID := make(map[string]*ChangeEntry)
|
||||
for _, change := range changes {
|
||||
existing, ok := changesByID[change.AnnotationID]
|
||||
if !ok || change.Created > existing.Created {
|
||||
changesByID[change.AnnotationID] = change
|
||||
}
|
||||
}
|
||||
|
||||
// Create a map of annotations for quick lookup
|
||||
annotationMap := make(map[string]*AnnotationEntry)
|
||||
for _, ann := range annEntries {
|
||||
annotationMap[ann.ID] = ann
|
||||
}
|
||||
|
||||
// Apply changes and build result
|
||||
result := make([]*Annotation, 0, len(annEntries))
|
||||
for _, ann := range annEntries {
|
||||
// Check if there's a change for this annotation
|
||||
if change, hasChange := changesByID[ann.ID]; hasChange {
|
||||
if change.Operation == "delete" {
|
||||
// Skip deleted annotations
|
||||
continue
|
||||
}
|
||||
if change.Operation == "update" {
|
||||
// Apply changes
|
||||
m.applyChanges(ann, change)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to annotation
|
||||
annotation := m.annotationEntryToAnnotation(ann)
|
||||
result = append(result, annotation)
|
||||
}
|
||||
|
||||
// Sort by time (descending)
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
if result[i].Time != result[j].Time {
|
||||
return result[i].Time > result[j].Time
|
||||
}
|
||||
return result[i].TimeEnd > result[j].TimeEnd
|
||||
})
|
||||
|
||||
// Apply limit
|
||||
if query.Limit > 0 && int64(len(result)) > query.Limit {
|
||||
result = result[:query.Limit]
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// applyChanges applies changes to an annotation entry
|
||||
func (m *Merger) applyChanges(ann *AnnotationEntry, change *ChangeEntry) {
|
||||
if change.Text != "" {
|
||||
ann.Text = change.Text
|
||||
}
|
||||
if change.Tags != nil {
|
||||
ann.Tags = change.Tags
|
||||
}
|
||||
// Time and TimeEnd cannot be changed - they remain from the original annotation
|
||||
}
|
||||
|
||||
// annotationsToDTOs converts annotation entries to annotations
|
||||
func (m *Merger) annotationsToDTOs(entries []*AnnotationEntry) []*Annotation {
|
||||
result := make([]*Annotation, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
result = append(result, m.annotationEntryToAnnotation(entry))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// annotationEntryToAnnotation converts a single annotation entry to annotation
|
||||
func (m *Merger) annotationEntryToAnnotation(entry *AnnotationEntry) *Annotation {
|
||||
ann := &Annotation{
|
||||
Time: entry.Time,
|
||||
TimeEnd: entry.TimeEnd,
|
||||
Text: entry.Text,
|
||||
Tags: entry.Tags,
|
||||
OrgID: entry.OrgID,
|
||||
UserID: entry.UserID,
|
||||
PanelID: entry.PanelID,
|
||||
Created: entry.Created,
|
||||
DashboardUID: entry.DashboardUID,
|
||||
}
|
||||
|
||||
// Parse ID to get numeric ID
|
||||
if len(entry.ID) > 4 && entry.ID[:4] == "ann-" {
|
||||
if id, err := parseID(entry.ID[4:]); err == nil {
|
||||
ann.ID = id
|
||||
}
|
||||
}
|
||||
|
||||
return ann
|
||||
}
|
||||
|
||||
// parseID parses annotation ID from string
|
||||
func parseID(idStr string) (int64, error) {
|
||||
// Try to parse as int64
|
||||
return strconv.ParseInt(idStr, 10, 64)
|
||||
}
|
||||
82
pkg/loki-annotations/push_metadata.go
Normal file
82
pkg/loki-annotations/push_metadata.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SampleWithMetadata extends lokiclient.Sample to support structured metadata
|
||||
type SampleWithMetadata struct {
|
||||
T time.Time
|
||||
V string
|
||||
StructuredMetadata map[string]string
|
||||
}
|
||||
|
||||
// StreamWithMetadata extends lokiclient.Stream to support structured metadata
|
||||
type StreamWithMetadata struct {
|
||||
Stream map[string]string
|
||||
Values []SampleWithMetadata
|
||||
}
|
||||
|
||||
// MarshalJSON custom marshaling to include structured metadata in the values array
|
||||
// Format: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||
func (s *SampleWithMetadata) MarshalJSON() ([]byte, error) {
|
||||
if len(s.StructuredMetadata) == 0 {
|
||||
// Standard format: [timestamp, log_line]
|
||||
return json.Marshal([2]string{
|
||||
fmt.Sprintf("%d", s.T.UnixNano()),
|
||||
s.V,
|
||||
})
|
||||
}
|
||||
// Format with structured metadata: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||
return json.Marshal([]interface{}{
|
||||
fmt.Sprintf("%d", s.T.UnixNano()),
|
||||
s.V,
|
||||
map[string]interface{}{
|
||||
"structuredMetadata": s.StructuredMetadata,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// pushWithStructuredMetadata pushes a stream with structured metadata to Loki
|
||||
// This method manually constructs the JSON payload to include structured metadata
|
||||
// in the format Loki expects: [timestamp, log_line, {structuredMetadata: {...}}]
|
||||
func (s *Store) pushWithStructuredMetadata(ctx context.Context, stream StreamWithMetadata) error {
|
||||
// Create JSON payload with structured metadata
|
||||
streamsJSON := struct {
|
||||
Streams []StreamWithMetadata `json:"streams"`
|
||||
}{
|
||||
Streams: []StreamWithMetadata{stream},
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(streamsJSON)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal stream with structured metadata: %w", err)
|
||||
}
|
||||
|
||||
// Push to Loki using HTTP directly
|
||||
uri := s.lokiURL.JoinPath("/loki/api/v1/push")
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create Loki request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// Use a simple HTTP client to send the request
|
||||
httpClient := &http.Client{}
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send request to Loki: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("Loki returned non-200 status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
348
pkg/loki-annotations/store.go
Normal file
348
pkg/loki-annotations/store.go
Normal file
@@ -0,0 +1,348 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/notify/historian/lokiclient"
|
||||
ngmetrics "github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLokiAnnotationsInternal = errors.New("loki annotations internal error")
|
||||
ErrLokiAnnotationsNotFound = errors.New("loki annotations not found")
|
||||
)
|
||||
|
||||
type lokiAnnotationsClient interface {
|
||||
Push(ctx context.Context, streams []lokiclient.Stream) error
|
||||
RangeQuery(ctx context.Context, query string, start, end, limit int64) (lokiclient.QueryRes, error)
|
||||
MaxQuerySize() int
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
client lokiAnnotationsClient
|
||||
log log.Logger
|
||||
lokiURL *url.URL
|
||||
}
|
||||
|
||||
func NewStore(cfg Config) (*Store, error) {
|
||||
if cfg.URL == "" {
|
||||
return nil, fmt.Errorf("Loki URL must be provided")
|
||||
}
|
||||
|
||||
lokiURL, err := url.Parse(cfg.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse Loki URL: %w", err)
|
||||
}
|
||||
|
||||
// From annotations history store
|
||||
// Use JSONEncoder to support structured metadata for Time and TimeEnd
|
||||
// Structured metadata is only supported in JSON format, not protobuf
|
||||
lokiCfg := lokiclient.LokiConfig{
|
||||
ReadPathURL: lokiURL,
|
||||
WritePathURL: lokiURL,
|
||||
//BasicAuthUser: cfg.BasicAuthUser,
|
||||
//BasicAuthPassword: cfg.BasicAuthPassword,
|
||||
ExternalLabels: make(map[string]string),
|
||||
MaxQueryLength: 0,
|
||||
MaxQuerySize: 0,
|
||||
Encoder: lokiclient.JSONEncoder{},
|
||||
}
|
||||
|
||||
logger := log.New("test-loki-annotations")
|
||||
historianMetrics := ngmetrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations")
|
||||
client := lokiclient.NewLokiClient(
|
||||
lokiCfg,
|
||||
lokiclient.NewRequester(),
|
||||
historianMetrics.BytesWritten,
|
||||
historianMetrics.WriteDuration,
|
||||
logger,
|
||||
tracing.NewNoopTracerService(),
|
||||
"annotations.loki",
|
||||
)
|
||||
|
||||
return &Store{
|
||||
client: client,
|
||||
log: logger,
|
||||
lokiURL: lokiURL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Add creates a new annotation in the main stream
|
||||
func (s *Store) Add(ctx context.Context, item *Annotation) error {
|
||||
// Generate ID if not set
|
||||
if item.ID == 0 {
|
||||
// Use timestamp-based ID for simplicity
|
||||
item.ID = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
}
|
||||
|
||||
// Set defaults for timestamps
|
||||
now := time.Now().UnixMilli()
|
||||
if item.Time == 0 {
|
||||
item.Time = now
|
||||
}
|
||||
if item.TimeEnd == 0 {
|
||||
item.TimeEnd = now
|
||||
}
|
||||
|
||||
entry := AnnotationEntry{
|
||||
ID: fmt.Sprintf("ann-%d", item.ID),
|
||||
OrgID: item.OrgID,
|
||||
UserID: item.UserID,
|
||||
DashboardUID: item.DashboardUID,
|
||||
PanelID: item.PanelID,
|
||||
Text: item.Text,
|
||||
Tags: item.Tags,
|
||||
Time: item.Time,
|
||||
TimeEnd: item.TimeEnd,
|
||||
Created: now,
|
||||
}
|
||||
|
||||
logLine, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal annotation entry: %w", err)
|
||||
}
|
||||
|
||||
labels := map[string]string{
|
||||
"stream": annotationsStream,
|
||||
"org_id": strconv.FormatInt(item.OrgID, 10),
|
||||
}
|
||||
if item.DashboardUID != "" {
|
||||
labels["dashboard_uid"] = item.DashboardUID
|
||||
}
|
||||
if item.PanelID > 0 {
|
||||
labels["panel_id"] = strconv.FormatInt(item.PanelID, 10)
|
||||
}
|
||||
|
||||
// Create stream with structured metadata for Time and TimeEnd
|
||||
// This allows querying by these fields using LogQL: {stream="..."} | Time="1234567890"
|
||||
streamWithMetadata := StreamWithMetadata{
|
||||
Stream: labels,
|
||||
Values: []SampleWithMetadata{{
|
||||
T: time.Now(),
|
||||
V: string(logLine),
|
||||
StructuredMetadata: map[string]string{
|
||||
"Time": strconv.FormatInt(item.Time, 10),
|
||||
"TimeEnd": strconv.FormatInt(item.TimeEnd, 10),
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
// Push with structured metadata using JSON format
|
||||
return s.pushWithStructuredMetadata(ctx, streamWithMetadata)
|
||||
}
|
||||
|
||||
// Update writes a change entry to the changes stream
|
||||
func (s *Store) Update(ctx context.Context, item *Annotation) error {
|
||||
changeEntry := ChangeEntry{
|
||||
AnnotationID: fmt.Sprintf("ann-%d", item.ID),
|
||||
Operation: "update",
|
||||
Created: time.Now().UnixMilli(),
|
||||
Text: item.Text,
|
||||
Tags: item.Tags,
|
||||
}
|
||||
|
||||
logLine, err := json.Marshal(changeEntry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal change entry: %w", err)
|
||||
}
|
||||
|
||||
stream := lokiclient.Stream{
|
||||
Stream: map[string]string{
|
||||
"stream": changesStream,
|
||||
"org_id": strconv.FormatInt(item.OrgID, 10),
|
||||
},
|
||||
Values: []lokiclient.Sample{{
|
||||
T: time.Now(),
|
||||
V: string(logLine),
|
||||
}},
|
||||
}
|
||||
|
||||
return s.client.Push(ctx, []lokiclient.Stream{stream})
|
||||
}
|
||||
|
||||
// Delete writes a delete entry to the changes stream
|
||||
func (s *Store) Delete(ctx context.Context, params *DeleteParams) error {
|
||||
changeEntry := ChangeEntry{
|
||||
AnnotationID: fmt.Sprintf("ann-%d", params.ID),
|
||||
Operation: "delete",
|
||||
Created: time.Now().UnixMilli(),
|
||||
}
|
||||
|
||||
logLine, err := json.Marshal(changeEntry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal delete entry: %w", err)
|
||||
}
|
||||
|
||||
stream := lokiclient.Stream{
|
||||
Stream: map[string]string{
|
||||
"stream": changesStream,
|
||||
"org_id": strconv.FormatInt(params.OrgID, 10),
|
||||
},
|
||||
Values: []lokiclient.Sample{{
|
||||
T: time.Now(),
|
||||
V: string(logLine),
|
||||
}},
|
||||
}
|
||||
|
||||
return s.client.Push(ctx, []lokiclient.Stream{stream})
|
||||
}
|
||||
|
||||
// Get retrieves annotations with merge from changes stream
|
||||
func (s *Store) Get(ctx context.Context, query Query) ([]*Annotation, error) {
|
||||
// Log query parameters for debugging
|
||||
if query.AnnotationID != 0 {
|
||||
s.log.Debug("Get annotation by ID",
|
||||
"annotation_id", query.AnnotationID,
|
||||
"org_id", query.OrgID,
|
||||
"from", query.From,
|
||||
"to", query.To)
|
||||
}
|
||||
|
||||
// Query main stream (from...to)
|
||||
annotations, err := s.queryMainStream(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.log.Debug("Found annotations in main stream", "count", len(annotations))
|
||||
|
||||
// Query entire changes stream and merge with latest change per annotation ID
|
||||
// We need all changes because they're matched to annotations by AnnotationID, not by time.
|
||||
// The merge logic will keep only the latest change per annotation ID.
|
||||
changes, err := s.queryChangesStream(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Merge annotations with changes
|
||||
merger := NewMerger()
|
||||
result := merger.Merge(annotations, changes, query)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetTags returns tags from annotations
|
||||
func (s *Store) GetTags(ctx context.Context, query TagsQuery) (TagsResult, error) {
|
||||
return TagsResult{}, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// queryMainStream queries the main annotations stream using structured metadata filters
|
||||
func (s *Store) queryMainStream(
|
||||
ctx context.Context,
|
||||
query Query,
|
||||
) ([]*AnnotationEntry, error) {
|
||||
// Build LogQL query with label filters
|
||||
var labels []string
|
||||
labels = append(labels, fmt.Sprintf(`stream="%s"`, annotationsStream))
|
||||
labels = append(labels, fmt.Sprintf(`org_id="%d"`, query.OrgID))
|
||||
|
||||
if query.DashboardUID != "" {
|
||||
labels = append(labels, fmt.Sprintf(`dashboard_uid="%s"`, query.DashboardUID))
|
||||
}
|
||||
if query.PanelID > 0 {
|
||||
labels = append(labels, fmt.Sprintf(`panel_id="%d"`, query.PanelID))
|
||||
}
|
||||
|
||||
logQL := "{" + strings.Join(labels, ",") + "}"
|
||||
|
||||
var filters []string
|
||||
// TODO: Fix filtering by time
|
||||
if query.From > 0 {
|
||||
filters = append(filters, fmt.Sprintf(`Time >= "%d"`, query.From))
|
||||
}
|
||||
if query.To > 0 {
|
||||
filters = append(filters, fmt.Sprintf(`Time <= "%d"`, query.To))
|
||||
}
|
||||
|
||||
if len(filters) > 0 {
|
||||
logQL += " | " + strings.Join(filters, " and ")
|
||||
}
|
||||
|
||||
// For AnnotationID queries, we still filter in code after querying
|
||||
if query.AnnotationID != 0 {
|
||||
s.log.Debug("Searching for annotation by ID (will filter in code)", "annotation_id", query.AnnotationID)
|
||||
}
|
||||
|
||||
// Use a wide time range for the Loki query itself (last 30 days max)
|
||||
// The structured metadata filters will narrow down the results precisely
|
||||
// Loki's RangeQuery requires time ranges, but we use a fixed window since
|
||||
// structured metadata filters handle the precise filtering
|
||||
now := time.Now().UnixNano()
|
||||
maxRange := 30 * 24 * time.Hour
|
||||
fromNs := now - int64(maxRange.Nanoseconds())
|
||||
toNs := now
|
||||
|
||||
res, err := s.client.RangeQuery(ctx, logQL, fromNs, toNs, query.Limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: failed to query main stream: %w", ErrLokiAnnotationsInternal, err)
|
||||
}
|
||||
|
||||
// Parse results
|
||||
entries := make([]*AnnotationEntry, 0)
|
||||
targetID := ""
|
||||
if query.AnnotationID != 0 {
|
||||
targetID = fmt.Sprintf("ann-%d", query.AnnotationID)
|
||||
}
|
||||
|
||||
for _, stream := range res.Data.Result {
|
||||
for _, sample := range stream.Values {
|
||||
var entry AnnotationEntry
|
||||
if err := json.Unmarshal([]byte(sample.V), &entry); err != nil {
|
||||
s.log.Debug("Failed to unmarshal annotation entry", "error", err, "entry", sample.V)
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter by AnnotationID if specified
|
||||
if query.AnnotationID != 0 && entry.ID != targetID {
|
||||
continue
|
||||
}
|
||||
|
||||
entries = append(entries, &entry)
|
||||
}
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func (s *Store) queryChangesStream(
|
||||
ctx context.Context,
|
||||
) ([]*ChangeEntry, error) {
|
||||
logQL := fmt.Sprintf(`{stream="%s"}`, changesStream)
|
||||
|
||||
// Use a very old timestamp (epoch 0) to query the entire stream
|
||||
// Loki's MaxQueryLength will clamp this if there's a limit, but we want all available changes
|
||||
|
||||
fromNs := int64(0) // Start from epoch 0 to get all changes
|
||||
toNs := time.Now().UnixNano()
|
||||
|
||||
res, err := s.client.RangeQuery(ctx, logQL, fromNs, toNs, 10000)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: failed to query changes stream: %w", ErrLokiAnnotationsInternal, err)
|
||||
}
|
||||
|
||||
// Parse results
|
||||
changes := make([]*ChangeEntry, 0)
|
||||
for _, stream := range res.Data.Result {
|
||||
for _, sample := range stream.Values {
|
||||
var change ChangeEntry
|
||||
if err := json.Unmarshal([]byte(sample.V), &change); err != nil {
|
||||
s.log.Debug("Failed to unmarshal change entry", "error", err, "entry", sample.V)
|
||||
continue
|
||||
}
|
||||
changes = append(changes, &change)
|
||||
}
|
||||
}
|
||||
|
||||
return changes, nil
|
||||
}
|
||||
29
pkg/loki-annotations/store_types.go
Normal file
29
pkg/loki-annotations/store_types.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package main
|
||||
|
||||
const (
|
||||
annotationsStream = "grafana_annotations"
|
||||
changesStream = "grafana_annotations_changes"
|
||||
)
|
||||
|
||||
// AnnotationEntry represents an annotation entry in the main stream
|
||||
type AnnotationEntry struct {
|
||||
ID string `json:"id"`
|
||||
OrgID int64 `json:"org_id"`
|
||||
UserID int64 `json:"user_id,omitempty"`
|
||||
DashboardUID string `json:"dashboard_uid,omitempty"`
|
||||
PanelID int64 `json:"panel_id,omitempty"`
|
||||
Text string `json:"text"`
|
||||
Tags []string `json:"tags,omitempty"`
|
||||
Time int64 `json:"time"` // event start time, milliseconds
|
||||
TimeEnd int64 `json:"time_end"` // event end time, milliseconds, optional
|
||||
Created int64 `json:"created"` // annotation creation time, milliseconds
|
||||
}
|
||||
|
||||
// ChangeEntry represents a change entry in the changes stream
|
||||
type ChangeEntry struct {
|
||||
AnnotationID string `json:"annotation_id"`
|
||||
Operation string `json:"operation"` // "update" or "delete"
|
||||
Created int64 `json:"created"` // milliseconds - time when the change was made
|
||||
Text string `json:"text,omitempty"` // only for "update", empty if unchanged
|
||||
Tags []string `json:"tags,omitempty"` // only for "update", nil if unchanged
|
||||
}
|
||||
57
pkg/loki-annotations/types.go
Normal file
57
pkg/loki-annotations/types.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package main
|
||||
|
||||
// Annotation represents an annotation item (used for both input and output)
|
||||
type Annotation struct {
|
||||
ID int64
|
||||
OrgID int64
|
||||
UserID int64
|
||||
DashboardUID string // empty string means not set
|
||||
PanelID int64
|
||||
Text string
|
||||
Tags []string
|
||||
Time int64 // event start time, milliseconds
|
||||
TimeEnd int64 // event end time, milliseconds, optional
|
||||
Created int64 // annotation creation time, milliseconds
|
||||
}
|
||||
|
||||
// Query represents a query for annotations
|
||||
type Query struct {
|
||||
OrgID int64
|
||||
From int64
|
||||
To int64
|
||||
DashboardUID string
|
||||
PanelID int64
|
||||
AnnotationID int64
|
||||
Limit int64
|
||||
}
|
||||
|
||||
// TagsQuery represents a query for tags
|
||||
type TagsQuery struct {
|
||||
OrgID int64
|
||||
Tag string
|
||||
Limit int64
|
||||
}
|
||||
|
||||
// Tag represents a tag result
|
||||
type Tag struct {
|
||||
Tag string
|
||||
Count int64
|
||||
}
|
||||
|
||||
// TagsResult represents the result of a tags search
|
||||
type TagsResult struct {
|
||||
Tags []*Tag
|
||||
}
|
||||
|
||||
// DeleteParams represents parameters for deleting an annotation
|
||||
type DeleteParams struct {
|
||||
OrgID int64
|
||||
ID int64
|
||||
}
|
||||
|
||||
// Config represents configuration for Loki annotations storage
|
||||
type Config struct {
|
||||
URL string
|
||||
//BasicAuthUser string
|
||||
//BasicAuthPassword string
|
||||
}
|
||||
Reference in New Issue
Block a user