Compare commits

...

1 commit

Steve Simpson · 2a24d173dd · 2025-12-11 23:29:28 +00:00

Alerting: Split Loki alert state history into reader and writer.

The historian app will provide the API to read alert state history, though it
does not (and will not) have the capability to write history. This refactor
makes it possible to create a reader without a writer.

3 changed files with 534 additions and 479 deletions
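In practical terms, the split lets a read-only consumer (such as the historian app mentioned above) wire up just the query path. The sketch below is hypothetical glue, not part of the diff: it assumes the host service already provides cfg, req, m, tracer, ruleStore, ac, and user, and it uses only the constructor and method signatures added in this change.

	logger := log.New("historian")

	// Reader-only wiring: no LokiWriter is ever constructed.
	reader := historian.NewLokiReader(logger, cfg, req, m, tracer, ruleStore, ac)

	// Query state history for a hypothetical rule UID.
	frame, err := reader.Query(ctx, models.HistoryQuery{
		OrgID:        1,
		RuleUID:      "example-rule-uid", // made-up UID for illustration
		Limit:        100,
		SignedInUser: user, // assumed to be provided by the caller
	})

On success, frame carries the merged time/line/labels vectors described in the reader's merge function.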

@@ -2,32 +2,16 @@ package historian
import (
"context"
"encoding/json"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/alerting/notify/historian/lokiclient"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/trace"
alertingInstrument "github.com/grafana/alerting/http/instrument"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/accesscontrol"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
)
const (
@@ -49,25 +33,6 @@ const (
const defaultQueryRange = 6 * time.Hour
var (
ErrLokiQueryTooLong = errutil.BadRequest("loki.requestTooLong").MustTemplate(
"Request to Loki exceeded ({{.Public.QuerySize}} bytes) configured maximum size of {{.Public.MaxLimit}} bytes. Query: {{.Private.Query}}",
errutil.WithPublic("Query for Loki exceeded the configured limit of {{.Public.MaxLimit}} bytes. Remove some filters and try again."),
)
)
func NewErrLokiQueryTooLong(query string, maxLimit int) error {
return ErrLokiQueryTooLong.Build(errutil.TemplateData{
Private: map[string]any{
"Query": query,
},
Public: map[string]any{
"MaxLimit": maxLimit,
"QuerySize": len(query),
},
})
}
type remoteLokiClient interface {
Ping(context.Context) error
Push(context.Context, []lokiclient.Stream) error
@@ -77,262 +42,17 @@ type remoteLokiClient interface {
// RemoteLokiBackend is a state.Historian that records state history to an external Loki instance.
type RemoteLokiBackend struct {
client remoteLokiClient
externalLabels map[string]string
clock clock.Clock
metrics *metrics.Historian
log log.Logger
ac AccessControl
ruleStore RuleStore
*LokiWriter
*LokiReader
}
func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req alertingInstrument.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend {
return &RemoteLokiBackend{
client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName),
externalLabels: cfg.ExternalLabels,
clock: clock.New(),
metrics: metrics,
log: logger,
ac: ac,
ruleStore: ruleStore,
LokiWriter: NewLokiWriter(logger, cfg, req, metrics, tracer),
LokiReader: NewLokiReader(logger, cfg, req, metrics, tracer, ruleStore, ac),
}
}
func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
return h.client.Ping(ctx)
}
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
logStream := StatesToStream(rule, states, h.externalLabels, logger)
errCh := make(chan error, 1)
if len(logStream.Values) == 0 {
close(errCh)
return errCh
}
// This is a new background job, so let's create a brand new context for it.
// We want it to be isolated, i.e. we don't want Grafana shutdowns to interrupt this work
// immediately but rather try to flush writes.
// This also prevents timeouts or other lingering objects (like transactions) from being
// incorrectly propagated here from other areas.
writeCtx := context.Background()
writeCtx, cancel := context.WithTimeout(writeCtx, StateHistoryWriteTimeout)
writeCtx = history_model.WithRuleData(writeCtx, rule)
writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx))
go func(ctx context.Context) {
defer cancel()
defer close(errCh)
logger := h.log.FromContext(ctx)
logger.Debug("Saving state history batch", "samples", len(logStream.Values))
org := fmt.Sprint(rule.OrgID)
h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))
if err := h.recordStreams(ctx, logStream, logger); err != nil {
logger.Error("Failed to save alert state history batch", "error", err)
h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
}
}(writeCtx)
return errCh
}
// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
uids, err := h.getFolderUIDsForFilter(ctx, query)
if err != nil {
return nil, err
}
queries, err := BuildLogQuery(query, uids, h.client.MaxQuerySize())
if err != nil {
return nil, err
}
if len(queries) > 1 {
h.log.FromContext(ctx).Info("Execute query in multiple batches", "batchSize", len(queries), "folders", len(uids), "maxQueryLimit", h.client.MaxQuerySize())
}
now := time.Now().UTC()
if query.To.IsZero() {
query.To = now
}
if query.From.IsZero() {
query.From = now.Add(-defaultQueryRange)
}
var res []lokiclient.Stream
for _, logQL := range queries {
// Timestamps are expected in RFC3339Nano.
// Apply the user-defined limit to every request. Multiple batches are a very rare case, so we can tolerate getting more data than needed.
// The limit can be applied after all results are merged.
r, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
if err != nil {
return nil, err
}
res = append(res, r.Data.Result...)
}
return h.merge(res, uids)
}
// merge merges all the results into a single frame, sorted by timestamp.
func (h RemoteLokiBackend) merge(res []lokiclient.Stream, folderUIDToFilter []string) (*data.Frame, error) {
filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter))
for _, uid := range folderUIDToFilter {
filterByFolderUIDMap[uid] = struct{}{}
}
// Find the total number of elements in all arrays.
totalLen := 0
for _, arr := range res {
totalLen += len(arr.Values)
}
// Create a new slice to store the merged elements.
frame := data.NewFrame("states")
// We merge all series into a single linear history.
lbls := data.Labels(map[string]string{})
// We represent state history as a single merged history, that roughly corresponds to what you get in the Grafana Explore tab when querying Loki directly.
// The format is composed of the following vectors:
// 1. `time` - timestamp - when the transition happened
// 2. `line` - JSON - the full data of the transition
// 3. `labels` - JSON - the labels associated with that state transition
times := make([]time.Time, 0, totalLen)
lines := make([]json.RawMessage, 0, totalLen)
labels := make([]json.RawMessage, 0, totalLen)
// Initialize a slice of pointers to the current position in each array.
pointers := make([]int, len(res))
for {
minTime := int64(math.MaxInt64)
minEl := lokiclient.Sample{}
minElStreamIdx := -1
// Find the element with the earliest time among all arrays.
for i, stream := range res {
// Skip if we already reached the end of the current array.
if len(stream.Values) == pointers[i] {
continue
}
// check if stream should be in the results
if len(filterByFolderUIDMap) > 0 {
folderLbl, ok := stream.Stream[FolderUIDLabel]
if !ok {
continue // skip entries without a folder UID, but only when filtering is required
}
_, ok = filterByFolderUIDMap[folderLbl]
if !ok {
continue
}
}
curTime := stream.Values[pointers[i]].T.UnixNano()
if pointers[i] < len(stream.Values) && curTime < minTime {
minTime = curTime
minEl = stream.Values[pointers[i]]
minElStreamIdx = i
}
}
// If all pointers have reached the end of their arrays, we're done.
if minElStreamIdx == -1 {
break
}
entryBytes := []byte(minEl.V)
var entry LokiEntry
err := json.Unmarshal(entryBytes, &entry)
if err != nil {
h.log.Warn("failed to unmarshal entry, continuing", "err", err, "entry", minEl.V)
pointers[minElStreamIdx]++
continue
}
// Append the minimum element to the merged slice and move the pointer.
tsNano := minEl.T.UnixNano()
streamLbls := res[minElStreamIdx].Stream
lblsJson, err := json.Marshal(streamLbls)
if err != nil {
// This should in theory never happen, as we're marshalling a map[string]string.
h.log.Warn("failed to serialize stream labels, continuing", "err", err, "labels", streamLbls)
pointers[minElStreamIdx]++
continue
}
times = append(times, time.Unix(0, tsNano))
labels = append(labels, lblsJson)
lines = append(lines, json.RawMessage(entryBytes))
pointers[minElStreamIdx]++
}
frame.Fields = append(frame.Fields, data.NewField(dfTime, lbls, times))
frame.Fields = append(frame.Fields, data.NewField(dfLine, lbls, lines))
frame.Fields = append(frame.Fields, data.NewField(dfLabels, lbls, labels))
return frame, nil
}
func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) lokiclient.Stream {
labels := mergeLabels(make(map[string]string), externalLabels)
// System-defined labels take precedence over user-defined external labels.
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
samples := make([]lokiclient.Sample, 0, len(states))
for _, state := range states {
if !shouldRecord(state) {
continue
}
sanitizedLabels := removePrivateLabels(state.Labels)
entry := LokiEntry{
SchemaVersion: 1,
Previous: state.PreviousFormatted(),
Current: state.Formatted(),
Values: valuesAsDataBlob(state.State),
Condition: rule.Condition,
DashboardUID: rule.DashboardUID,
PanelID: rule.PanelID,
Fingerprint: labelFingerprint(sanitizedLabels),
RuleTitle: rule.Title,
RuleID: rule.ID,
RuleUID: rule.UID,
InstanceLabels: sanitizedLabels,
}
if state.State.State == eval.Error {
entry.Error = state.Error.Error()
}
jsn, err := json.Marshal(entry)
if err != nil {
logger.Error("Failed to construct history record for state, skipping", "error", err)
continue
}
line := string(jsn)
samples = append(samples, lokiclient.Sample{
T: state.LastEvaluationTime,
V: line,
})
}
return lokiclient.Stream{
Stream: labels,
Values: samples,
}
}
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error {
if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil {
return err
}
logger.Debug("Done saving alert state history batch", "samples", len(stream.Values))
return nil
}
type LokiEntry struct {
SchemaVersion int `json:"schemaVersion"`
Previous string `json:"previous"`
@@ -350,198 +70,3 @@ type LokiEntry struct {
// These should not be conflated with labels associated with log streams.
InstanceLabels map[string]string `json:"labels"`
}
func valuesAsDataBlob(state *state.State) *simplejson.Json {
if state.State == eval.Error || state.State == eval.NoData {
return simplejson.New()
}
return jsonifyValues(state.Values)
}
// BuildLogQuery converts models.HistoryQuery and a list of folder UIDs to Loki queries.
// It can return multiple queries if the list of folder UIDs is too big to fit into a single query.
// If a single folder UID is long enough to exceed the maximum query size, it returns ErrLokiQueryTooLong.
func BuildLogQuery(query models.HistoryQuery, folderUIDs []string, maxQuerySize int) ([]string, error) {
// first build the tail of the query (if any) to know what remaining capacity we have for folders
tail, err := buildQueryTail(query)
if err != nil {
return nil, err
}
// build the base selectors. Skip the closing brace because folders will be appended below; the brace is added at the end.
head := fmt.Sprintf(`{%s="%d",%s=%q`, OrgIDLabel, query.OrgID, StateHistoryLabelKey, StateHistoryLabelValue)
// check that the system-defined + user-defined query parameters do not exceed the maximum size
baseQuerySize := len(head) + 1 + len(tail) // 1 stands for the closing brace
if len(head)+1+len(tail) > maxQuerySize {
return nil, NewErrLokiQueryTooLong(head+"}"+tail, maxQuerySize)
}
// nothing to append to the head. Construct the query and return what we have
if len(folderUIDs) == 0 {
return []string{head + "}" + tail}, nil
}
remainingFolders := folderUIDs
var result []string
for len(remainingFolders) > 0 { // iterate until all folders are processed; this produces at least one result
// pre-calculate the size of the string to avoid excessive re-allocations
requiredLengthEstimate := baseQuerySize + 1 + len(FolderUIDLabel) + 3 // 1 - comma, 3 - operator + opening backtick
for _, folder := range remainingFolders { // this does not account for escaped characters because they are not generally expected
requiredLengthEstimate += len(folder) + 1 // 1 accounts for the pipe separator, or the closing backtick at the end
if requiredLengthEstimate > maxQuerySize {
break
}
}
s := strings.Builder{}
s.Grow(requiredLengthEstimate)
s.WriteString(head)
s.WriteString(",")
s.WriteString(FolderUIDLabel)
s.WriteString("=~`")
added := false
for len(remainingFolders) > 0 {
uid := regexp.QuoteMeta(remainingFolders[0])
requiredLength := s.Len() + len(tail) + len(uid) + 2 // 2 - backtick + closing brace
if added {
requiredLength++ // account for the pipe symbol
}
if requiredLength > maxQuerySize {
if !added {
// finish the query for logging
s.WriteString(uid)
s.WriteString("`}")
s.WriteString(tail)
return nil, NewErrLokiQueryTooLong(s.String(), maxQuerySize)
}
break
}
if added {
s.WriteString("|")
}
s.WriteString(uid)
added = true
remainingFolders = remainingFolders[1:]
}
s.WriteString("`")
s.WriteString("}")
s.WriteString(tail)
result = append(result, s.String())
}
return result, nil
}
func buildQueryTail(query models.HistoryQuery) (string, error) {
if !queryHasLogFilters(query) {
return "", nil
}
b := strings.Builder{}
b.WriteString(" | json")
if query.RuleUID != "" {
b.WriteString(" | ruleUID=")
_, err := fmt.Fprintf(&b, "%q", query.RuleUID)
if err != nil {
return "", err
}
}
if query.DashboardUID != "" {
b.WriteString(" | dashboardUID=")
_, err := fmt.Fprintf(&b, "%q", query.DashboardUID)
if err != nil {
return "", err
}
}
if query.PanelID != 0 {
b.WriteString(" | panelID=")
b.WriteString(strconv.FormatInt(query.PanelID, 10))
}
if query.Previous != "" {
b.WriteString(" | previous=~")
_, err := fmt.Fprintf(&b, "%q", "^"+regexp.QuoteMeta(query.Previous)+".*")
if err != nil {
return "", err
}
}
if query.Current != "" {
b.WriteString(" | current=~")
_, err := fmt.Fprintf(&b, "%q", "^"+regexp.QuoteMeta(query.Current)+".*")
if err != nil {
return "", err
}
}
requiredSize := 0
labelKeys := make([]string, 0, len(query.Labels))
for k, v := range query.Labels {
requiredSize += len(k) + len(v) + 13 // 13 is the total length of the literals below
labelKeys = append(labelKeys, k)
}
// Ensure that all queries we build are deterministic.
sort.Strings(labelKeys)
b.Grow(requiredSize)
for _, k := range labelKeys {
b.WriteString(" | labels_")
b.WriteString(k)
b.WriteString("=")
_, err := fmt.Fprintf(&b, "%q", query.Labels[k])
if err != nil {
return "", err
}
}
return b.String(), nil
}
func queryHasLogFilters(query models.HistoryQuery) bool {
return query.RuleUID != "" ||
query.DashboardUID != "" ||
query.PanelID != 0 ||
query.Previous != "" ||
query.Current != "" ||
len(query.Labels) > 0
}
func (h *RemoteLokiBackend) getFolderUIDsForFilter(ctx context.Context, query models.HistoryQuery) ([]string, error) {
bypass, err := h.ac.CanReadAllRules(ctx, query.SignedInUser)
if err != nil {
return nil, err
}
if bypass { // if the user has access to all rules and folders, drop the filter
return nil, nil
}
// if there is a filter by rule UID, look up that rule and make sure the user has access to it.
if query.RuleUID != "" {
rule, err := h.ruleStore.GetAlertRuleByUID(ctx, &models.GetAlertRuleByUIDQuery{
UID: query.RuleUID,
OrgID: query.OrgID,
})
if err != nil {
return nil, fmt.Errorf("failed to fetch alert rule by UID: %w", err)
}
if rule == nil {
return nil, models.ErrAlertRuleNotFound
}
return nil, h.ac.AuthorizeAccessInFolder(ctx, query.SignedInUser, rule)
}
// if there is no filter, we need to get all namespaces the user has access to
folders, err := h.ruleStore.GetUserVisibleNamespaces(ctx, query.OrgID, query.SignedInUser)
if err != nil {
return nil, fmt.Errorf("failed to fetch folders that user can access: %w", err)
}
uids := make([]string, 0, len(folders))
// now keep only the UIDs of folders in which the user can read rules.
for _, f := range folders {
hasAccess, err := h.ac.HasAccessInFolder(ctx, query.SignedInUser, models.Namespace(*f.ToFolderReference()))
if err != nil {
return nil, err
}
if !hasAccess {
continue
}
uids = append(uids, f.UID)
}
if len(uids) == 0 {
return nil, accesscontrol.NewAuthorizationErrorGeneric("read rules in any folder")
}
sort.Strings(uids)
return uids, nil
}
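For orientation, the merged result that Query returns has the three-field shape described inside merge. A hypothetical two-row frame, with made-up values t0/t1, entry0/entry1, and labels0/labels1, and assuming the dfTime, dfLine, and dfLabels constants resolve to the names given in the comments, would look like this:

	// Hypothetical illustration of the merged output: three parallel fields,
	// one row per state transition across all merged streams.
	frame := data.NewFrame("states",
		data.NewField("time", nil, []time.Time{t0, t1}),                   // when each transition happened
		data.NewField("line", nil, []json.RawMessage{entry0, entry1}),     // full LokiEntry JSON
		data.NewField("labels", nil, []json.RawMessage{labels0, labels1}), // stream labels as JSON
	)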

@@ -0,0 +1,378 @@
package historian
import (
"context"
"encoding/json"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
alertingInstrument "github.com/grafana/alerting/http/instrument"
"github.com/grafana/alerting/notify/historian/lokiclient"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/accesscontrol"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
var (
ErrLokiQueryTooLong = errutil.BadRequest("loki.requestTooLong").MustTemplate(
"Request to Loki exceeded ({{.Public.QuerySize}} bytes) configured maximum size of {{.Public.MaxLimit}} bytes. Query: {{.Private.Query}}",
errutil.WithPublic("Query for Loki exceeded the configured limit of {{.Public.MaxLimit}} bytes. Remove some filters and try again."),
)
)
func NewErrLokiQueryTooLong(query string, maxLimit int) error {
return ErrLokiQueryTooLong.Build(errutil.TemplateData{
Private: map[string]any{
"Query": query,
},
Public: map[string]any{
"MaxLimit": maxLimit,
"QuerySize": len(query),
},
})
}
type LokiReader struct {
client remoteLokiClient
log log.Logger
ac AccessControl
ruleStore RuleStore
}
func NewLokiReader(logger log.Logger, cfg lokiclient.LokiConfig, req alertingInstrument.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *LokiReader {
client := lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName)
return &LokiReader{
client: client,
log: logger,
ac: ac,
ruleStore: ruleStore,
}
}
// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *LokiReader) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
uids, err := h.getFolderUIDsForFilter(ctx, query)
if err != nil {
return nil, err
}
queries, err := BuildLogQuery(query, uids, h.client.MaxQuerySize())
if err != nil {
return nil, err
}
if len(queries) > 1 {
h.log.FromContext(ctx).Info("Execute query in multiple batches", "batchSize", len(queries), "folders", len(uids), "maxQueryLimit", h.client.MaxQuerySize())
}
now := time.Now().UTC()
if query.To.IsZero() {
query.To = now
}
if query.From.IsZero() {
query.From = now.Add(-defaultQueryRange)
}
var res []lokiclient.Stream
for _, logQL := range queries {
// Timestamps are expected in RFC3339Nano.
// Apply the user-defined limit to every request. Multiple batches are a very rare case, so we can tolerate getting more data than needed.
// The limit can be applied after all results are merged.
r, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
if err != nil {
return nil, err
}
res = append(res, r.Data.Result...)
}
return h.merge(res, uids)
}
// merge merges all the results into a single frame, sorted by timestamp.
func (h LokiReader) merge(res []lokiclient.Stream, folderUIDToFilter []string) (*data.Frame, error) {
filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter))
for _, uid := range folderUIDToFilter {
filterByFolderUIDMap[uid] = struct{}{}
}
// Find the total number of elements in all arrays.
totalLen := 0
for _, arr := range res {
totalLen += len(arr.Values)
}
// Create a new slice to store the merged elements.
frame := data.NewFrame("states")
// We merge all series into a single linear history.
lbls := data.Labels(map[string]string{})
// We represent state history as a single merged history, that roughly corresponds to what you get in the Grafana Explore tab when querying Loki directly.
// The format is composed of the following vectors:
// 1. `time` - timestamp - when the transition happened
// 2. `line` - JSON - the full data of the transition
// 3. `labels` - JSON - the labels associated with that state transition
times := make([]time.Time, 0, totalLen)
lines := make([]json.RawMessage, 0, totalLen)
labels := make([]json.RawMessage, 0, totalLen)
// Initialize a slice of pointers to the current position in each array.
pointers := make([]int, len(res))
for {
minTime := int64(math.MaxInt64)
minEl := lokiclient.Sample{}
minElStreamIdx := -1
// Find the element with the earliest time among all arrays.
for i, stream := range res {
// Skip if we already reached the end of the current array.
if len(stream.Values) == pointers[i] {
continue
}
// check if stream should be in the results
if len(filterByFolderUIDMap) > 0 {
folderLbl, ok := stream.Stream[FolderUIDLabel]
if !ok {
continue // skip entries without a folder UID, but only when filtering is required
}
_, ok = filterByFolderUIDMap[folderLbl]
if !ok {
continue
}
}
curTime := stream.Values[pointers[i]].T.UnixNano()
if pointers[i] < len(stream.Values) && curTime < minTime {
minTime = curTime
minEl = stream.Values[pointers[i]]
minElStreamIdx = i
}
}
// If all pointers have reached the end of their arrays, we're done.
if minElStreamIdx == -1 {
break
}
entryBytes := []byte(minEl.V)
var entry LokiEntry
err := json.Unmarshal(entryBytes, &entry)
if err != nil {
h.log.Warn("failed to unmarshal entry, continuing", "err", err, "entry", minEl.V)
pointers[minElStreamIdx]++
continue
}
// Append the minimum element to the merged slice and move the pointer.
tsNano := minEl.T.UnixNano()
streamLbls := res[minElStreamIdx].Stream
lblsJson, err := json.Marshal(streamLbls)
if err != nil {
// This should in theory never happen, as we're marshalling a map[string]string.
h.log.Warn("failed to serialize stream labels, continuing", "err", err, "labels", streamLbls)
pointers[minElStreamIdx]++
continue
}
times = append(times, time.Unix(0, tsNano))
labels = append(labels, lblsJson)
lines = append(lines, json.RawMessage(entryBytes))
pointers[minElStreamIdx]++
}
frame.Fields = append(frame.Fields, data.NewField(dfTime, lbls, times))
frame.Fields = append(frame.Fields, data.NewField(dfLine, lbls, lines))
frame.Fields = append(frame.Fields, data.NewField(dfLabels, lbls, labels))
return frame, nil
}
func (h *LokiReader) getFolderUIDsForFilter(ctx context.Context, query models.HistoryQuery) ([]string, error) {
bypass, err := h.ac.CanReadAllRules(ctx, query.SignedInUser)
if err != nil {
return nil, err
}
if bypass { // if the user has access to all rules and folders, drop the filter
return nil, nil
}
// if there is a filter by rule UID, look up that rule and make sure the user has access to it.
if query.RuleUID != "" {
rule, err := h.ruleStore.GetAlertRuleByUID(ctx, &models.GetAlertRuleByUIDQuery{
UID: query.RuleUID,
OrgID: query.OrgID,
})
if err != nil {
return nil, fmt.Errorf("failed to fetch alert rule by UID: %w", err)
}
if rule == nil {
return nil, models.ErrAlertRuleNotFound
}
return nil, h.ac.AuthorizeAccessInFolder(ctx, query.SignedInUser, rule)
}
// if there is no filter, we need to get all namespaces the user has access to
folders, err := h.ruleStore.GetUserVisibleNamespaces(ctx, query.OrgID, query.SignedInUser)
if err != nil {
return nil, fmt.Errorf("failed to fetch folders that user can access: %w", err)
}
uids := make([]string, 0, len(folders))
// now keep only the UIDs of folders in which the user can read rules.
for _, f := range folders {
hasAccess, err := h.ac.HasAccessInFolder(ctx, query.SignedInUser, models.Namespace(*f.ToFolderReference()))
if err != nil {
return nil, err
}
if !hasAccess {
continue
}
uids = append(uids, f.UID)
}
if len(uids) == 0 {
return nil, accesscontrol.NewAuthorizationErrorGeneric("read rules in any folder")
}
sort.Strings(uids)
return uids, nil
}
// BuildLogQuery converts models.HistoryQuery and a list of folder UIDs to Loki queries.
// It can return multiple queries if the list of folder UIDs is too big to fit into a single query.
// If a single folder UID is long enough to exceed the maximum query size, it returns ErrLokiQueryTooLong.
func BuildLogQuery(query models.HistoryQuery, folderUIDs []string, maxQuerySize int) ([]string, error) {
// first build the tail of the query (if any) to know what remaining capacity we have for folders
tail, err := buildQueryTail(query)
if err != nil {
return nil, err
}
// build the base selectors. Skip the closing brace because folders will be appended below; the brace is added at the end.
head := fmt.Sprintf(`{%s="%d",%s=%q`, OrgIDLabel, query.OrgID, StateHistoryLabelKey, StateHistoryLabelValue)
// check that the system-defined + user-defined query parameters do not exceed the maximum size
baseQuerySize := len(head) + 1 + len(tail) // 1 stands for the closing brace
if len(head)+1+len(tail) > maxQuerySize {
return nil, NewErrLokiQueryTooLong(head+"}"+tail, maxQuerySize)
}
// nothing to append to the head. Construct the query and return what we have
if len(folderUIDs) == 0 {
return []string{head + "}" + tail}, nil
}
remainingFolders := folderUIDs
var result []string
for len(remainingFolders) > 0 { // iterate until all folders are processed; this produces at least one result
// pre-calculate the size of the string to avoid excessive re-allocations
requiredLengthEstimate := baseQuerySize + 1 + len(FolderUIDLabel) + 3 // 1 - comma, 3 - operator + opening backtick
for _, folder := range remainingFolders { // this does not account for escaped characters because they are not generally expected
requiredLengthEstimate += len(folder) + 1 // 1 accounts for the pipe separator, or the closing backtick at the end
if requiredLengthEstimate > maxQuerySize {
break
}
}
s := strings.Builder{}
s.Grow(requiredLengthEstimate)
s.WriteString(head)
s.WriteString(",")
s.WriteString(FolderUIDLabel)
s.WriteString("=~`")
added := false
for len(remainingFolders) > 0 {
uid := regexp.QuoteMeta(remainingFolders[0])
requiredLength := s.Len() + len(tail) + len(uid) + 2 // 2 - backtick + closing brace
if added {
requiredLength++ // account for the pipe symbol
}
if requiredLength > maxQuerySize {
if !added {
// finish the query for logging
s.WriteString(uid)
s.WriteString("`}")
s.WriteString(tail)
return nil, NewErrLokiQueryTooLong(s.String(), maxQuerySize)
}
break
}
if added {
s.WriteString("|")
}
s.WriteString(uid)
added = true
remainingFolders = remainingFolders[1:]
}
s.WriteString("`")
s.WriteString("}")
s.WriteString(tail)
result = append(result, s.String())
}
return result, nil
}
func buildQueryTail(query models.HistoryQuery) (string, error) {
if !queryHasLogFilters(query) {
return "", nil
}
b := strings.Builder{}
b.WriteString(" | json")
if query.RuleUID != "" {
b.WriteString(" | ruleUID=")
_, err := fmt.Fprintf(&b, "%q", query.RuleUID)
if err != nil {
return "", err
}
}
if query.DashboardUID != "" {
b.WriteString(" | dashboardUID=")
_, err := fmt.Fprintf(&b, "%q", query.DashboardUID)
if err != nil {
return "", err
}
}
if query.PanelID != 0 {
b.WriteString(" | panelID=")
b.WriteString(strconv.FormatInt(query.PanelID, 10))
}
if query.Previous != "" {
b.WriteString(" | previous=~")
_, err := fmt.Fprintf(&b, "%q", "^"+regexp.QuoteMeta(query.Previous)+".*")
if err != nil {
return "", err
}
}
if query.Current != "" {
b.WriteString(" | current=~")
_, err := fmt.Fprintf(&b, "%q", "^"+regexp.QuoteMeta(query.Current)+".*")
if err != nil {
return "", err
}
}
requiredSize := 0
labelKeys := make([]string, 0, len(query.Labels))
for k, v := range query.Labels {
requiredSize += len(k) + len(v) + 13 // 13 is the total length of the literals below
labelKeys = append(labelKeys, k)
}
// Ensure that all queries we build are deterministic.
sort.Strings(labelKeys)
b.Grow(requiredSize)
for _, k := range labelKeys {
b.WriteString(" | labels_")
b.WriteString(k)
b.WriteString("=")
_, err := fmt.Fprintf(&b, "%q", query.Labels[k])
if err != nil {
return "", err
}
}
return b.String(), nil
}
func queryHasLogFilters(query models.HistoryQuery) bool {
return query.RuleUID != "" ||
query.DashboardUID != "" ||
query.PanelID != 0 ||
query.Previous != "" ||
query.Current != "" ||
len(query.Labels) > 0
}
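A minimal sketch of BuildLogQuery's batching behavior, with made-up folder UIDs and a deliberately small size limit; the exact label names in the output depend on the OrgIDLabel, StateHistoryLabelKey, and FolderUIDLabel constants, which are not shown in this diff:

	queries, err := historian.BuildLogQuery(
		models.HistoryQuery{OrgID: 1},
		[]string{"folder-a", "folder-b", "folder-c"}, // hypothetical folder UIDs
		80, // deliberately small maxQuerySize to force batching
	)
	if err != nil {
		// Returned when even a single folder UID cannot fit within the limit.
		return err
	}
	// With a small limit the folder matcher is split across several queries,
	// each roughly of the form {orgID="1",...,folderUID=~`folder-a|folder-b`}.
	for _, logQL := range queries {
		fmt.Println(logQL)
	}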

@@ -0,0 +1,152 @@
package historian
import (
"context"
"encoding/json"
"fmt"
"github.com/benbjohnson/clock"
alertingInstrument "github.com/grafana/alerting/http/instrument"
"github.com/grafana/alerting/notify/historian/lokiclient"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
)
type LokiWriter struct {
client remoteLokiClient
externalLabels map[string]string
clock clock.Clock
metrics *metrics.Historian
log log.Logger
}
func NewLokiWriter(logger log.Logger, cfg lokiclient.LokiConfig, req alertingInstrument.Requester, metrics *metrics.Historian, tracer tracing.Tracer) *LokiWriter {
client := lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName)
return &LokiWriter{
client: client,
externalLabels: cfg.ExternalLabels,
clock: clock.New(),
metrics: metrics,
log: logger,
}
}
func (h *LokiWriter) TestConnection(ctx context.Context) error {
return h.client.Ping(ctx)
}
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *LokiWriter) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
logStream := StatesToStream(rule, states, h.externalLabels, logger)
errCh := make(chan error, 1)
if len(logStream.Values) == 0 {
close(errCh)
return errCh
}
// This is a new background job, so let's create a brand new context for it.
// We want it to be isolated, i.e. we don't want Grafana shutdowns to interrupt this work
// immediately but rather try to flush writes.
// This also prevents timeouts or other lingering objects (like transactions) from being
// incorrectly propagated here from other areas.
writeCtx := context.Background()
writeCtx, cancel := context.WithTimeout(writeCtx, StateHistoryWriteTimeout)
writeCtx = history_model.WithRuleData(writeCtx, rule)
writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx))
go func(ctx context.Context) {
defer cancel()
defer close(errCh)
logger := h.log.FromContext(ctx)
logger.Debug("Saving state history batch", "samples", len(logStream.Values))
org := fmt.Sprint(rule.OrgID)
h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))
if err := h.recordStreams(ctx, logStream, logger); err != nil {
logger.Error("Failed to save alert state history batch", "error", err)
h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
}
}(writeCtx)
return errCh
}
func (h *LokiWriter) recordStreams(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error {
if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil {
return err
}
logger.Debug("Done saving alert state history batch", "samples", len(stream.Values))
return nil
}
func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) lokiclient.Stream {
labels := mergeLabels(make(map[string]string), externalLabels)
// System-defined labels take precedence over user-defined external labels.
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
samples := make([]lokiclient.Sample, 0, len(states))
for _, state := range states {
if !shouldRecord(state) {
continue
}
sanitizedLabels := removePrivateLabels(state.Labels)
entry := LokiEntry{
SchemaVersion: 1,
Previous: state.PreviousFormatted(),
Current: state.Formatted(),
Values: valuesAsDataBlob(state.State),
Condition: rule.Condition,
DashboardUID: rule.DashboardUID,
PanelID: rule.PanelID,
Fingerprint: labelFingerprint(sanitizedLabels),
RuleTitle: rule.Title,
RuleID: rule.ID,
RuleUID: rule.UID,
InstanceLabels: sanitizedLabels,
}
if state.State.State == eval.Error {
entry.Error = state.Error.Error()
}
jsn, err := json.Marshal(entry)
if err != nil {
logger.Error("Failed to construct history record for state, skipping", "error", err)
continue
}
line := string(jsn)
samples = append(samples, lokiclient.Sample{
T: state.LastEvaluationTime,
V: line,
})
}
return lokiclient.Stream{
Stream: labels,
Values: samples,
}
}
func valuesAsDataBlob(state *state.State) *simplejson.Json {
if state.State == eval.Error || state.State == eval.NoData {
return simplejson.New()
}
return jsonifyValues(state.Values)
}
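Usage of the write path stays channel-based after the split. A hedged sketch with assumed inputs (ruleMeta and transitions would come from the state tracker in the host service, and cfg, req, m, and tracer from its wiring):

	writer := historian.NewLokiWriter(logger, cfg, req, m, tracer)

	// Record pushes asynchronously on a background context with its own
	// timeout; the returned channel is buffered and closed when the write
	// finishes, so receiving yields nil on success.
	errCh := writer.Record(ctx, ruleMeta, transitions)
	if err := <-errCh; err != nil {
		logger.Error("Failed to persist state history", "error", err)
	}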