Compare commits

...

35 Commits

Author  SHA1        Message  Date
Galen  49cace9952  Revert "chore: revert generic error handling"  2025-11-20 07:04:23 -06:00
       This reverts commit effb9a1f9a.
Galen  effb9a1f9a  chore: revert generic error handling  2025-11-20 07:03:59 -06:00
Galen  dc6dcc5a78  chore: mark old flag as deprecated  2025-11-19 22:06:22 -06:00
Galen  e2e30ba8dc  chore: fix type error  2025-11-19 22:02:03 -06:00
Galen  c644838884  test: error coverage  2025-11-19 21:41:43 -06:00
Galen  c5b49ccab6  test: sharding 5xx coverage  2025-11-19 21:20:49 -06:00
Galen  42b07f638d  chore: test coverage  2025-11-19 20:51:12 -06:00
Galen  91f04e8788  Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc  2025-11-19 16:54:43 -06:00
Galen  3207a4c37a  chore: update doc  2025-11-19 16:19:16 -06:00
Galen  2c95fae36b  fix: query sharding not throwing panel errors for n+1 reqs  2025-11-19 15:40:56 -06:00
Galen  ef11f2b633  Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc  2025-11-19 11:24:02 -06:00
Galen  af00322124  chore: clean up generateLimitsConfig  2025-11-19 10:39:02 -06:00
Galen  f1a8041666  chore: allow time splitting debug duration  2025-11-19 09:57:38 -06:00
Galen  1d631f3537  chore: clean up parse_query code  2025-11-19 09:57:00 -06:00
Galen  2f88b7dd8b  chore: remove unused type  2025-11-19 09:34:46 -06:00
Galen  e57411e4d3  chore: add cue ref  2025-11-19 09:28:15 -06:00
Galen  db30a16862  chore: clean up  2025-11-19 09:18:34 -06:00
Galen  9bb9d1f420  chore: actually fix comment  2025-11-19 07:56:16 -06:00
Galen  10cdb94062  chore: clean up comment  2025-11-19 07:54:30 -06:00
Galen  293ffafa01  chore: clean up docblock  2025-11-19 07:51:29 -06:00
Galen  c1e7bcede5  chore: clean up, fix tests  2025-11-19 07:49:15 -06:00
Galen  48204060ef  chore: clean up todo  2025-11-19 07:44:57 -06:00
Galen  5b8d8d6698  chore: clean up parse_query  2025-11-19 07:43:44 -06:00
Galen  bf2e29d106  chore: add lokiQueryLimitsContext feature flag  2025-11-19 07:43:21 -06:00
Galen  0c989fa3c9  Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc  2025-11-19 07:13:58 -06:00
Galen  6004f14a38  chore: lint  2025-11-18 17:30:38 -06:00
Galen  9ec94f74b9  chore: fix lint  2025-11-18 17:09:34 -06:00
Galen  4a63a94b0d  fix: frontend query splitting  2025-11-18 16:25:19 -06:00
Galen  0720aa1d88  Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc  2025-11-18 14:16:46 -06:00
Galen  738855157e  test: add api test  2025-11-18 13:36:40 -06:00
Galen  2bb6e6bd4b  chore: lint backend, frontend docs  2025-11-18 12:53:53 -06:00
Galen  61cb899515  chore: backend tests, clean up  2025-11-18 12:41:03 -06:00
Galen  e0a4017838  chore: update backend to match spec defined in /loki/pull/19900  2025-11-18 10:50:12 -06:00
Galen  3c8ff953dd  chore: update header names  2025-11-17 16:06:33 -06:00
Galen  cb2eacbdfc  chore: poc sending query plan to loki query_range  2025-11-04 15:22:27 -06:00
26 changed files with 708 additions and 61 deletions

View File

@@ -597,7 +597,7 @@ export interface FeatureToggles {
    */
   alertingPrometheusRulesPrimary?: boolean;
   /**
-   * Used in Logs Drilldown to split queries into multiple queries based on the number of shards
+   * Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
    */
   exploreLogsShardSplitting?: boolean;
   /**
@@ -1183,6 +1183,10 @@ export interface FeatureToggles {
    */
   ttlPluginInstanceManager?: boolean;
   /**
+   * Send X-Loki-Query-Limits-Context header to Loki on first split request
+   */
+  lokiQueryLimitsContext?: boolean;
+  /**
    * Enables the new version of rudderstack
    * @default false
    */

View File

@@ -50,6 +50,14 @@ export interface LokiDataQuery extends common.DataQuery {
    * Used to override the name of the series.
    */
   legendFormat?: string;
+  /**
+   * The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+   */
+  limitsContext?: {
+    expr: string;
+    from: number;
+    to: number;
+  };
   /**
    * Used to limit the number of log rows returned.
    */
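For illustration, a query carrying the new field might look like the sketch below (values are hypothetical; from/to are epoch milliseconds covering the full, unsplit time range):

// Hypothetical LokiDataQuery with a populated limitsContext.
const query: LokiDataQuery = {
  refId: 'A',
  // The sharded/split expression actually executed by this sub-request:
  expr: 'count_over_time({service_name="apache", __stream_shard__="2"}[$__auto])',
  limitsContext: {
    // The original, pre-shard expression for the whole query plan:
    expr: 'count_over_time({service_name="apache"}[$__auto])',
    from: 1763557200000, // full range start, epoch ms
    to: 1763560800000, // full range end, epoch ms
  },
};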

View File

@@ -1041,7 +1041,7 @@ var (
		},
		{
			Name:         "exploreLogsShardSplitting",
-			Description:  "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
+			Description:  "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
			Stage:        FeatureStageExperimental,
			FrontendOnly: true,
			Owner:        grafanaObservabilityLogsSquad,
@@ -2054,6 +2054,13 @@ var (
			FrontendOnly: true,
			Owner:        grafanaPluginsPlatformSquad,
		},
+		{
+			Name:         "lokiQueryLimitsContext",
+			Description:  "Send X-Loki-Query-Limits-Context header to Loki on first split request",
+			Stage:        FeatureStageExperimental,
+			FrontendOnly: true,
+			Owner:        grafanaObservabilityLogsSquad,
+		},
		{
			Name:        "rudderstackUpgrade",
			Description: "Enables the new version of rudderstack",

View File

@@ -264,4 +264,5 @@ kubernetesAnnotations,experimental,@grafana/grafana-backend-services-squad,false
 awsDatasourcesHttpProxy,experimental,@grafana/aws-datasources,false,false,false
 transformationsEmptyPlaceholder,preview,@grafana/datapro,false,false,true
 ttlPluginInstanceManager,experimental,@grafana/plugins-platform-backend,false,false,true
+lokiQueryLimitsContext,experimental,@grafana/observability-logs,false,false,true
 rudderstackUpgrade,experimental,@grafana/grafana-frontend-platform,false,false,true

View File

@@ -552,7 +552,7 @@ const (
	FlagAlertingPrometheusRulesPrimary = "alertingPrometheusRulesPrimary"

	// FlagExploreLogsShardSplitting
-	// Used in Logs Drilldown to split queries into multiple queries based on the number of shards
+	// Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
	FlagExploreLogsShardSplitting = "exploreLogsShardSplitting"

	// FlagExploreLogsAggregatedMetrics
@@ -1066,6 +1066,10 @@ const (
	// Enable TTL plugin instance manager
	FlagTtlPluginInstanceManager = "ttlPluginInstanceManager"

+	// FlagLokiQueryLimitsContext
+	// Send X-Loki-Query-Limits-Context header to Loki on first split request
+	FlagLokiQueryLimitsContext = "lokiQueryLimitsContext"
+
	// FlagRudderstackUpgrade
	// Enables the new version of rudderstack
	FlagRudderstackUpgrade = "rudderstackUpgrade"

View File

@@ -1574,11 +1574,14 @@
     {
       "metadata": {
         "name": "exploreLogsShardSplitting",
-        "resourceVersion": "1753448760331",
-        "creationTimestamp": "2024-08-29T13:55:59Z"
+        "resourceVersion": "1763611567823",
+        "creationTimestamp": "2024-08-29T13:55:59Z",
+        "annotations": {
+          "grafana.app/updatedTimestamp": "2025-11-20 04:06:07.82367 +0000 UTC"
+        }
       },
       "spec": {
-        "description": "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
+        "description": "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
         "stage": "experimental",
         "codeowner": "@grafana/observability-logs",
         "frontend": true
@@ -2636,6 +2639,19 @@
         "codeowner": "@grafana/observability-logs"
       }
     },
+    {
+      "metadata": {
+        "name": "lokiQueryLimitsContext",
+        "resourceVersion": "1763558434858",
+        "creationTimestamp": "2025-11-19T13:20:34Z"
+      },
+      "spec": {
+        "description": "Send X-Loki-Query-Limits-Context header to Loki on first split request",
+        "stage": "experimental",
+        "codeowner": "@grafana/observability-logs",
+        "frontend": true
+      }
+    },
     {
       "metadata": {
         "name": "lokiQuerySplitting",

View File

@@ -96,6 +96,8 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
		return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err))
	}

+	addQueryLimitsHeader(query, req)
+
	if query.SupportingQueryType != SupportingQueryNone {
		value := getSupportingQueryHeaderValue(query.SupportingQueryType)
		if value != "" {
@@ -108,6 +110,15 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
	return req, nil
 }

+func addQueryLimitsHeader(query lokiQuery, req *http.Request) {
+	if len(query.LimitsContext.Expr) > 0 {
+		queryLimitStr, err := json.Marshal(query.LimitsContext)
+		if err == nil {
+			req.Header.Set("X-Loki-Query-Limits-Context", string(queryLimitStr))
+		}
+	}
+}
+
 type lokiResponseError struct {
	Message string `json:"message"`
	TraceID string `json:"traceID,omitempty"`
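A rough sketch of the header value this would produce, assuming encoding/json defaults on the Go side (the backend LimitsContext struct shown further down has no json tags, so keys stay capitalized, and its time.Time fields marshal as RFC3339 strings). The equivalent value, built in TypeScript with hypothetical timestamps:

// Sketch only; not the authoritative wire format.
const headerValue = JSON.stringify({
  Expr: '{cluster="us-central1"}', // pre-shard/pre-split expression
  From: '2025-11-19T13:00:00Z', // full range start (RFC3339, assumption)
  To: '2025-11-19T14:00:00Z', // full range end (RFC3339, assumption)
});
// Sent as: X-Loki-Query-Limits-Context: <headerValue>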

View File

@@ -2,10 +2,12 @@ package loki

 import (
	"context"
+	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"testing"
+	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
@@ -47,6 +49,56 @@ func TestApiLogVolume(t *testing.T) {
		require.True(t, called)
	})

+	t.Run("X-Loki-Query-Limits-Context header should be set when LimitsContext is provided", func(t *testing.T) {
+		called := false
+		from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
+		to := time.Now().Truncate(time.Millisecond)
+		limitsContext := LimitsContext{
+			Expr: "{cluster=\"us-central1\"}",
+			From: from,
+			To:   to,
+		}
+		limitsContextJson, _ := json.Marshal(limitsContext)
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, string(limitsContextJson), req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
+	t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is missing expr", func(t *testing.T) {
+		called := false
+		from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
+		to := time.Now().Truncate(time.Millisecond)
+		limitsContext := LimitsContext{
+			Expr: "",
+			From: from,
+			To:   to,
+		}
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
+	t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is not provided", func(t *testing.T) {
+		called := false
+		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
+			called = true
+			require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
+		})
+
+		_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
+		require.NoError(t, err)
+		require.True(t, called)
+	})
+
	t.Run("data sample queries should set data sample http header", func(t *testing.T) {
		called := false
		api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {

View File

@@ -18,6 +18,17 @@ const (
	QueryEditorModeBuilder QueryEditorMode = "builder"
 )

+type LimitsContext struct {
+	Expr string `json:"expr"`
+	From int64  `json:"from"`
+	To   int64  `json:"to"`
+}
+
+// NewLimitsContext creates a new LimitsContext object.
+func NewLimitsContext() *LimitsContext {
+	return &LimitsContext{}
+}
+
 type LokiQueryType string

 const (
@@ -59,6 +70,8 @@ type LokiDataQuery struct {
	Instant *bool `json:"instant,omitempty"`
	// Used to set step value for range queries.
	Step *string `json:"step,omitempty"`
+	// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+	LimitsContext *LimitsContext `json:"limitsContext,omitempty"`
	// A unique identifier for the query within the list of targets.
	// In server side expressions, the refId is used as a variable name to identify results.
	// By default, the UI will assign A->Z; however setting meaningful names may be useful.

View File

@@ -156,6 +156,8 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
		expr := interpolateVariables(model.Expr, interval, timeRange, queryType, step)

+		limitsConfig := generateLimitsConfig(model, interval, timeRange, queryType, step)
+
		direction, err := parseDirection(model.Direction)
		if err != nil {
			return nil, err
@@ -192,8 +194,21 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
			RefID:               query.RefID,
			SupportingQueryType: supportingQueryType,
			Scopes:              model.Scopes,
+			LimitsContext:       limitsConfig,
		})
	}

	return qs, nil
 }
+
+func generateLimitsConfig(model *QueryJSONModel, interval time.Duration, timeRange time.Duration, queryType QueryType, step time.Duration) LimitsContext {
+	var limitsConfig LimitsContext
+	// Only supply limits context config if we have expression, and from and to
+	if model.LimitsContext != nil && model.LimitsContext.Expr != "" && model.LimitsContext.From > 0 && model.LimitsContext.To > 0 {
+		// If a limits expression was provided, interpolate it and parse the time range
+		limitsConfig.Expr = interpolateVariables(model.LimitsContext.Expr, interval, timeRange, queryType, step)
+		limitsConfig.From = time.UnixMilli(model.LimitsContext.From)
+		limitsConfig.To = time.UnixMilli(model.LimitsContext.To)
+	}
+	return limitsConfig
+}
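generateLimitsConfig applies three guards before producing a context: a non-empty expr plus positive from/to epoch-millisecond bounds. A hedged sketch of both outcomes (timestamps hypothetical), mirroring the parse_query tests that follow:

// Passes the guards: the context is built, interpolated, and later serialized
// into the X-Loki-Query-Limits-Context header.
const accepted = { expr: '{service_name="apache"}', from: 1763557200000, to: 1763560800000 };
// Fails the guards (empty expr): the zero-value LimitsContext is returned and
// addQueryLimitsHeader skips the header entirely.
const rejected = { expr: '', from: 1763557200000, to: 1763560800000 };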

View File

@@ -1,6 +1,7 @@
 package loki

 import (
+	"strconv"
	"testing"
	"time"

@@ -145,6 +146,74 @@ func TestParseQuery(t *testing.T) {
		require.Equal(t, `{namespace="logish"} |= "problems"`, models[0].Expr)
	})

+	t.Run("parsing query model with invalid query limits context expr", func(t *testing.T) {
+		from := time.Now().Add(-3000 * time.Second)
+		fullFrom := time.Now().Add(-1 * time.Hour)
+		to := time.Now()
+		queryContext := &backend.QueryDataRequest{
+			Queries: []backend.DataQuery{
+				{
+					JSON: []byte(`
+					{
+						"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
+						"format": "time_series",
+						"refId": "A",
+						"limitsContext": {"expr": "", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
+					}`,
+					),
+					TimeRange: backend.TimeRange{
+						From: from,
+						To:   to,
+					},
+					Interval:      time.Second * 15,
+					MaxDataPoints: 200,
+				},
+			},
+		}
+
+		models, err := parseQuery(queryContext, true)
+		require.NoError(t, err)
+		require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
+		// If the limits context expression is missing, we don't set any limits context
+		require.Equal(t, ``, models[0].LimitsContext.Expr)
+		require.Equal(t, time.Time{}, models[0].LimitsContext.To)
+		require.Equal(t, time.Time{}, models[0].LimitsContext.From)
+	})
+
+	t.Run("parsing query model with query limits context", func(t *testing.T) {
+		from := time.Now().Add(-3000 * time.Second)
+		fullFrom := time.Now().Add(-1 * time.Hour)
+		to := time.Now()
+		queryContext := &backend.QueryDataRequest{
+			Queries: []backend.DataQuery{
+				{
+					JSON: []byte(`
+					{
+						"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
+						"format": "time_series",
+						"refId": "A",
+						"limitsContext": {"expr": "count_over_time({service_name=\"apache\"}[$__auto])", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
+					}`,
+					),
+					TimeRange: backend.TimeRange{
+						From: from,
+						To:   to,
+					},
+					Interval:      time.Second * 15,
+					MaxDataPoints: 200,
+				},
+			},
+		}
+
+		models, err := parseQuery(queryContext, true)
+		require.NoError(t, err)
+		require.Equal(t, time.Second*15, models[0].Step)
+		require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
+		require.Equal(t, `count_over_time({service_name="apache"}[15s])`, models[0].LimitsContext.Expr)
+		require.Equal(t, to.Truncate(time.Millisecond), models[0].LimitsContext.To)
+		require.Equal(t, fullFrom.Truncate(time.Millisecond), models[0].LimitsContext.From)
+	})
+
	t.Run("interpolate variables, range between 1s and 0.5s", func(t *testing.T) {
		expr := "go_goroutines $__interval $__interval_ms $__range $__range_s $__range_ms"
		queryType := dataquery.LokiQueryTypeRange

View File

@@ -11,6 +11,11 @@ import (
 type QueryType = dataquery.LokiQueryType
 type SupportingQueryType = dataquery.SupportingQueryType
 type Direction = dataquery.LokiQueryDirection

+type LimitsContext struct {
+	Expr string
+	From time.Time
+	To   time.Time
+}
+
 const (
	QueryTypeRange = dataquery.LokiQueryTypeRange
@@ -42,4 +47,5 @@ type lokiQuery struct {
	RefID               string
	SupportingQueryType SupportingQueryType
	Scopes              []scope.ScopeFilter
+	LimitsContext       LimitsContext
 }

View File

@@ -795,6 +795,7 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
           <PanelChrome
             title={t('explore.unthemed-logs.title-logs-volume', 'Logs volume')}
             collapsible
+            loadingState={logsVolumeData?.state}
             collapsed={!logsVolumeEnabled}
             onToggleCollapse={onToggleLogsVolumeCollapse}
           >

View File

@@ -26,7 +26,7 @@ import { mergeLogsVolumeDataFrames, isLogsVolumeLimited, getLogsVolumeMaximumRan
 import { SupplementaryResultError } from '../SupplementaryResultError';
 import { LogsVolumePanel } from './LogsVolumePanel';
-import { isTimeoutErrorResponse } from './utils/logsVolumeResponse';
+import { isClientErrorResponse } from './utils/logsVolumeResponse';

 type Props = {
   logsVolumeData: DataQueryResponse | undefined;
@@ -92,7 +92,7 @@ export const LogsVolumePanelList = ({
   const canShowPartialData =
     config.featureToggles.lokiShardSplitting && logsVolumeData && logsVolumeData.data.length > 0;
-  const timeoutError = isTimeoutErrorResponse(logsVolumeData);
+  const clientError = isClientErrorResponse(logsVolumeData);

   const from = dateTime(Math.max(absoluteRange.from, allLogsVolumeMaximumRange.from));
   const to = dateTime(Math.min(absoluteRange.to, allLogsVolumeMaximumRange.to));
@@ -123,7 +123,7 @@
         <Trans i18nKey="explore.logs-volume-panel-list.loading">Loading...</Trans>
       </span>
     );
-  } else if (timeoutError && !canShowPartialData) {
+  } else if (clientError && !canShowPartialData) {
     return (
       <SupplementaryResultError
         title={t('explore.logs-volume-panel-list.title-unable-to-show-log-volume', 'Unable to show log volume')}
@@ -184,7 +184,7 @@
   return (
     <div className={styles.listContainer}>
-      {timeoutError && canShowPartialData && (
+      {clientError && canShowPartialData && (
         <SupplementaryResultError
           title={t('explore.logs-volume-panel-list.title-showing-partial-data', 'Showing partial data')}
           message="The query is trying to access too much data and some sharded requests could not be completed. Try decreasing the time range or adding more labels to your query."

View File

@@ -1,6 +1,6 @@
 import { DataQueryResponse } from '@grafana/data';

-import { isTimeoutErrorResponse } from './logsVolumeResponse';
+import { isClientErrorResponse } from './logsVolumeResponse';

 const errorA =
   'Get "http://localhost:3100/loki/api/v1/query_range?direction=backward&end=1680001200000000000&limit=1000&query=sum+by+%28level%29+%28count_over_time%28%7Bcontainer_name%3D%22docker-compose-app-1%22%7D%5B1h%5D%29%29&start=1679914800000000000&step=3600000ms": net/http: request canceled (Client.Timeout exceeded while awaiting headers)';
@@ -16,7 +16,7 @@ describe('isTimeoutErrorResponse', () => {
         message: timeoutError,
       },
     };
-    expect(isTimeoutErrorResponse(response)).toBe(true);
+    expect(isClientErrorResponse(response)).toBe(true);
   }
 );
 test.each([errorA, errorB])(
@@ -33,7 +33,7 @@ describe('isTimeoutErrorResponse', () => {
       },
     ],
   };
-    expect(isTimeoutErrorResponse(response)).toBe(true);
+    expect(isClientErrorResponse(response)).toBe(true);
   }
 );
 test.each([errorA, errorB])(
@@ -54,13 +54,13 @@ describe('isTimeoutErrorResponse', () => {
         },
       ],
     };
-    expect(isTimeoutErrorResponse(response)).toBe(true);
+    expect(isClientErrorResponse(response)).toBe(true);
   }
 );

 test('does not report false positives', () => {
   const response: DataQueryResponse = {
     data: [],
   };
-  expect(isTimeoutErrorResponse(response)).toBe(false);
+  expect(isClientErrorResponse(response)).toBe(false);
 });
 });

View File

@@ -1,7 +1,8 @@
 import { DataQueryError, DataQueryResponse } from '@grafana/data';
+import { is4xxError } from '@grafana-plugins/loki/responseUtils';

 // Currently we can only infer if an error response is a timeout or not.
-export function isTimeoutErrorResponse(response: DataQueryResponse | undefined): boolean {
+export function isClientErrorResponse(response: DataQueryResponse | undefined): boolean {
   if (!response) {
     return false;
   }
@@ -13,6 +14,8 @@ export function isTimeoutErrorResponse(response: DataQueryResponse | undefined):
   return errors.some((error: DataQueryError) => {
     const message = `${error.message || error.data?.message}`?.toLowerCase();
-    return message.includes('timeout');
+    return (
+      message.includes('timeout') || message?.includes('the query would read too many bytes') || is4xxError(response)
+    );
   });
 }

View File

@@ -42,6 +42,14 @@ composableKinds: DataQuery: {
				instant?: bool
				// Used to set step value for range queries.
				step?: string
+				// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+				limitsContext?: #LimitsContext
+
+				#LimitsContext: {
+					expr: string
+					from: int64
+					to:   int64
+				}

				#QueryEditorMode: "code" | "builder" @cuetsy(kind="enum")

View File

@@ -48,6 +48,14 @@ export interface LokiDataQuery extends common.DataQuery {
    * Used to override the name of the series.
    */
   legendFormat?: string;
+  /**
+   * The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
+   */
+  limitsContext?: {
+    expr: string;
+    from: number;
+    to: number;
+  };
   /**
    * Used to limit the number of log rows returned.
    */

View File

@@ -36,6 +36,11 @@ function getFrameKey(frame: DataFrame): string | undefined {
   return frame.refId ?? frame.name;
 }

+/**
+ * @todo test new response is error, current response is not
+ * @param currentResponse
+ * @param newResponse
+ */
 export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
   if (!currentResponse) {
     return cloneQueryResponse(newResponse);
@@ -65,6 +70,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
   const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
   if (mergedErrors.length > 0) {
     currentResponse.errors = mergedErrors;
+    currentResponse.state = LoadingState.Error;
   }

   // the `.error` attribute is obsolete now,
@@ -75,6 +81,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
   const mergedError = currentResponse.error ?? newResponse.error;
   if (mergedError != null) {
     currentResponse.error = mergedError;
+    currentResponse.state = LoadingState.Error;
   }

   const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];
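With these two assignments, one failed partial response pins the merged result in the Error state even after later successes merge in; a minimal sketch using the mock frames defined below (status value hypothetical):

const merged = combineResponses(
  { data: [metricFrameA], state: LoadingState.Done },
  { data: [], errors: [{ message: 'OOPSIE', status: 518 }] }
);
// merged.errors => [{ message: 'OOPSIE', status: 518 }]
// merged.state => LoadingState.Error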

View File

@@ -222,6 +222,33 @@ export function getMockFrames() {
     length: 2,
   };

+  const metricFrameAB: DataFrame = {
+    refId: 'A',
+    fields: [
+      {
+        name: 'Time',
+        type: FieldType.time,
+        config: {},
+        values: [1000000, 2000000, 3000000, 4000000],
+      },
+      {
+        name: 'Value',
+        type: FieldType.number,
+        config: {},
+        values: [6, 7, 5, 4],
+        labels: {
+          level: 'debug',
+        },
+      },
+    ],
+    meta: {
+      notices: [],
+      type: DataFrameType.TimeSeriesMulti,
+      stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }],
+    },
+    length: 4,
+  };
+
   const metricFrameC: DataFrame = {
     refId: 'A',
     name: 'some-time-series',
@@ -305,6 +332,7 @@ export function getMockFrames() {
     metricFrameA,
     metricFrameB,
     metricFrameC,
+    metricFrameAB,
     emptyFrame,
   };
 }

View File

@@ -1,6 +1,6 @@
 import { of } from 'rxjs';

-import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
+import { DataQueryError, DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
 import { config } from '@grafana/runtime';

 import { LokiDatasource } from './datasource';
@@ -16,6 +16,7 @@ jest.mock('uuid', () => ({
 }));

 const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
+const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
 const originalErr = console.error;

 beforeEach(() => {
   jest.spyOn(console, 'error').mockImplementation(() => {});
@@ -26,22 +27,23 @@
     callback();
   });
   config.featureToggles.lokiShardSplitting = false;
+  config.featureToggles.lokiQueryLimitsContext = true;
 });

 afterAll(() => {
   jest.mocked(global.setTimeout).mockReset();
   config.featureToggles.lokiShardSplitting = originalShardingFlagState;
+  config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
   console.error = originalErr;
 });

 describe('runSplitQuery()', () => {
   let datasource: LokiDatasource;
+  const from = dateTime('2023-02-08T05:00:00.000Z');
+  const to = dateTime('2023-02-10T06:00:00.000Z');
   const range = {
-    from: dateTime('2023-02-08T05:00:00.000Z'),
-    to: dateTime('2023-02-10T06:00:00.000Z'),
-    raw: {
-      from: dateTime('2023-02-08T05:00:00.000Z'),
-      to: dateTime('2023-02-10T06:00:00.000Z'),
-    },
+    from,
+    to,
+    raw: { from, to },
   };

   const createRequest = (targets: LokiQuery[], overrides?: Partial<DataQueryRequest<LokiQuery>>) => {
@@ -165,6 +167,19 @@
           _i: 1676008800000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: {
+            expr: 'count_over_time({a="b"}[1m])',
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
+        },
+      ],
     })
   );
@@ -183,6 +198,15 @@
           _i: 1676005140000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: undefined,
+        },
+      ],
     })
   );
@@ -201,6 +225,15 @@
           _i: 1675918740000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: undefined,
+          limitsContext: undefined,
+        },
+      ],
     })
   );
 });
@@ -225,6 +258,19 @@
           _i: 1676008800000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: {
+            expr: 'count_over_time({a="b"}[1m])',
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
+        },
+      ],
     })
   );
@@ -243,6 +289,15 @@
           _i: 1676005190000,
         }),
      }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: undefined,
+        },
+      ],
     })
   );
@@ -261,21 +316,127 @@
           _i: 1675918790000,
         }),
       }),
+      targets: [
+        {
+          expr: 'count_over_time({a="b"}[1m])',
+          legendFormat: undefined,
+          refId: 'A',
+          step: '10s',
+          limitsContext: undefined,
+        },
+      ],
     })
   );
   });
 });

-  test('Handles and reports errors', async () => {
+  test('Retries 5xx errors', async () => {
+    const { metricFrameA, metricFrameB, metricFrameAB } = getMockFrames();
+    const error: DataQueryError = {
+      message: 'OOPSIE',
+      status: 518,
+    };
+    const errResponse: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+      key: 'uuid',
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Done,
+      data: [metricFrameA],
+      key: 'uuid',
+    };
+    const response2: DataQueryResponse = {
+      state: LoadingState.Done,
+      data: [metricFrameB],
+      key: 'uuid',
+    };
     jest
       .spyOn(datasource, 'runQuery')
-      .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
+      .mockReturnValueOnce(of(errResponse))
+      .mockReturnValueOnce(of(response))
+      .mockReturnValueOnce(of(response2));
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(4);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          data: [metricFrameAB],
+          key: 'uuid',
+          state: LoadingState.Done,
+        })
+      );
+    });
+    expect(datasource.runQuery).toHaveBeenCalledTimes(4);
+  });
+
+  test('Handles and reports 5xx error too many bytes', async () => {
+    const error: DataQueryError = {
+      message: 'the query would read too many bytes ...',
+      status: 500,
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+    };
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
     await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
       expect(values).toHaveLength(1);
       expect(values[0]).toEqual(
-        expect.objectContaining({ error: { refId: 'A', message: 'Error' }, state: LoadingState.Streaming })
+        expect.objectContaining({
+          errors: [error],
+          state: LoadingState.Error,
+        })
       );
     });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+  });
+
+  test('Handles and reports 4xx errors', async () => {
+    const error: DataQueryError = {
+      message: 'BAD REQUEST',
+      status: 418,
+    };
+    const response: DataQueryResponse = {
+      state: LoadingState.Error,
+      data: [],
+      errors: [error],
+    };
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(1);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          errors: [error],
+          state: LoadingState.Error,
+        })
+      );
+    });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+  });
+
+  test('Handles and reports errors (deprecated error)', async () => {
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(
+      of({
+        state: LoadingState.Error,
+        error: { refId: 'A', message: 'the query would read too many bytes ...' },
+        data: [],
+        key: 'uuid',
+      })
+    );
+    await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
+      expect(values).toHaveLength(1);
+      expect(values[0]).toEqual(
+        expect.objectContaining({
+          error: { refId: 'A', message: 'the query would read too many bytes ...' },
+          state: LoadingState.Error,
+        })
+      );
+    });
+    // Errors are not retried
+    expect(datasource.runQuery).toHaveBeenCalledTimes(1);
   });

   describe('Hidden and empty queries', () => {

View File

@@ -8,16 +8,18 @@ import {
   DataQueryResponse,
   DataTopic,
   dateTime,
-  rangeUtil,
-  TimeRange,
   LoadingState,
+  rangeUtil,
+  store,
+  TimeRange,
 } from '@grafana/data';
+import { config } from '@grafana/runtime';

 import { LokiDatasource } from './datasource';
 import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
 import { combineResponses } from './mergeResponses';
 import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
-import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
+import { addQueryLimitsContext, isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
 import { isRetriableError } from './responseUtils';
 import { trackGroupedQueries } from './tracking';
 import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
@@ -55,6 +57,10 @@ interface QuerySplittingOptions {
    * Do not retry failed queries.
    */
   disableRetry?: boolean;
+  /**
+   * The current index of all query attempts
+   */
+  shardQueryIndex?: number;
 }

 /**
@@ -85,6 +91,25 @@ export function adjustTargetsFromResponseState(targets: LokiQuery[], response: D
     })
     .filter((target) => target.maxLines === undefined || target.maxLines > 0);
 }

+const addLimitsToSplitRequests = (splitQueryIndex: number, shardQueryIndex: number, requests: LokiGroupedRequest[]) => {
+  // requests has already been mutated
+  return requests.map((r) => ({
+    ...r,
+    request: {
+      ...r.request,
+      targets: r.request.targets.map((t) => {
+        // @todo if we retry the first request, we will strip out the query limits context
+        if (splitQueryIndex === 0 && shardQueryIndex === 0) {
+          // Don't pull from request if it has already been added by `addLimitsToShardGroups`
+          return t.limitsContext === undefined ? addQueryLimitsContext(t, r.request) : t;
+        }
+        return { ...t, limitsContext: undefined };
+      }),
+    },
+  }));
+};
+
 export function runSplitGroupedQueries(
   datasource: LokiDatasource,
   requests: LokiGroupedRequest[],
@@ -99,8 +124,15 @@
   let subquerySubscription: Subscription | null = null;
   let retriesMap = new Map<string, number>();
   let retryTimer: ReturnType<typeof setTimeout> | null = null;
+  let splitQueryIndex = 0;
+  const shardQueryIndex = options.shardQueryIndex ?? 0;

   const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
+    if (config.featureToggles.lokiQueryLimitsContext) {
+      requests = addLimitsToSplitRequests(splitQueryIndex, shardQueryIndex, requests);
+    }
+    splitQueryIndex++;
     let retrying = false;

     if (subquerySubscription != null) {
@@ -114,7 +146,10 @@
     }

     const done = () => {
-      mergedResponse.state = LoadingState.Done;
+      if (mergedResponse.state !== LoadingState.Error) {
+        mergedResponse.state = LoadingState.Done;
+      }
       subscriber.next(mergedResponse);
       subscriber.complete();
     };
@@ -189,6 +224,10 @@
         if (!options.skipPartialUpdates) {
           mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
         }
+        if (mergedResponse.state === LoadingState.Error) {
+          done();
+        }
       },
       complete: () => {
         if (retrying) {
@@ -301,7 +340,9 @@
   const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr));

   request.queryGroupId = uuidv4();
-  const oneDayMs = 24 * 60 * 60 * 1000;
+  // Allow custom split durations for debugging, e.g. `localStorage.setItem('grafana.loki.querySplitInterval', 60 * 60 * 1000) // 1 hour`
+  const debugSplitDuration = parseInt(store.get('grafana.loki.querySplitInterval'), 10);
+  const oneDayMs = debugSplitDuration || 24 * 60 * 60 * 1000;
   const directionPartitionedLogQueries = groupBy(logQueries, (query) =>
     query.direction === LokiQueryDirection.Forward ? LokiQueryDirection.Forward : LokiQueryDirection.Backward
   );
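As the new comment notes, the one-day split interval can now be overridden for debugging. Assuming the store helper is backed by localStorage, from a browser console:

// Split ranges into 1-hour chunks instead of the default 1 day:
localStorage.setItem('grafana.loki.querySplitInterval', String(60 * 60 * 1000));
// Remove the override to restore the 24h default:
localStorage.removeItem('grafana.loki.querySplitInterval');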

View File

@@ -1,6 +1,7 @@
 import { SyntaxNode } from '@lezer/common';
 import { escapeRegExp } from 'lodash';

+import { DataQueryRequest } from '@grafana/data';
 import {
   parser,
   LineFilter,
@@ -310,6 +311,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
 export function requestSupportsSplitting(allQueries: LokiQuery[]) {
   const queries = allQueries
     .filter((query) => !query.hide)
+    .filter((query) => query.queryType !== LokiQueryType.Instant)
     .filter((query) => !query.refId.includes('do-not-chunk'))
     .filter((query) => query.expr);
@@ -425,3 +427,21 @@ export const getSelectorForShardValues = (query: string) => {
   }
   return '';
 };
+
+/**
+ * Adds query plan to shard/split queries
+ * Must be called after interpolation step!
+ *
+ * @param lokiQuery
+ * @param request
+ */
+export const addQueryLimitsContext = (lokiQuery: LokiQuery, request: DataQueryRequest<LokiQuery>) => {
+  return {
+    ...lokiQuery,
+    limitsContext: {
+      expr: lokiQuery.expr,
+      from: request.range.from.toDate().getTime(),
+      to: request.range.to.toDate().getTime(),
+    },
+  };
+};
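A usage sketch, honoring the docblock's caveat that the expression must already be interpolated (request here is assumed to be the original, unsplit DataQueryRequest<LokiQuery>):

// Attach the full-range plan to a target before sharding/splitting rewrites it.
const target: LokiQuery = { refId: 'A', expr: '{a="b"}' };
const withPlan = addQueryLimitsContext(target, request);
// withPlan.limitsContext => {
//   expr: '{a="b"}',
//   from: request.range.from.toDate().getTime(), // epoch ms
//   to: request.range.to.toDate().getTime(),
// }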

View File

@@ -1,4 +1,4 @@
-import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
+import { DataFrame, DataQueryError, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';

 import { isBytesString, processLabels } from './languageUtils';
 import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
@@ -134,13 +134,44 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
 export function isRetriableError(errorResponse: DataQueryResponse) {
   const message = errorResponse.errors
-    ? (errorResponse.errors[0].message ?? '').toLowerCase()
-    : (errorResponse.error?.message ?? '');
+    ? errorResponse.errors
+        .map((err) => err.message ?? '')
+        .join()
+        .toLowerCase()
+    : (errorResponse.error?.message ?? '').toLowerCase();
+
+  // max_query_bytes_read exceeded, currently 500 when should be 4xx
+  if (message.includes('the query would read too many bytes') || is4xxError(errorResponse)) {
+    throw new Error(message);
+  }
+
   if (message.includes('timeout')) {
     return true;
-  } else if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
+  }
+
+  if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
     // Error response but we're receiving data, continue querying.
     return false;
   }
+
+  if (is5xxError(errorResponse)) {
+    return true;
+  }
+
   throw new Error(message);
 }
+
+export function is4xxError(errorResponse: DataQueryResponse) {
+  /**
+   * Before https://github.com/grafana/grafana/pull/114201 the Loki data source always returns a 500 for every error response type in the response body, and this is what Grafana uses to populate the DataQueryError
+   * Since the frontend and backend are being deployed separately now we might want to continue to check error messages for a bit until we are sure that the correct status code is always set in the data query response.
+   *
+   * @param errorResponse
+   */
+  return isHttpErrorType(errorResponse, '4');
+}
+
+export function is5xxError(errorResponse: DataQueryResponse) {
+  return isHttpErrorType(errorResponse, '5');
+}
+
+function isHttpErrorType(errorResponse: DataQueryResponse, responseType: '2' | '3' | '4' | '5') {
+  const isErrOfType = (err: DataQueryError) => err.status && Array.from(err.status?.toString())[0] === responseType;
+  return (errorResponse.error && isErrOfType(errorResponse.error)) || errorResponse.errors?.some(isErrOfType);
+}
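A sketch of how these helpers steer isRetriableError (status codes hypothetical):

// 4xx, or the known 'too many bytes' 500: thrown, surfaced to the panel, never retried.
isRetriableError({ data: [], errors: [{ message: 'bad request', status: 418 }] }); // throws
// Other 5xx responses are treated as transient and retried.
isRetriableError({ data: [], errors: [{ message: 'parse error', status: 503 }] }); // true
// Timeout messages remain retriable regardless of status.
isRetriableError({ data: [], errors: [{ message: 'timeout exceeded' }] }); // true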

View File

@@ -1,6 +1,7 @@
 import { of } from 'rxjs';

 import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
+import { config } from '@grafana/runtime';

 import { LokiDatasource } from './datasource';
 import { createLokiDatasource } from './mocks/datasource';
@@ -12,6 +13,8 @@ jest.mock('uuid', () => ({
   v4: jest.fn().mockReturnValue('uuid'),
 }));

+const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
+
 const originalLog = console.log;
 const originalWarn = console.warn;
 const originalErr = console.error;
@@ -20,20 +23,26 @@
   jest.spyOn(console, 'warn').mockImplementation(() => {});
   jest.spyOn(console, 'error').mockImplementation(() => {});
 });

+beforeAll(() => {
+  config.featureToggles.lokiQueryLimitsContext = true;
+});
+
 afterAll(() => {
   console.log = originalLog;
   console.warn = originalWarn;
   console.error = originalErr;
+  config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
 });

 describe('runShardSplitQuery()', () => {
   let datasource: LokiDatasource;
+  const from = dateTime('2023-02-08T04:00:00.000Z');
+  const to = dateTime('2023-02-08T11:00:00.000Z');
   const range = {
-    from: dateTime('2023-02-08T04:00:00.000Z'),
-    to: dateTime('2023-02-08T11:00:00.000Z'),
+    from,
+    to,
     raw: {
-      from: dateTime('2023-02-08T04:00:00.000Z'),
-      to: dateTime('2023-02-08T11:00:00.000Z'),
+      from,
+      to,
     },
   };
@@ -139,6 +148,11 @@
       targets: [
         {
           expr: '{a="b", __stream_shard__=~"20|10"}',
+          limitsContext: {
+            expr: `{a="b"}`,
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
           refId: 'A',
           direction: LokiQueryDirection.Scan,
         },
@@ -209,6 +223,11 @@
       targets: [
         {
           expr: '{service_name="test", filter="true", __stream_shard__=~"20|10"}',
+          limitsContext: {
+            expr: `{service_name="test", filter="true"}`,
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
           refId: 'A',
           direction: LokiQueryDirection.Scan,
         },
@@ -241,28 +260,113 @@
     });
   });

-  test('Failed requests have loading state Error', async () => {
-    jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
-    jest
-      .spyOn(datasource, 'runQuery')
-      .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'parse error' }, data: [] }));
-    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
-      expect(response[0].state).toBe(LoadingState.Error);
-    });
-  });
-
-  test('Does not retry on other errors', async () => {
-    jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
-    jest
-      .spyOn(datasource, 'runQuery')
-      .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
-    // @ts-expect-error
-    jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
-      callback();
-    });
-    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
-      expect(datasource.runQuery).toHaveBeenCalledTimes(1);
-    });
-  });
+  describe('Errors', () => {
+    beforeEach(() => {
+      const querySplittingRange = {
+        from: dateTime('2023-02-08T05:00:00.000Z'),
+        to: dateTime('2023-02-10T06:00:00.000Z'),
+        raw: {
+          from: dateTime('2023-02-08T05:00:00.000Z'),
+          to: dateTime('2023-02-10T06:00:00.000Z'),
+        },
+      };
+      request = createRequest([{ expr: '$SELECTOR', refId: 'A', direction: LokiQueryDirection.Scan }], {
+        range: querySplittingRange,
+      });
+      // @ts-expect-error
+      jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
+        callback();
+      });
+    });
+
+    test('Failed 4xx responses have loading state Error', async () => {
+      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '12', '5']);
+      jest
+        .spyOn(datasource, 'runQuery')
+        .mockReturnValue(
+          of({ state: LoadingState.Error, error: { refId: 'A', message: 'client error', status: 400 }, data: [] })
+        );
+      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+        expect(response[0].state).toBe(LoadingState.Error);
+      });
+      expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+    });
+
+    test('Max query bytes errors are not retried', async () => {
+      const errResp: DataQueryResponse = {
+        state: LoadingState.Error,
+        errors: [{ refId: 'A', message: 'the query would read too many bytes ...', status: 500 }],
+        data: [],
+      };
+      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
+      jest
+        .spyOn(datasource, 'runQuery')
+        .mockReturnValueOnce(of(errResp))
+        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
+      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+        expect(response[0].state).toBe(LoadingState.Error);
+      });
+      // 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
+      expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+    });
+
+    test('Failed 5xx requests are retried', async () => {
+      const errResp: DataQueryResponse = {
+        state: LoadingState.Error,
+        errors: [{ refId: 'A', message: 'parse error', status: 500 }],
+        data: [],
+      };
+      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
+      jest
+        .spyOn(datasource, 'runQuery')
+        .mockReturnValueOnce(of(errResp))
+        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
+      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+        expect(response[0].state).toBe(LoadingState.Done);
+      });
+      // 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
+      expect(datasource.runQuery).toHaveBeenCalledTimes(13);
+    });
+
+    test('Failed 5xx requests are retried (dep)', async () => {
+      const errResp: DataQueryResponse = {
+        state: LoadingState.Error,
+        error: { refId: 'A', message: 'parse error', status: 500 },
+        data: [],
+      };
+      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
+      jest
+        .spyOn(datasource, 'runQuery')
+        .mockReturnValueOnce(of(errResp))
+        .mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
+      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+        expect(response[0].state).toBe(LoadingState.Done);
+      });
+      // 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
+      expect(datasource.runQuery).toHaveBeenCalledTimes(13);
+    });
+
+    test('Does not retry on other errors', async () => {
+      jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
+      jest
+        .spyOn(datasource, 'runQuery')
+        .mockReturnValueOnce(
+          of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })
+        );
+      // @ts-expect-error
+      jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
+        callback();
+      });
+      await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+        expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+      });
+    });
+  });

   test('Adjusts the group size based on errors and execution time', async () => {
@@ -425,6 +529,11 @@
       targets: [
         {
           expr: '{a="b", __stream_shard__=~"20|10|9"}',
+          limitsContext: {
+            expr: `{a="b"}`,
+            from: from.valueOf(),
+            to: to.valueOf(),
+          },
           refId: 'A',
           direction: LokiQueryDirection.Scan,
         },

View File

@@ -2,15 +2,20 @@ import { groupBy, partition } from 'lodash';
 import { Observable, Subscriber, Subscription } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid';

-import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
+import { DataQueryRequest, DataQueryResponse, LoadingState, QueryResultMetaStat } from '@grafana/data';
+import { config } from '@grafana/runtime';

 import { LokiDatasource } from './datasource';
 import { combineResponses, replaceResponses } from './mergeResponses';
 import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
-import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils';
+import {
+  addQueryLimitsContext,
+  getSelectorForShardValues,
+  interpolateShardingSelector,
+  requestSupportsSharding,
+} from './queryUtils';
 import { isRetriableError } from './responseUtils';
 import { LokiQuery } from './types';

 /**
  * Query splitting by stream shards.
  * Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
@@ -54,6 +59,19 @@ export function runShardSplitQuery(datasource: LokiDatasource, request: DataQuer
   return splitQueriesByStreamShard(datasource, request, queries);
 }

+const addLimitsToShardGroups = (
+  queryIndex: number,
+  groups: ShardedQueryGroup[],
+  request: DataQueryRequest<LokiQuery>
+) => {
+  return groups.map((g) => ({
+    ...g,
+    targets: g.targets.map((t) => {
+      return queryIndex === 0 ? addQueryLimitsContext(t, request) : { ...t, limitsContext: undefined };
+    }),
+  }));
+};
+
 function splitQueriesByStreamShard(
   datasource: LokiDatasource,
   request: DataQueryRequest<LokiQuery>,
@@ -64,8 +82,13 @@
   let subquerySubscription: Subscription | null = null;
   let retriesMap = new Map<string, number>();
   let retryTimer: ReturnType<typeof setTimeout> | null = null;
+  let queryIndex = 0;

   const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, group: number, groups: ShardedQueryGroup[]) => {
+    if (config.featureToggles.lokiQueryLimitsContext) {
+      groups = addLimitsToShardGroups(queryIndex, groups, request);
+    }
+    queryIndex++;
     let nextGroupSize = groups[group].groupSize;
     const { shards, groupSize, cycle } = groups[group];
     let retrying = false;
@@ -164,6 +187,7 @@
       subquerySubscription = runSplitQuery(datasource, subRequest, {
         skipPartialUpdates: true,
         disableRetry: true,
+        shardQueryIndex: queryIndex - 1,
       }).subscribe({
         next: (partialResponse: DataQueryResponse) => {
           if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
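Combined with addLimitsToSplitRequests in querySplitting, the net effect is that only the very first sub-request of a sharded run carries the plan; a sketch of addLimitsToShardGroups (inputs hypothetical):

// queryIndex === 0: every target gains limitsContext via addQueryLimitsContext(t, request).
const first = addLimitsToShardGroups(0, groups, request);
// queryIndex > 0: targets are re-emitted with limitsContext stripped.
const later = addLimitsToShardGroups(3, groups, request); // limitsContext: undefined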