mirror of
https://github.com/grafana/grafana.git
synced 2025-12-20 19:44:55 +08:00
Compare commits
35 Commits
bugfix/fil
...
gtk-grafan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49cace9952 | ||
|
|
effb9a1f9a | ||
|
|
dc6dcc5a78 | ||
|
|
e2e30ba8dc | ||
|
|
c644838884 | ||
|
|
c5b49ccab6 | ||
|
|
42b07f638d | ||
|
|
91f04e8788 | ||
|
|
3207a4c37a | ||
|
|
2c95fae36b | ||
|
|
ef11f2b633 | ||
|
|
af00322124 | ||
|
|
f1a8041666 | ||
|
|
1d631f3537 | ||
|
|
2f88b7dd8b | ||
|
|
e57411e4d3 | ||
|
|
db30a16862 | ||
|
|
9bb9d1f420 | ||
|
|
10cdb94062 | ||
|
|
293ffafa01 | ||
|
|
c1e7bcede5 | ||
|
|
48204060ef | ||
|
|
5b8d8d6698 | ||
|
|
bf2e29d106 | ||
|
|
0c989fa3c9 | ||
|
|
6004f14a38 | ||
|
|
9ec94f74b9 | ||
|
|
4a63a94b0d | ||
|
|
0720aa1d88 | ||
|
|
738855157e | ||
|
|
2bb6e6bd4b | ||
|
|
61cb899515 | ||
|
|
e0a4017838 | ||
|
|
3c8ff953dd | ||
|
|
cb2eacbdfc |
@@ -597,7 +597,7 @@ export interface FeatureToggles {
|
||||
*/
|
||||
alertingPrometheusRulesPrimary?: boolean;
|
||||
/**
|
||||
* Used in Logs Drilldown to split queries into multiple queries based on the number of shards
|
||||
* Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
|
||||
*/
|
||||
exploreLogsShardSplitting?: boolean;
|
||||
/**
|
||||
@@ -1183,6 +1183,10 @@ export interface FeatureToggles {
|
||||
*/
|
||||
ttlPluginInstanceManager?: boolean;
|
||||
/**
|
||||
* Send X-Loki-Query-Limits-Context header to Loki on first split request
|
||||
*/
|
||||
lokiQueryLimitsContext?: boolean;
|
||||
/**
|
||||
* Enables the new version of rudderstack
|
||||
* @default false
|
||||
*/
|
||||
|
||||
@@ -50,6 +50,14 @@ export interface LokiDataQuery extends common.DataQuery {
|
||||
* Used to override the name of the series.
|
||||
*/
|
||||
legendFormat?: string;
|
||||
/**
|
||||
* The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
|
||||
*/
|
||||
limitsContext?: {
|
||||
expr: string;
|
||||
from: number;
|
||||
to: number;
|
||||
};
|
||||
/**
|
||||
* Used to limit the number of log rows returned.
|
||||
*/
|
||||
|
||||
@@ -1041,7 +1041,7 @@ var (
|
||||
},
|
||||
{
|
||||
Name: "exploreLogsShardSplitting",
|
||||
Description: "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
|
||||
Description: "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
|
||||
Stage: FeatureStageExperimental,
|
||||
FrontendOnly: true,
|
||||
Owner: grafanaObservabilityLogsSquad,
|
||||
@@ -2054,6 +2054,13 @@ var (
|
||||
FrontendOnly: true,
|
||||
Owner: grafanaPluginsPlatformSquad,
|
||||
},
|
||||
{
|
||||
Name: "lokiQueryLimitsContext",
|
||||
Description: "Send X-Loki-Query-Limits-Context header to Loki on first split request",
|
||||
Stage: FeatureStageExperimental,
|
||||
FrontendOnly: true,
|
||||
Owner: grafanaObservabilityLogsSquad,
|
||||
},
|
||||
{
|
||||
Name: "rudderstackUpgrade",
|
||||
Description: "Enables the new version of rudderstack",
|
||||
|
||||
1
pkg/services/featuremgmt/toggles_gen.csv
generated
1
pkg/services/featuremgmt/toggles_gen.csv
generated
@@ -264,4 +264,5 @@ kubernetesAnnotations,experimental,@grafana/grafana-backend-services-squad,false
|
||||
awsDatasourcesHttpProxy,experimental,@grafana/aws-datasources,false,false,false
|
||||
transformationsEmptyPlaceholder,preview,@grafana/datapro,false,false,true
|
||||
ttlPluginInstanceManager,experimental,@grafana/plugins-platform-backend,false,false,true
|
||||
lokiQueryLimitsContext,experimental,@grafana/observability-logs,false,false,true
|
||||
rudderstackUpgrade,experimental,@grafana/grafana-frontend-platform,false,false,true
|
||||
|
||||
|
6
pkg/services/featuremgmt/toggles_gen.go
generated
6
pkg/services/featuremgmt/toggles_gen.go
generated
@@ -552,7 +552,7 @@ const (
|
||||
FlagAlertingPrometheusRulesPrimary = "alertingPrometheusRulesPrimary"
|
||||
|
||||
// FlagExploreLogsShardSplitting
|
||||
// Used in Logs Drilldown to split queries into multiple queries based on the number of shards
|
||||
// Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
|
||||
FlagExploreLogsShardSplitting = "exploreLogsShardSplitting"
|
||||
|
||||
// FlagExploreLogsAggregatedMetrics
|
||||
@@ -1066,6 +1066,10 @@ const (
|
||||
// Enable TTL plugin instance manager
|
||||
FlagTtlPluginInstanceManager = "ttlPluginInstanceManager"
|
||||
|
||||
// FlagLokiQueryLimitsContext
|
||||
// Send X-Loki-Query-Limits-Context header to Loki on first split request
|
||||
FlagLokiQueryLimitsContext = "lokiQueryLimitsContext"
|
||||
|
||||
// FlagRudderstackUpgrade
|
||||
// Enables the new version of rudderstack
|
||||
FlagRudderstackUpgrade = "rudderstackUpgrade"
|
||||
|
||||
22
pkg/services/featuremgmt/toggles_gen.json
generated
22
pkg/services/featuremgmt/toggles_gen.json
generated
@@ -1574,11 +1574,14 @@
|
||||
{
|
||||
"metadata": {
|
||||
"name": "exploreLogsShardSplitting",
|
||||
"resourceVersion": "1753448760331",
|
||||
"creationTimestamp": "2024-08-29T13:55:59Z"
|
||||
"resourceVersion": "1763611567823",
|
||||
"creationTimestamp": "2024-08-29T13:55:59Z",
|
||||
"annotations": {
|
||||
"grafana.app/updatedTimestamp": "2025-11-20 04:06:07.82367 +0000 UTC"
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"description": "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
|
||||
"description": "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/observability-logs",
|
||||
"frontend": true
|
||||
@@ -2636,6 +2639,19 @@
|
||||
"codeowner": "@grafana/observability-logs"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "lokiQueryLimitsContext",
|
||||
"resourceVersion": "1763558434858",
|
||||
"creationTimestamp": "2025-11-19T13:20:34Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Send X-Loki-Query-Limits-Context header to Loki on first split request",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/observability-logs",
|
||||
"frontend": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "lokiQuerySplitting",
|
||||
|
||||
@@ -96,6 +96,8 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
|
||||
return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err))
|
||||
}
|
||||
|
||||
addQueryLimitsHeader(query, req)
|
||||
|
||||
if query.SupportingQueryType != SupportingQueryNone {
|
||||
value := getSupportingQueryHeaderValue(query.SupportingQueryType)
|
||||
if value != "" {
|
||||
@@ -108,6 +110,15 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func addQueryLimitsHeader(query lokiQuery, req *http.Request) {
|
||||
if len(query.LimitsContext.Expr) > 0 {
|
||||
queryLimitStr, err := json.Marshal(query.LimitsContext)
|
||||
if err == nil {
|
||||
req.Header.Set("X-Loki-Query-Limits-Context", string(queryLimitStr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type lokiResponseError struct {
|
||||
Message string `json:"message"`
|
||||
TraceID string `json:"traceID,omitempty"`
|
||||
|
||||
@@ -2,10 +2,12 @@ package loki
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
|
||||
@@ -47,6 +49,56 @@ func TestApiLogVolume(t *testing.T) {
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("X-Loki-Query-Limits-Context header should be set when LimitsContext is provided", func(t *testing.T) {
|
||||
called := false
|
||||
from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
|
||||
to := time.Now().Truncate(time.Millisecond)
|
||||
limitsContext := LimitsContext{
|
||||
Expr: "{cluster=\"us-central1\"}",
|
||||
From: from,
|
||||
To: to,
|
||||
}
|
||||
|
||||
limitsContextJson, _ := json.Marshal(limitsContext)
|
||||
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
|
||||
called = true
|
||||
require.Equal(t, string(limitsContextJson), req.Header.Get("X-Loki-Query-Limits-Context"))
|
||||
})
|
||||
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
|
||||
require.NoError(t, err)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is missing expr", func(t *testing.T) {
|
||||
called := false
|
||||
from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
|
||||
to := time.Now().Truncate(time.Millisecond)
|
||||
limitsContext := LimitsContext{
|
||||
Expr: "",
|
||||
From: from,
|
||||
To: to,
|
||||
}
|
||||
|
||||
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
|
||||
called = true
|
||||
require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
|
||||
})
|
||||
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
|
||||
require.NoError(t, err)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is not provided", func(t *testing.T) {
|
||||
called := false
|
||||
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
|
||||
called = true
|
||||
require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
|
||||
})
|
||||
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
|
||||
require.NoError(t, err)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("data sample queries should set data sample http header", func(t *testing.T) {
|
||||
called := false
|
||||
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
|
||||
|
||||
13
pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go
generated
13
pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go
generated
@@ -18,6 +18,17 @@ const (
|
||||
QueryEditorModeBuilder QueryEditorMode = "builder"
|
||||
)
|
||||
|
||||
type LimitsContext struct {
|
||||
Expr string `json:"expr"`
|
||||
From int64 `json:"from"`
|
||||
To int64 `json:"to"`
|
||||
}
|
||||
|
||||
// NewLimitsContext creates a new LimitsContext object.
|
||||
func NewLimitsContext() *LimitsContext {
|
||||
return &LimitsContext{}
|
||||
}
|
||||
|
||||
type LokiQueryType string
|
||||
|
||||
const (
|
||||
@@ -59,6 +70,8 @@ type LokiDataQuery struct {
|
||||
Instant *bool `json:"instant,omitempty"`
|
||||
// Used to set step value for range queries.
|
||||
Step *string `json:"step,omitempty"`
|
||||
// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
|
||||
LimitsContext *LimitsContext `json:"limitsContext,omitempty"`
|
||||
// A unique identifier for the query within the list of targets.
|
||||
// In server side expressions, the refId is used as a variable name to identify results.
|
||||
// By default, the UI will assign A->Z; however setting meaningful names may be useful.
|
||||
|
||||
@@ -156,6 +156,8 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
|
||||
|
||||
expr := interpolateVariables(model.Expr, interval, timeRange, queryType, step)
|
||||
|
||||
limitsConfig := generateLimitsConfig(model, interval, timeRange, queryType, step)
|
||||
|
||||
direction, err := parseDirection(model.Direction)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -192,8 +194,21 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
|
||||
RefID: query.RefID,
|
||||
SupportingQueryType: supportingQueryType,
|
||||
Scopes: model.Scopes,
|
||||
LimitsContext: limitsConfig,
|
||||
})
|
||||
}
|
||||
|
||||
return qs, nil
|
||||
}
|
||||
|
||||
func generateLimitsConfig(model *QueryJSONModel, interval time.Duration, timeRange time.Duration, queryType QueryType, step time.Duration) LimitsContext {
|
||||
var limitsConfig LimitsContext
|
||||
// Only supply limits context config if we have expression, and from and to
|
||||
if model.LimitsContext != nil && model.LimitsContext.Expr != "" && model.LimitsContext.From > 0 && model.LimitsContext.To > 0 {
|
||||
// If a limits expression was provided, interpolate it and parse the time range
|
||||
limitsConfig.Expr = interpolateVariables(model.LimitsContext.Expr, interval, timeRange, queryType, step)
|
||||
limitsConfig.From = time.UnixMilli(model.LimitsContext.From)
|
||||
limitsConfig.To = time.UnixMilli(model.LimitsContext.To)
|
||||
}
|
||||
return limitsConfig
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package loki
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -145,6 +146,74 @@ func TestParseQuery(t *testing.T) {
|
||||
require.Equal(t, `{namespace="logish"} |= "problems"`, models[0].Expr)
|
||||
})
|
||||
|
||||
t.Run("parsing query model with invalid query limits context expr", func(t *testing.T) {
|
||||
from := time.Now().Add(-3000 * time.Second)
|
||||
fullFrom := time.Now().Add(-1 * time.Hour)
|
||||
to := time.Now()
|
||||
|
||||
queryContext := &backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
JSON: []byte(`
|
||||
{
|
||||
"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
|
||||
"format": "time_series",
|
||||
"refId": "A",
|
||||
"limitsContext": {"expr": "", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
|
||||
}`,
|
||||
),
|
||||
TimeRange: backend.TimeRange{
|
||||
From: from,
|
||||
To: to,
|
||||
},
|
||||
Interval: time.Second * 15,
|
||||
MaxDataPoints: 200,
|
||||
},
|
||||
},
|
||||
}
|
||||
models, err := parseQuery(queryContext, true)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
|
||||
// If the limits context expression is missing, we don't set any limits context
|
||||
require.Equal(t, ``, models[0].LimitsContext.Expr)
|
||||
require.Equal(t, time.Time{}, models[0].LimitsContext.To)
|
||||
require.Equal(t, time.Time{}, models[0].LimitsContext.From)
|
||||
})
|
||||
|
||||
t.Run("parsing query model with query limits context", func(t *testing.T) {
|
||||
from := time.Now().Add(-3000 * time.Second)
|
||||
fullFrom := time.Now().Add(-1 * time.Hour)
|
||||
to := time.Now()
|
||||
|
||||
queryContext := &backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
JSON: []byte(`
|
||||
{
|
||||
"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
|
||||
"format": "time_series",
|
||||
"refId": "A",
|
||||
"limitsContext": {"expr": "count_over_time({service_name=\"apache\"}[$__auto])", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
|
||||
}`,
|
||||
),
|
||||
TimeRange: backend.TimeRange{
|
||||
From: from,
|
||||
To: to,
|
||||
},
|
||||
Interval: time.Second * 15,
|
||||
MaxDataPoints: 200,
|
||||
},
|
||||
},
|
||||
}
|
||||
models, err := parseQuery(queryContext, true)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, time.Second*15, models[0].Step)
|
||||
require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
|
||||
require.Equal(t, `count_over_time({service_name="apache"}[15s])`, models[0].LimitsContext.Expr)
|
||||
require.Equal(t, to.Truncate(time.Millisecond), models[0].LimitsContext.To)
|
||||
require.Equal(t, fullFrom.Truncate(time.Millisecond), models[0].LimitsContext.From)
|
||||
})
|
||||
|
||||
t.Run("interpolate variables, range between 1s and 0.5s", func(t *testing.T) {
|
||||
expr := "go_goroutines $__interval $__interval_ms $__range $__range_s $__range_ms"
|
||||
queryType := dataquery.LokiQueryTypeRange
|
||||
|
||||
@@ -11,6 +11,11 @@ import (
|
||||
type QueryType = dataquery.LokiQueryType
|
||||
type SupportingQueryType = dataquery.SupportingQueryType
|
||||
type Direction = dataquery.LokiQueryDirection
|
||||
type LimitsContext struct {
|
||||
Expr string
|
||||
From time.Time
|
||||
To time.Time
|
||||
}
|
||||
|
||||
const (
|
||||
QueryTypeRange = dataquery.LokiQueryTypeRange
|
||||
@@ -42,4 +47,5 @@ type lokiQuery struct {
|
||||
RefID string
|
||||
SupportingQueryType SupportingQueryType
|
||||
Scopes []scope.ScopeFilter
|
||||
LimitsContext LimitsContext
|
||||
}
|
||||
|
||||
@@ -795,6 +795,7 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
|
||||
<PanelChrome
|
||||
title={t('explore.unthemed-logs.title-logs-volume', 'Logs volume')}
|
||||
collapsible
|
||||
loadingState={logsVolumeData?.state}
|
||||
collapsed={!logsVolumeEnabled}
|
||||
onToggleCollapse={onToggleLogsVolumeCollapse}
|
||||
>
|
||||
|
||||
@@ -26,7 +26,7 @@ import { mergeLogsVolumeDataFrames, isLogsVolumeLimited, getLogsVolumeMaximumRan
|
||||
import { SupplementaryResultError } from '../SupplementaryResultError';
|
||||
|
||||
import { LogsVolumePanel } from './LogsVolumePanel';
|
||||
import { isTimeoutErrorResponse } from './utils/logsVolumeResponse';
|
||||
import { isClientErrorResponse } from './utils/logsVolumeResponse';
|
||||
|
||||
type Props = {
|
||||
logsVolumeData: DataQueryResponse | undefined;
|
||||
@@ -92,7 +92,7 @@ export const LogsVolumePanelList = ({
|
||||
|
||||
const canShowPartialData =
|
||||
config.featureToggles.lokiShardSplitting && logsVolumeData && logsVolumeData.data.length > 0;
|
||||
const timeoutError = isTimeoutErrorResponse(logsVolumeData);
|
||||
const clientError = isClientErrorResponse(logsVolumeData);
|
||||
|
||||
const from = dateTime(Math.max(absoluteRange.from, allLogsVolumeMaximumRange.from));
|
||||
const to = dateTime(Math.min(absoluteRange.to, allLogsVolumeMaximumRange.to));
|
||||
@@ -123,7 +123,7 @@ export const LogsVolumePanelList = ({
|
||||
<Trans i18nKey="explore.logs-volume-panel-list.loading">Loading...</Trans>
|
||||
</span>
|
||||
);
|
||||
} else if (timeoutError && !canShowPartialData) {
|
||||
} else if (clientError && !canShowPartialData) {
|
||||
return (
|
||||
<SupplementaryResultError
|
||||
title={t('explore.logs-volume-panel-list.title-unable-to-show-log-volume', 'Unable to show log volume')}
|
||||
@@ -184,7 +184,7 @@ export const LogsVolumePanelList = ({
|
||||
|
||||
return (
|
||||
<div className={styles.listContainer}>
|
||||
{timeoutError && canShowPartialData && (
|
||||
{clientError && canShowPartialData && (
|
||||
<SupplementaryResultError
|
||||
title={t('explore.logs-volume-panel-list.title-showing-partial-data', 'Showing partial data')}
|
||||
message="The query is trying to access too much data and some sharded requests could not be completed. Try decreasing the time range or adding more labels to your query."
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { DataQueryResponse } from '@grafana/data';
|
||||
|
||||
import { isTimeoutErrorResponse } from './logsVolumeResponse';
|
||||
import { isClientErrorResponse } from './logsVolumeResponse';
|
||||
|
||||
const errorA =
|
||||
'Get "http://localhost:3100/loki/api/v1/query_range?direction=backward&end=1680001200000000000&limit=1000&query=sum+by+%28level%29+%28count_over_time%28%7Bcontainer_name%3D%22docker-compose-app-1%22%7D%5B1h%5D%29%29&start=1679914800000000000&step=3600000ms": net/http: request canceled (Client.Timeout exceeded while awaiting headers)';
|
||||
@@ -16,7 +16,7 @@ describe('isTimeoutErrorResponse', () => {
|
||||
message: timeoutError,
|
||||
},
|
||||
};
|
||||
expect(isTimeoutErrorResponse(response)).toBe(true);
|
||||
expect(isClientErrorResponse(response)).toBe(true);
|
||||
}
|
||||
);
|
||||
test.each([errorA, errorB])(
|
||||
@@ -33,7 +33,7 @@ describe('isTimeoutErrorResponse', () => {
|
||||
},
|
||||
],
|
||||
};
|
||||
expect(isTimeoutErrorResponse(response)).toBe(true);
|
||||
expect(isClientErrorResponse(response)).toBe(true);
|
||||
}
|
||||
);
|
||||
test.each([errorA, errorB])(
|
||||
@@ -54,13 +54,13 @@ describe('isTimeoutErrorResponse', () => {
|
||||
},
|
||||
],
|
||||
};
|
||||
expect(isTimeoutErrorResponse(response)).toBe(true);
|
||||
expect(isClientErrorResponse(response)).toBe(true);
|
||||
}
|
||||
);
|
||||
test('does not report false positives', () => {
|
||||
const response: DataQueryResponse = {
|
||||
data: [],
|
||||
};
|
||||
expect(isTimeoutErrorResponse(response)).toBe(false);
|
||||
expect(isClientErrorResponse(response)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { DataQueryError, DataQueryResponse } from '@grafana/data';
|
||||
import { is4xxError } from '@grafana-plugins/loki/responseUtils';
|
||||
|
||||
// Currently we can only infer if an error response is a timeout or not.
|
||||
export function isTimeoutErrorResponse(response: DataQueryResponse | undefined): boolean {
|
||||
export function isClientErrorResponse(response: DataQueryResponse | undefined): boolean {
|
||||
if (!response) {
|
||||
return false;
|
||||
}
|
||||
@@ -13,6 +14,8 @@ export function isTimeoutErrorResponse(response: DataQueryResponse | undefined):
|
||||
|
||||
return errors.some((error: DataQueryError) => {
|
||||
const message = `${error.message || error.data?.message}`?.toLowerCase();
|
||||
return message.includes('timeout');
|
||||
return (
|
||||
message.includes('timeout') || message?.includes('the query would read too many bytes') || is4xxError(response)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -42,6 +42,14 @@ composableKinds: DataQuery: {
|
||||
instant?: bool
|
||||
// Used to set step value for range queries.
|
||||
step?: string
|
||||
// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
|
||||
limitsContext?: #LimitsContext
|
||||
|
||||
#LimitsContext: {
|
||||
expr: string
|
||||
from: int64
|
||||
to: int64
|
||||
}
|
||||
|
||||
#QueryEditorMode: "code" | "builder" @cuetsy(kind="enum")
|
||||
|
||||
|
||||
@@ -48,6 +48,14 @@ export interface LokiDataQuery extends common.DataQuery {
|
||||
* Used to override the name of the series.
|
||||
*/
|
||||
legendFormat?: string;
|
||||
/**
|
||||
* The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
|
||||
*/
|
||||
limitsContext?: {
|
||||
expr: string;
|
||||
from: number;
|
||||
to: number;
|
||||
};
|
||||
/**
|
||||
* Used to limit the number of log rows returned.
|
||||
*/
|
||||
|
||||
@@ -36,6 +36,11 @@ function getFrameKey(frame: DataFrame): string | undefined {
|
||||
return frame.refId ?? frame.name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @todo test new response is error, current response is not
|
||||
* @param currentResponse
|
||||
* @param newResponse
|
||||
*/
|
||||
export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
|
||||
if (!currentResponse) {
|
||||
return cloneQueryResponse(newResponse);
|
||||
@@ -65,6 +70,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
|
||||
const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
|
||||
if (mergedErrors.length > 0) {
|
||||
currentResponse.errors = mergedErrors;
|
||||
currentResponse.state = LoadingState.Error;
|
||||
}
|
||||
|
||||
// the `.error` attribute is obsolete now,
|
||||
@@ -75,6 +81,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
|
||||
const mergedError = currentResponse.error ?? newResponse.error;
|
||||
if (mergedError != null) {
|
||||
currentResponse.error = mergedError;
|
||||
currentResponse.state = LoadingState.Error;
|
||||
}
|
||||
|
||||
const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];
|
||||
|
||||
@@ -222,6 +222,33 @@ export function getMockFrames() {
|
||||
length: 2,
|
||||
};
|
||||
|
||||
const metricFrameAB: DataFrame = {
|
||||
refId: 'A',
|
||||
fields: [
|
||||
{
|
||||
name: 'Time',
|
||||
type: FieldType.time,
|
||||
config: {},
|
||||
values: [1000000, 2000000, 3000000, 4000000],
|
||||
},
|
||||
{
|
||||
name: 'Value',
|
||||
type: FieldType.number,
|
||||
config: {},
|
||||
values: [6, 7, 5, 4],
|
||||
labels: {
|
||||
level: 'debug',
|
||||
},
|
||||
},
|
||||
],
|
||||
meta: {
|
||||
notices: [],
|
||||
type: DataFrameType.TimeSeriesMulti,
|
||||
stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }],
|
||||
},
|
||||
length: 4,
|
||||
};
|
||||
|
||||
const metricFrameC: DataFrame = {
|
||||
refId: 'A',
|
||||
name: 'some-time-series',
|
||||
@@ -305,6 +332,7 @@ export function getMockFrames() {
|
||||
metricFrameA,
|
||||
metricFrameB,
|
||||
metricFrameC,
|
||||
metricFrameAB,
|
||||
emptyFrame,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { of } from 'rxjs';
|
||||
|
||||
import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
|
||||
import { DataQueryError, DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
|
||||
import { config } from '@grafana/runtime';
|
||||
|
||||
import { LokiDatasource } from './datasource';
|
||||
@@ -16,6 +16,7 @@ jest.mock('uuid', () => ({
|
||||
}));
|
||||
|
||||
const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
|
||||
const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
|
||||
const originalErr = console.error;
|
||||
beforeEach(() => {
|
||||
jest.spyOn(console, 'error').mockImplementation(() => {});
|
||||
@@ -26,22 +27,23 @@ beforeAll(() => {
|
||||
callback();
|
||||
});
|
||||
config.featureToggles.lokiShardSplitting = false;
|
||||
config.featureToggles.lokiQueryLimitsContext = true;
|
||||
});
|
||||
afterAll(() => {
|
||||
jest.mocked(global.setTimeout).mockReset();
|
||||
config.featureToggles.lokiShardSplitting = originalShardingFlagState;
|
||||
config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
|
||||
console.error = originalErr;
|
||||
});
|
||||
|
||||
describe('runSplitQuery()', () => {
|
||||
let datasource: LokiDatasource;
|
||||
const from = dateTime('2023-02-08T05:00:00.000Z');
|
||||
const to = dateTime('2023-02-10T06:00:00.000Z');
|
||||
const range = {
|
||||
from: dateTime('2023-02-08T05:00:00.000Z'),
|
||||
to: dateTime('2023-02-10T06:00:00.000Z'),
|
||||
raw: {
|
||||
from: dateTime('2023-02-08T05:00:00.000Z'),
|
||||
to: dateTime('2023-02-10T06:00:00.000Z'),
|
||||
},
|
||||
from,
|
||||
to,
|
||||
raw: { from, to },
|
||||
};
|
||||
|
||||
const createRequest = (targets: LokiQuery[], overrides?: Partial<DataQueryRequest<LokiQuery>>) => {
|
||||
@@ -165,6 +167,19 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1676008800000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: undefined,
|
||||
limitsContext: {
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
from: from.valueOf(),
|
||||
to: to.valueOf(),
|
||||
},
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
@@ -183,6 +198,15 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1676005140000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: undefined,
|
||||
limitsContext: undefined,
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
@@ -201,6 +225,15 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1675918740000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: undefined,
|
||||
limitsContext: undefined,
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
});
|
||||
@@ -225,6 +258,19 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1676008800000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: '10s',
|
||||
limitsContext: {
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
from: from.valueOf(),
|
||||
to: to.valueOf(),
|
||||
},
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
@@ -243,6 +289,15 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1676005190000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: '10s',
|
||||
limitsContext: undefined,
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
@@ -261,21 +316,127 @@ describe('runSplitQuery()', () => {
|
||||
_i: 1675918790000,
|
||||
}),
|
||||
}),
|
||||
targets: [
|
||||
{
|
||||
expr: 'count_over_time({a="b"}[1m])',
|
||||
legendFormat: undefined,
|
||||
refId: 'A',
|
||||
step: '10s',
|
||||
limitsContext: undefined,
|
||||
},
|
||||
],
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
test('Handles and reports errors', async () => {
|
||||
test('Retries 5xx errors', async () => {
|
||||
const { metricFrameA, metricFrameB, metricFrameAB } = getMockFrames();
|
||||
const error: DataQueryError = {
|
||||
message: 'OOPSIE',
|
||||
status: 518,
|
||||
};
|
||||
const errResponse: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
data: [],
|
||||
errors: [error],
|
||||
key: 'uuid',
|
||||
};
|
||||
const response: DataQueryResponse = {
|
||||
state: LoadingState.Done,
|
||||
data: [metricFrameA],
|
||||
key: 'uuid',
|
||||
};
|
||||
const response2: DataQueryResponse = {
|
||||
state: LoadingState.Done,
|
||||
data: [metricFrameB],
|
||||
key: 'uuid',
|
||||
};
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
|
||||
.mockReturnValueOnce(of(errResponse))
|
||||
.mockReturnValueOnce(of(response))
|
||||
.mockReturnValueOnce(of(response2));
|
||||
|
||||
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
|
||||
expect(values).toHaveLength(4);
|
||||
expect(values[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
data: [metricFrameAB],
|
||||
key: 'uuid',
|
||||
state: LoadingState.Done,
|
||||
})
|
||||
);
|
||||
});
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(4);
|
||||
});
|
||||
test('Handles and reports 5xx error too many bytes', async () => {
|
||||
const error: DataQueryError = {
|
||||
message: 'the query would read too many bytes ...',
|
||||
status: 500,
|
||||
};
|
||||
const response: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
data: [],
|
||||
errors: [error],
|
||||
};
|
||||
jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
|
||||
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
|
||||
expect(values).toHaveLength(1);
|
||||
expect(values[0]).toEqual(
|
||||
expect.objectContaining({ error: { refId: 'A', message: 'Error' }, state: LoadingState.Streaming })
|
||||
expect.objectContaining({
|
||||
errors: [error],
|
||||
state: LoadingState.Error,
|
||||
})
|
||||
);
|
||||
});
|
||||
// Errors are not retried
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
test('Handles and reports 4xx errors', async () => {
|
||||
const error: DataQueryError = {
|
||||
message: 'BAD REQUEST',
|
||||
status: 418,
|
||||
};
|
||||
const response: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
data: [],
|
||||
errors: [error],
|
||||
};
|
||||
jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
|
||||
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
|
||||
expect(values).toHaveLength(1);
|
||||
expect(values[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
errors: [error],
|
||||
state: LoadingState.Error,
|
||||
})
|
||||
);
|
||||
});
|
||||
// Errors are not retried
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('Handles and reports errors (deprecated error)', async () => {
|
||||
jest.spyOn(datasource, 'runQuery').mockReturnValue(
|
||||
of({
|
||||
state: LoadingState.Error,
|
||||
error: { refId: 'A', message: 'the query would read too many bytes ...' },
|
||||
data: [],
|
||||
key: 'uuid',
|
||||
})
|
||||
);
|
||||
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
|
||||
expect(values).toHaveLength(1);
|
||||
expect(values[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
error: { refId: 'A', message: 'the query would read too many bytes ...' },
|
||||
state: LoadingState.Error,
|
||||
})
|
||||
);
|
||||
});
|
||||
// Errors are not retried
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
describe('Hidden and empty queries', () => {
|
||||
|
||||
@@ -8,16 +8,18 @@ import {
|
||||
DataQueryResponse,
|
||||
DataTopic,
|
||||
dateTime,
|
||||
rangeUtil,
|
||||
TimeRange,
|
||||
LoadingState,
|
||||
rangeUtil,
|
||||
store,
|
||||
TimeRange,
|
||||
} from '@grafana/data';
|
||||
import { config } from '@grafana/runtime';
|
||||
|
||||
import { LokiDatasource } from './datasource';
|
||||
import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
|
||||
import { combineResponses } from './mergeResponses';
|
||||
import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
|
||||
import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
|
||||
import { addQueryLimitsContext, isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
|
||||
import { isRetriableError } from './responseUtils';
|
||||
import { trackGroupedQueries } from './tracking';
|
||||
import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
|
||||
@@ -55,6 +57,10 @@ interface QuerySplittingOptions {
|
||||
* Do not retry failed queries.
|
||||
*/
|
||||
disableRetry?: boolean;
|
||||
/**
|
||||
* The current index of all query attempts
|
||||
*/
|
||||
shardQueryIndex?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -85,6 +91,25 @@ export function adjustTargetsFromResponseState(targets: LokiQuery[], response: D
|
||||
})
|
||||
.filter((target) => target.maxLines === undefined || target.maxLines > 0);
|
||||
}
|
||||
|
||||
const addLimitsToSplitRequests = (splitQueryIndex: number, shardQueryIndex: number, requests: LokiGroupedRequest[]) => {
|
||||
// requests has already been mutated
|
||||
return requests.map((r) => ({
|
||||
...r,
|
||||
request: {
|
||||
...r.request,
|
||||
targets: r.request.targets.map((t) => {
|
||||
// @todo if we retry the first request, we will strip out the query limits context
|
||||
if (splitQueryIndex === 0 && shardQueryIndex === 0) {
|
||||
// Don't pull from request if it has already been added by `addLimitsToShardGroups`
|
||||
return t.limitsContext === undefined ? addQueryLimitsContext(t, r.request) : t;
|
||||
}
|
||||
return { ...t, limitsContext: undefined };
|
||||
}),
|
||||
},
|
||||
}));
|
||||
};
|
||||
|
||||
export function runSplitGroupedQueries(
|
||||
datasource: LokiDatasource,
|
||||
requests: LokiGroupedRequest[],
|
||||
@@ -99,8 +124,15 @@ export function runSplitGroupedQueries(
|
||||
let subquerySubscription: Subscription | null = null;
|
||||
let retriesMap = new Map<string, number>();
|
||||
let retryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let splitQueryIndex = 0;
|
||||
const shardQueryIndex = options.shardQueryIndex ?? 0;
|
||||
|
||||
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
|
||||
if (config.featureToggles.lokiQueryLimitsContext) {
|
||||
requests = addLimitsToSplitRequests(splitQueryIndex, shardQueryIndex, requests);
|
||||
}
|
||||
|
||||
splitQueryIndex++;
|
||||
let retrying = false;
|
||||
|
||||
if (subquerySubscription != null) {
|
||||
@@ -114,7 +146,10 @@ export function runSplitGroupedQueries(
|
||||
}
|
||||
|
||||
const done = () => {
|
||||
mergedResponse.state = LoadingState.Done;
|
||||
if (mergedResponse.state !== LoadingState.Error) {
|
||||
mergedResponse.state = LoadingState.Done;
|
||||
}
|
||||
|
||||
subscriber.next(mergedResponse);
|
||||
subscriber.complete();
|
||||
};
|
||||
@@ -189,6 +224,10 @@ export function runSplitGroupedQueries(
|
||||
if (!options.skipPartialUpdates) {
|
||||
mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
|
||||
}
|
||||
|
||||
if (mergedResponse.state === LoadingState.Error) {
|
||||
done();
|
||||
}
|
||||
},
|
||||
complete: () => {
|
||||
if (retrying) {
|
||||
@@ -301,7 +340,9 @@ export function runSplitQuery(
|
||||
const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr));
|
||||
|
||||
request.queryGroupId = uuidv4();
|
||||
const oneDayMs = 24 * 60 * 60 * 1000;
|
||||
// Allow custom split durations for debugging, e.g. `localStorage.setItem('grafana.loki.querySplitInterval', 24 * 60 * 1000) // 1 hour`
|
||||
const debugSplitDuration = parseInt(store.get('grafana.loki.querySplitInterval'), 10);
|
||||
const oneDayMs = debugSplitDuration || 24 * 60 * 60 * 1000;
|
||||
const directionPartitionedLogQueries = groupBy(logQueries, (query) =>
|
||||
query.direction === LokiQueryDirection.Forward ? LokiQueryDirection.Forward : LokiQueryDirection.Backward
|
||||
);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { SyntaxNode } from '@lezer/common';
|
||||
import { escapeRegExp } from 'lodash';
|
||||
|
||||
import { DataQueryRequest } from '@grafana/data';
|
||||
import {
|
||||
parser,
|
||||
LineFilter,
|
||||
@@ -310,6 +311,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
|
||||
export function requestSupportsSplitting(allQueries: LokiQuery[]) {
|
||||
const queries = allQueries
|
||||
.filter((query) => !query.hide)
|
||||
.filter((query) => query.queryType !== LokiQueryType.Instant)
|
||||
.filter((query) => !query.refId.includes('do-not-chunk'))
|
||||
.filter((query) => query.expr);
|
||||
|
||||
@@ -425,3 +427,21 @@ export const getSelectorForShardValues = (query: string) => {
|
||||
}
|
||||
return '';
|
||||
};
|
||||
|
||||
/**
|
||||
* Adds query plan to shard/split queries
|
||||
* Must be called after interpolation step!
|
||||
*
|
||||
* @param lokiQuery
|
||||
* @param request
|
||||
*/
|
||||
export const addQueryLimitsContext = (lokiQuery: LokiQuery, request: DataQueryRequest<LokiQuery>) => {
|
||||
return {
|
||||
...lokiQuery,
|
||||
limitsContext: {
|
||||
expr: lokiQuery.expr,
|
||||
from: request.range.from.toDate().getTime(),
|
||||
to: request.range.to.toDate().getTime(),
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
|
||||
import { DataFrame, DataQueryError, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
|
||||
|
||||
import { isBytesString, processLabels } from './languageUtils';
|
||||
import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
|
||||
@@ -134,13 +134,44 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
|
||||
|
||||
export function isRetriableError(errorResponse: DataQueryResponse) {
|
||||
const message = errorResponse.errors
|
||||
? (errorResponse.errors[0].message ?? '').toLowerCase()
|
||||
: (errorResponse.error?.message ?? '');
|
||||
? errorResponse.errors
|
||||
.map((err) => err.message ?? '')
|
||||
.join()
|
||||
.toLowerCase()
|
||||
: (errorResponse.error?.message ?? '').toLowerCase();
|
||||
|
||||
// max_query_bytes_read exceeded, currently 500 when should be 4xx
|
||||
if (message.includes('the query would read too many bytes') || is4xxError(errorResponse)) {
|
||||
throw new Error(message);
|
||||
}
|
||||
if (message.includes('timeout')) {
|
||||
return true;
|
||||
} else if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
|
||||
}
|
||||
if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
|
||||
// Error response but we're receiving data, continue querying.
|
||||
return false;
|
||||
}
|
||||
if (is5xxError(errorResponse)) {
|
||||
return true;
|
||||
}
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
export function is4xxError(errorResponse: DataQueryResponse) {
|
||||
/**
|
||||
* Before https://github.com/grafana/grafana/pull/114201 the Loki data source always returns a 500 for every error response type in the response body, and this is what Grafana uses to populate the DataQueryError
|
||||
* Since the frontend and backend are being deployed separately now we might want to continue to check error messages for a bit until we are sure that the correct status code is always set in the data query response.
|
||||
*
|
||||
* @param errorResponse
|
||||
*/
|
||||
return isHttpErrorType(errorResponse, '4');
|
||||
}
|
||||
|
||||
export function is5xxError(errorResponse: DataQueryResponse) {
|
||||
return isHttpErrorType(errorResponse, '5');
|
||||
}
|
||||
|
||||
function isHttpErrorType(errorResponse: DataQueryResponse, responseType: '2' | '3' | '4' | '5') {
|
||||
const isErrOfType = (err: DataQueryError) => err.status && Array.from(err.status?.toString())[0] === responseType;
|
||||
return (errorResponse.error && isErrOfType(errorResponse.error)) || errorResponse.errors?.some(isErrOfType);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { of } from 'rxjs';
|
||||
|
||||
import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
|
||||
import { config } from '@grafana/runtime';
|
||||
|
||||
import { LokiDatasource } from './datasource';
|
||||
import { createLokiDatasource } from './mocks/datasource';
|
||||
@@ -12,6 +13,8 @@ jest.mock('uuid', () => ({
|
||||
v4: jest.fn().mockReturnValue('uuid'),
|
||||
}));
|
||||
|
||||
const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
|
||||
|
||||
const originalLog = console.log;
|
||||
const originalWarn = console.warn;
|
||||
const originalErr = console.error;
|
||||
@@ -20,20 +23,26 @@ beforeEach(() => {
|
||||
jest.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
jest.spyOn(console, 'error').mockImplementation(() => {});
|
||||
});
|
||||
beforeAll(() => {
|
||||
config.featureToggles.lokiQueryLimitsContext = true;
|
||||
});
|
||||
afterAll(() => {
|
||||
console.log = originalLog;
|
||||
console.warn = originalWarn;
|
||||
console.error = originalErr;
|
||||
config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
|
||||
});
|
||||
|
||||
describe('runShardSplitQuery()', () => {
|
||||
let datasource: LokiDatasource;
|
||||
const from = dateTime('2023-02-08T04:00:00.000Z');
|
||||
const to = dateTime('2023-02-08T11:00:00.000Z');
|
||||
const range = {
|
||||
from: dateTime('2023-02-08T04:00:00.000Z'),
|
||||
to: dateTime('2023-02-08T11:00:00.000Z'),
|
||||
from,
|
||||
to,
|
||||
raw: {
|
||||
from: dateTime('2023-02-08T04:00:00.000Z'),
|
||||
to: dateTime('2023-02-08T11:00:00.000Z'),
|
||||
from,
|
||||
to,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -139,6 +148,11 @@ describe('runShardSplitQuery()', () => {
|
||||
targets: [
|
||||
{
|
||||
expr: '{a="b", __stream_shard__=~"20|10"}',
|
||||
limitsContext: {
|
||||
expr: `{a="b"}`,
|
||||
from: from.valueOf(),
|
||||
to: to.valueOf(),
|
||||
},
|
||||
refId: 'A',
|
||||
direction: LokiQueryDirection.Scan,
|
||||
},
|
||||
@@ -209,6 +223,11 @@ describe('runShardSplitQuery()', () => {
|
||||
targets: [
|
||||
{
|
||||
expr: '{service_name="test", filter="true", __stream_shard__=~"20|10"}',
|
||||
limitsContext: {
|
||||
expr: `{service_name="test", filter="true"}`,
|
||||
from: from.valueOf(),
|
||||
to: to.valueOf(),
|
||||
},
|
||||
refId: 'A',
|
||||
direction: LokiQueryDirection.Scan,
|
||||
},
|
||||
@@ -241,28 +260,113 @@ describe('runShardSplitQuery()', () => {
|
||||
});
|
||||
});
|
||||
|
||||
test('Failed requests have loading state Error', async () => {
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'parse error' }, data: [] }));
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(response[0].state).toBe(LoadingState.Error);
|
||||
describe('Errors', () => {
|
||||
beforeEach(() => {
|
||||
const querySplittingRange = {
|
||||
from: dateTime('2023-02-08T05:00:00.000Z'),
|
||||
to: dateTime('2023-02-10T06:00:00.000Z'),
|
||||
raw: {
|
||||
from: dateTime('2023-02-08T05:00:00.000Z'),
|
||||
to: dateTime('2023-02-10T06:00:00.000Z'),
|
||||
},
|
||||
};
|
||||
request = createRequest([{ expr: '$SELECTOR', refId: 'A', direction: LokiQueryDirection.Scan }], {
|
||||
range: querySplittingRange,
|
||||
});
|
||||
// @ts-expect-error
|
||||
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
|
||||
callback();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('Does not retry on other errors', async () => {
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
|
||||
// @ts-expect-error
|
||||
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
|
||||
callback();
|
||||
});
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
test('Failed 4xx responses have loading state Error', async () => {
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '12', '5']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValue(
|
||||
of({ state: LoadingState.Error, error: { refId: 'A', message: 'client error', status: 400 }, data: [] })
|
||||
);
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(response[0].state).toBe(LoadingState.Error);
|
||||
});
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('Max query bytes errors are not retried', async () => {
|
||||
const errResp: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
errors: [{ refId: 'A', message: 'the query would read too many bytes ...', status: 500 }],
|
||||
data: [],
|
||||
};
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValueOnce(of(errResp))
|
||||
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
|
||||
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(response[0].state).toBe(LoadingState.Error);
|
||||
});
|
||||
|
||||
// 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('Failed 5xx requests are retried', async () => {
|
||||
const errResp: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
errors: [{ refId: 'A', message: 'parse error', status: 500 }],
|
||||
data: [],
|
||||
};
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValueOnce(of(errResp))
|
||||
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
|
||||
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(response[0].state).toBe(LoadingState.Done);
|
||||
});
|
||||
|
||||
// 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(13);
|
||||
});
|
||||
|
||||
test('Failed 5xx requests are retried (dep)', async () => {
|
||||
const errResp: DataQueryResponse = {
|
||||
state: LoadingState.Error,
|
||||
error: { refId: 'A', message: 'parse error', status: 500 },
|
||||
data: [],
|
||||
};
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValueOnce(of(errResp))
|
||||
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
|
||||
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(response[0].state).toBe(LoadingState.Done);
|
||||
});
|
||||
|
||||
// 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(13);
|
||||
});
|
||||
|
||||
test('Does not retry on other errors', async () => {
|
||||
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
|
||||
jest
|
||||
.spyOn(datasource, 'runQuery')
|
||||
.mockReturnValueOnce(
|
||||
of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })
|
||||
);
|
||||
// @ts-expect-error
|
||||
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
|
||||
callback();
|
||||
});
|
||||
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
|
||||
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test('Adjusts the group size based on errors and execution time', async () => {
|
||||
@@ -425,6 +529,11 @@ describe('runShardSplitQuery()', () => {
|
||||
targets: [
|
||||
{
|
||||
expr: '{a="b", __stream_shard__=~"20|10|9"}',
|
||||
limitsContext: {
|
||||
expr: `{a="b"}`,
|
||||
from: from.valueOf(),
|
||||
to: to.valueOf(),
|
||||
},
|
||||
refId: 'A',
|
||||
direction: LokiQueryDirection.Scan,
|
||||
},
|
||||
|
||||
@@ -2,15 +2,20 @@ import { groupBy, partition } from 'lodash';
|
||||
import { Observable, Subscriber, Subscription } from 'rxjs';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
|
||||
import { DataQueryRequest, DataQueryResponse, LoadingState, QueryResultMetaStat } from '@grafana/data';
|
||||
import { config } from '@grafana/runtime';
|
||||
|
||||
import { LokiDatasource } from './datasource';
|
||||
import { combineResponses, replaceResponses } from './mergeResponses';
|
||||
import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
|
||||
import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils';
|
||||
import {
|
||||
addQueryLimitsContext,
|
||||
getSelectorForShardValues,
|
||||
interpolateShardingSelector,
|
||||
requestSupportsSharding,
|
||||
} from './queryUtils';
|
||||
import { isRetriableError } from './responseUtils';
|
||||
import { LokiQuery } from './types';
|
||||
|
||||
/**
|
||||
* Query splitting by stream shards.
|
||||
* Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
|
||||
@@ -54,6 +59,19 @@ export function runShardSplitQuery(datasource: LokiDatasource, request: DataQuer
|
||||
return splitQueriesByStreamShard(datasource, request, queries);
|
||||
}
|
||||
|
||||
const addLimitsToShardGroups = (
|
||||
queryIndex: number,
|
||||
groups: ShardedQueryGroup[],
|
||||
request: DataQueryRequest<LokiQuery>
|
||||
) => {
|
||||
return groups.map((g) => ({
|
||||
...g,
|
||||
targets: g.targets.map((t) => {
|
||||
return queryIndex === 0 ? addQueryLimitsContext(t, request) : { ...t, limitsContext: undefined };
|
||||
}),
|
||||
}));
|
||||
};
|
||||
|
||||
function splitQueriesByStreamShard(
|
||||
datasource: LokiDatasource,
|
||||
request: DataQueryRequest<LokiQuery>,
|
||||
@@ -64,8 +82,13 @@ function splitQueriesByStreamShard(
|
||||
let subquerySubscription: Subscription | null = null;
|
||||
let retriesMap = new Map<string, number>();
|
||||
let retryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let queryIndex = 0;
|
||||
|
||||
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, group: number, groups: ShardedQueryGroup[]) => {
|
||||
if (config.featureToggles.lokiQueryLimitsContext) {
|
||||
groups = addLimitsToShardGroups(queryIndex, groups, request);
|
||||
}
|
||||
queryIndex++;
|
||||
let nextGroupSize = groups[group].groupSize;
|
||||
const { shards, groupSize, cycle } = groups[group];
|
||||
let retrying = false;
|
||||
@@ -164,6 +187,7 @@ function splitQueriesByStreamShard(
|
||||
subquerySubscription = runSplitQuery(datasource, subRequest, {
|
||||
skipPartialUpdates: true,
|
||||
disableRetry: true,
|
||||
shardQueryIndex: queryIndex - 1,
|
||||
}).subscribe({
|
||||
next: (partialResponse: DataQueryResponse) => {
|
||||
if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
|
||||
|
||||
Reference in New Issue
Block a user