Compare commits

...

35 Commits

Author SHA1 Message Date
Galen
49cace9952 Revert "chore: revert generic error handling"
This reverts commit effb9a1f9a.
2025-11-20 07:04:23 -06:00
Galen
effb9a1f9a chore: revert generic error handling 2025-11-20 07:03:59 -06:00
Galen
dc6dcc5a78 chore: mark old flag as deprecated 2025-11-19 22:06:22 -06:00
Galen
e2e30ba8dc chore: fix type error 2025-11-19 22:02:03 -06:00
Galen
c644838884 test: error coverage 2025-11-19 21:41:43 -06:00
Galen
c5b49ccab6 test: sharding 5xx coverage 2025-11-19 21:20:49 -06:00
Galen
42b07f638d chore: test coverage 2025-11-19 20:51:12 -06:00
Galen
91f04e8788 Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc 2025-11-19 16:54:43 -06:00
Galen
3207a4c37a chore: update doc 2025-11-19 16:19:16 -06:00
Galen
2c95fae36b fix: query sharding not throwing panel errors for n+1 reqs 2025-11-19 15:40:56 -06:00
Galen
ef11f2b633 Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc 2025-11-19 11:24:02 -06:00
Galen
af00322124 chore: clean up generateLimitsConfig 2025-11-19 10:39:02 -06:00
Galen
f1a8041666 chore: allow time splitting debug duration 2025-11-19 09:57:38 -06:00
Galen
1d631f3537 chore: clean up parse_query code 2025-11-19 09:57:00 -06:00
Galen
2f88b7dd8b chore: remove unused type 2025-11-19 09:34:46 -06:00
Galen
e57411e4d3 chore: add cue ref 2025-11-19 09:28:15 -06:00
Galen
db30a16862 chore: clean up 2025-11-19 09:18:34 -06:00
Galen
9bb9d1f420 chore: actually fix comment 2025-11-19 07:56:16 -06:00
Galen
10cdb94062 chore: clean up comment 2025-11-19 07:54:30 -06:00
Galen
293ffafa01 chore: clean up docblock 2025-11-19 07:51:29 -06:00
Galen
c1e7bcede5 chore: clean up, fix tests 2025-11-19 07:49:15 -06:00
Galen
48204060ef chore: clean up todo 2025-11-19 07:44:57 -06:00
Galen
5b8d8d6698 chore: clean up parse_query 2025-11-19 07:43:44 -06:00
Galen
bf2e29d106 chore: add lokiQueryLimitsContext feature flag 2025-11-19 07:43:21 -06:00
Galen
0c989fa3c9 Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc 2025-11-19 07:13:58 -06:00
Galen
6004f14a38 chore: lint 2025-11-18 17:30:38 -06:00
Galen
9ec94f74b9 chore: fix lint 2025-11-18 17:09:34 -06:00
Galen
4a63a94b0d fix: frontend query splitting 2025-11-18 16:25:19 -06:00
Galen
0720aa1d88 Merge remote-tracking branch 'origin/main' into gtk-grafana/loki-query-plan-poc 2025-11-18 14:16:46 -06:00
Galen
738855157e test: add api test 2025-11-18 13:36:40 -06:00
Galen
2bb6e6bd4b chore: lint backend, frontend docs 2025-11-18 12:53:53 -06:00
Galen
61cb899515 chore: backend tests, clean up 2025-11-18 12:41:03 -06:00
Galen
e0a4017838 chore: update backend to match spec defined in /loki/pull/19900 2025-11-18 10:50:12 -06:00
Galen
3c8ff953dd chore: update header names 2025-11-17 16:06:33 -06:00
Galen
cb2eacbdfc chore: poc sending query plan to loki query_range 2025-11-04 15:22:27 -06:00
26 changed files with 708 additions and 61 deletions

View File

@@ -597,7 +597,7 @@ export interface FeatureToggles {
*/
alertingPrometheusRulesPrimary?: boolean;
/**
* Used in Logs Drilldown to split queries into multiple queries based on the number of shards
* Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
*/
exploreLogsShardSplitting?: boolean;
/**
@@ -1183,6 +1183,10 @@ export interface FeatureToggles {
*/
ttlPluginInstanceManager?: boolean;
/**
* Send X-Loki-Query-Limits-Context header to Loki on first split request
*/
lokiQueryLimitsContext?: boolean;
/**
* Enables the new version of rudderstack
* @default false
*/

View File

@@ -50,6 +50,14 @@ export interface LokiDataQuery extends common.DataQuery {
* Used to override the name of the series.
*/
legendFormat?: string;
/**
* The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
*/
limitsContext?: {
expr: string;
from: number;
to: number;
};
/**
* Used to limit the number of log rows returned.
*/

View File

@@ -1041,7 +1041,7 @@ var (
},
{
Name: "exploreLogsShardSplitting",
Description: "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
Description: "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
Stage: FeatureStageExperimental,
FrontendOnly: true,
Owner: grafanaObservabilityLogsSquad,
@@ -2054,6 +2054,13 @@ var (
FrontendOnly: true,
Owner: grafanaPluginsPlatformSquad,
},
{
Name: "lokiQueryLimitsContext",
Description: "Send X-Loki-Query-Limits-Context header to Loki on first split request",
Stage: FeatureStageExperimental,
FrontendOnly: true,
Owner: grafanaObservabilityLogsSquad,
},
{
Name: "rudderstackUpgrade",
Description: "Enables the new version of rudderstack",

View File

@@ -264,4 +264,5 @@ kubernetesAnnotations,experimental,@grafana/grafana-backend-services-squad,false
awsDatasourcesHttpProxy,experimental,@grafana/aws-datasources,false,false,false
transformationsEmptyPlaceholder,preview,@grafana/datapro,false,false,true
ttlPluginInstanceManager,experimental,@grafana/plugins-platform-backend,false,false,true
lokiQueryLimitsContext,experimental,@grafana/observability-logs,false,false,true
rudderstackUpgrade,experimental,@grafana/grafana-frontend-platform,false,false,true
1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
264 awsDatasourcesHttpProxy experimental @grafana/aws-datasources false false false
265 transformationsEmptyPlaceholder preview @grafana/datapro false false true
266 ttlPluginInstanceManager experimental @grafana/plugins-platform-backend false false true
267 lokiQueryLimitsContext experimental @grafana/observability-logs false false true
268 rudderstackUpgrade experimental @grafana/grafana-frontend-platform false false true

View File

@@ -552,7 +552,7 @@ const (
FlagAlertingPrometheusRulesPrimary = "alertingPrometheusRulesPrimary"
// FlagExploreLogsShardSplitting
// Used in Logs Drilldown to split queries into multiple queries based on the number of shards
// Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards
FlagExploreLogsShardSplitting = "exploreLogsShardSplitting"
// FlagExploreLogsAggregatedMetrics
@@ -1066,6 +1066,10 @@ const (
// Enable TTL plugin instance manager
FlagTtlPluginInstanceManager = "ttlPluginInstanceManager"
// FlagLokiQueryLimitsContext
// Send X-Loki-Query-Limits-Context header to Loki on first split request
FlagLokiQueryLimitsContext = "lokiQueryLimitsContext"
// FlagRudderstackUpgrade
// Enables the new version of rudderstack
FlagRudderstackUpgrade = "rudderstackUpgrade"

View File

@@ -1574,11 +1574,14 @@
{
"metadata": {
"name": "exploreLogsShardSplitting",
"resourceVersion": "1753448760331",
"creationTimestamp": "2024-08-29T13:55:59Z"
"resourceVersion": "1763611567823",
"creationTimestamp": "2024-08-29T13:55:59Z",
"annotations": {
"grafana.app/updatedTimestamp": "2025-11-20 04:06:07.82367 +0000 UTC"
}
},
"spec": {
"description": "Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
"description": "Deprecated. Replace with lokiShardSplitting. Used in Logs Drilldown to split queries into multiple queries based on the number of shards",
"stage": "experimental",
"codeowner": "@grafana/observability-logs",
"frontend": true
@@ -2636,6 +2639,19 @@
"codeowner": "@grafana/observability-logs"
}
},
{
"metadata": {
"name": "lokiQueryLimitsContext",
"resourceVersion": "1763558434858",
"creationTimestamp": "2025-11-19T13:20:34Z"
},
"spec": {
"description": "Send X-Loki-Query-Limits-Context header to Loki on first split request",
"stage": "experimental",
"codeowner": "@grafana/observability-logs",
"frontend": true
}
},
{
"metadata": {
"name": "lokiQuerySplitting",

View File

@@ -96,6 +96,8 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err))
}
addQueryLimitsHeader(query, req)
if query.SupportingQueryType != SupportingQueryNone {
value := getSupportingQueryHeaderValue(query.SupportingQueryType)
if value != "" {
@@ -108,6 +110,15 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
return req, nil
}
// addQueryLimitsHeader attaches the query's limits context to the outgoing
// request as the X-Loki-Query-Limits-Context header. The context is only sent
// when an expression is present; a JSON marshalling failure silently skips the
// header (best effort — the query still runs without it).
func addQueryLimitsHeader(query lokiQuery, req *http.Request) {
	if query.LimitsContext.Expr == "" {
		return
	}
	payload, err := json.Marshal(query.LimitsContext)
	if err != nil {
		// Best effort: omit the header rather than failing the request.
		return
	}
	req.Header.Set("X-Loki-Query-Limits-Context", string(payload))
}
type lokiResponseError struct {
Message string `json:"message"`
TraceID string `json:"traceID,omitempty"`

View File

@@ -2,10 +2,12 @@ package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
@@ -47,6 +49,56 @@ func TestApiLogVolume(t *testing.T) {
require.True(t, called)
})
t.Run("X-Loki-Query-Limits-Context header should be set when LimitsContext is provided", func(t *testing.T) {
called := false
from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
to := time.Now().Truncate(time.Millisecond)
limitsContext := LimitsContext{
Expr: "{cluster=\"us-central1\"}",
From: from,
To: to,
}
limitsContextJson, _ := json.Marshal(limitsContext)
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
called = true
require.Equal(t, string(limitsContextJson), req.Header.Get("X-Loki-Query-Limits-Context"))
})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is missing expr", func(t *testing.T) {
called := false
from := time.Now().Truncate(time.Millisecond).Add(-1 * time.Hour)
to := time.Now().Truncate(time.Millisecond)
limitsContext := LimitsContext{
Expr: "",
From: from,
To: to,
}
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
called = true
require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange, LimitsContext: limitsContext}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
t.Run("X-Loki-Query-Limits-Context header should not get set when LimitsContext is not provided", func(t *testing.T) {
called := false
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {
called = true
require.Equal(t, "", req.Header.Get("X-Loki-Query-Limits-Context"))
})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
t.Run("data sample queries should set data sample http header", func(t *testing.T) {
called := false
api := makeMockedAPI(200, "application/json", response, func(req *http.Request) {

View File

@@ -18,6 +18,17 @@ const (
QueryEditorModeBuilder QueryEditorMode = "builder"
)
// LimitsContext is the full query plan payload for split/shard queries.
// From and To are unix-millisecond timestamps; the backend interpolates Expr
// and forwards the context to Loki via the `X-Loki-Query-Limits-Context`
// header when the "lokiQueryLimitsContext" feature flag is enabled.
type LimitsContext struct {
Expr string `json:"expr"`
From int64 `json:"from"`
To int64 `json:"to"`
}
// NewLimitsContext creates a new LimitsContext object.
func NewLimitsContext() *LimitsContext {
return &LimitsContext{}
}
type LokiQueryType string
const (
@@ -59,6 +70,8 @@ type LokiDataQuery struct {
Instant *bool `json:"instant,omitempty"`
// Used to set step value for range queries.
Step *string `json:"step,omitempty"`
// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
LimitsContext *LimitsContext `json:"limitsContext,omitempty"`
// A unique identifier for the query within the list of targets.
// In server side expressions, the refId is used as a variable name to identify results.
// By default, the UI will assign A->Z; however setting meaningful names may be useful.

View File

@@ -156,6 +156,8 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
expr := interpolateVariables(model.Expr, interval, timeRange, queryType, step)
limitsConfig := generateLimitsConfig(model, interval, timeRange, queryType, step)
direction, err := parseDirection(model.Direction)
if err != nil {
return nil, err
@@ -192,8 +194,21 @@ func parseQuery(queryContext *backend.QueryDataRequest, logqlScopesEnabled bool)
RefID: query.RefID,
SupportingQueryType: supportingQueryType,
Scopes: model.Scopes,
LimitsContext: limitsConfig,
})
}
return qs, nil
}
// generateLimitsConfig builds the limits context for a query, interpolating
// template variables in the limits expression and converting the millisecond
// timestamps to time.Time. Returns a zero-value LimitsContext when the model
// does not supply a complete context (expr, from, and to are all required).
func generateLimitsConfig(model *QueryJSONModel, interval time.Duration, timeRange time.Duration, queryType QueryType, step time.Duration) LimitsContext {
	ctx := model.LimitsContext
	// Only supply a limits context when expr, from, and to are all provided.
	if ctx == nil || ctx.Expr == "" || ctx.From <= 0 || ctx.To <= 0 {
		return LimitsContext{}
	}
	return LimitsContext{
		Expr: interpolateVariables(ctx.Expr, interval, timeRange, queryType, step),
		From: time.UnixMilli(ctx.From),
		To:   time.UnixMilli(ctx.To),
	}
}

View File

@@ -1,6 +1,7 @@
package loki
import (
"strconv"
"testing"
"time"
@@ -145,6 +146,74 @@ func TestParseQuery(t *testing.T) {
require.Equal(t, `{namespace="logish"} |= "problems"`, models[0].Expr)
})
t.Run("parsing query model with invalid query limits context expr", func(t *testing.T) {
from := time.Now().Add(-3000 * time.Second)
fullFrom := time.Now().Add(-1 * time.Hour)
to := time.Now()
queryContext := &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
JSON: []byte(`
{
"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
"format": "time_series",
"refId": "A",
"limitsContext": {"expr": "", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
}`,
),
TimeRange: backend.TimeRange{
From: from,
To: to,
},
Interval: time.Second * 15,
MaxDataPoints: 200,
},
},
}
models, err := parseQuery(queryContext, true)
require.NoError(t, err)
require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
// If the limits context expression is missing, we don't set any limits context
require.Equal(t, ``, models[0].LimitsContext.Expr)
require.Equal(t, time.Time{}, models[0].LimitsContext.To)
require.Equal(t, time.Time{}, models[0].LimitsContext.From)
})
t.Run("parsing query model with query limits context", func(t *testing.T) {
from := time.Now().Add(-3000 * time.Second)
fullFrom := time.Now().Add(-1 * time.Hour)
to := time.Now()
queryContext := &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
JSON: []byte(`
{
"expr": "count_over_time({service_name=\"apache\", __stream_shard__=\"2\"}[$__auto])",
"format": "time_series",
"refId": "A",
"limitsContext": {"expr": "count_over_time({service_name=\"apache\"}[$__auto])", "from": ` + strconv.FormatInt(fullFrom.UnixMilli(), 10) + `, "to": ` + strconv.FormatInt(to.UnixMilli(), 10) + `}
}`,
),
TimeRange: backend.TimeRange{
From: from,
To: to,
},
Interval: time.Second * 15,
MaxDataPoints: 200,
},
},
}
models, err := parseQuery(queryContext, true)
require.NoError(t, err)
require.Equal(t, time.Second*15, models[0].Step)
require.Equal(t, `count_over_time({service_name="apache", __stream_shard__="2"}[15s])`, models[0].Expr)
require.Equal(t, `count_over_time({service_name="apache"}[15s])`, models[0].LimitsContext.Expr)
require.Equal(t, to.Truncate(time.Millisecond), models[0].LimitsContext.To)
require.Equal(t, fullFrom.Truncate(time.Millisecond), models[0].LimitsContext.From)
})
t.Run("interpolate variables, range between 1s and 0.5s", func(t *testing.T) {
expr := "go_goroutines $__interval $__interval_ms $__range $__range_s $__range_ms"
queryType := dataquery.LokiQueryTypeRange

View File

@@ -11,6 +11,11 @@ import (
type QueryType = dataquery.LokiQueryType
type SupportingQueryType = dataquery.SupportingQueryType
type Direction = dataquery.LokiQueryDirection
// LimitsContext carries the interpolated limits expression and its absolute
// time range; it is JSON-encoded and sent to Loki via the
// X-Loki-Query-Limits-Context header when an expression is present.
type LimitsContext struct {
Expr string
From time.Time
To time.Time
}
const (
QueryTypeRange = dataquery.LokiQueryTypeRange
@@ -42,4 +47,5 @@ type lokiQuery struct {
RefID string
SupportingQueryType SupportingQueryType
Scopes []scope.ScopeFilter
LimitsContext LimitsContext
}

View File

@@ -795,6 +795,7 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
<PanelChrome
title={t('explore.unthemed-logs.title-logs-volume', 'Logs volume')}
collapsible
loadingState={logsVolumeData?.state}
collapsed={!logsVolumeEnabled}
onToggleCollapse={onToggleLogsVolumeCollapse}
>

View File

@@ -26,7 +26,7 @@ import { mergeLogsVolumeDataFrames, isLogsVolumeLimited, getLogsVolumeMaximumRan
import { SupplementaryResultError } from '../SupplementaryResultError';
import { LogsVolumePanel } from './LogsVolumePanel';
import { isTimeoutErrorResponse } from './utils/logsVolumeResponse';
import { isClientErrorResponse } from './utils/logsVolumeResponse';
type Props = {
logsVolumeData: DataQueryResponse | undefined;
@@ -92,7 +92,7 @@ export const LogsVolumePanelList = ({
const canShowPartialData =
config.featureToggles.lokiShardSplitting && logsVolumeData && logsVolumeData.data.length > 0;
const timeoutError = isTimeoutErrorResponse(logsVolumeData);
const clientError = isClientErrorResponse(logsVolumeData);
const from = dateTime(Math.max(absoluteRange.from, allLogsVolumeMaximumRange.from));
const to = dateTime(Math.min(absoluteRange.to, allLogsVolumeMaximumRange.to));
@@ -123,7 +123,7 @@ export const LogsVolumePanelList = ({
<Trans i18nKey="explore.logs-volume-panel-list.loading">Loading...</Trans>
</span>
);
} else if (timeoutError && !canShowPartialData) {
} else if (clientError && !canShowPartialData) {
return (
<SupplementaryResultError
title={t('explore.logs-volume-panel-list.title-unable-to-show-log-volume', 'Unable to show log volume')}
@@ -184,7 +184,7 @@ export const LogsVolumePanelList = ({
return (
<div className={styles.listContainer}>
{timeoutError && canShowPartialData && (
{clientError && canShowPartialData && (
<SupplementaryResultError
title={t('explore.logs-volume-panel-list.title-showing-partial-data', 'Showing partial data')}
message="The query is trying to access too much data and some sharded requests could not be completed. Try decreasing the time range or adding more labels to your query."

View File

@@ -1,6 +1,6 @@
import { DataQueryResponse } from '@grafana/data';
import { isTimeoutErrorResponse } from './logsVolumeResponse';
import { isClientErrorResponse } from './logsVolumeResponse';
const errorA =
'Get "http://localhost:3100/loki/api/v1/query_range?direction=backward&end=1680001200000000000&limit=1000&query=sum+by+%28level%29+%28count_over_time%28%7Bcontainer_name%3D%22docker-compose-app-1%22%7D%5B1h%5D%29%29&start=1679914800000000000&step=3600000ms": net/http: request canceled (Client.Timeout exceeded while awaiting headers)';
@@ -16,7 +16,7 @@ describe('isTimeoutErrorResponse', () => {
message: timeoutError,
},
};
expect(isTimeoutErrorResponse(response)).toBe(true);
expect(isClientErrorResponse(response)).toBe(true);
}
);
test.each([errorA, errorB])(
@@ -33,7 +33,7 @@ describe('isTimeoutErrorResponse', () => {
},
],
};
expect(isTimeoutErrorResponse(response)).toBe(true);
expect(isClientErrorResponse(response)).toBe(true);
}
);
test.each([errorA, errorB])(
@@ -54,13 +54,13 @@ describe('isTimeoutErrorResponse', () => {
},
],
};
expect(isTimeoutErrorResponse(response)).toBe(true);
expect(isClientErrorResponse(response)).toBe(true);
}
);
test('does not report false positives', () => {
const response: DataQueryResponse = {
data: [],
};
expect(isTimeoutErrorResponse(response)).toBe(false);
expect(isClientErrorResponse(response)).toBe(false);
});
});

View File

@@ -1,7 +1,8 @@
import { DataQueryError, DataQueryResponse } from '@grafana/data';
import { is4xxError } from '@grafana-plugins/loki/responseUtils';
// Infers client-side/user-actionable errors from the response message and status: timeouts, "query would read too many bytes", and 4xx responses.
export function isTimeoutErrorResponse(response: DataQueryResponse | undefined): boolean {
export function isClientErrorResponse(response: DataQueryResponse | undefined): boolean {
if (!response) {
return false;
}
@@ -13,6 +14,8 @@ export function isTimeoutErrorResponse(response: DataQueryResponse | undefined):
return errors.some((error: DataQueryError) => {
const message = `${error.message || error.data?.message}`?.toLowerCase();
return message.includes('timeout');
return (
message.includes('timeout') || message?.includes('the query would read too many bytes') || is4xxError(response)
);
});
}

View File

@@ -42,6 +42,14 @@ composableKinds: DataQuery: {
instant?: bool
// Used to set step value for range queries.
step?: string
// The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
limitsContext?: #LimitsContext
#LimitsContext: {
expr: string
from: int64
to: int64
}
#QueryEditorMode: "code" | "builder" @cuetsy(kind="enum")

View File

@@ -48,6 +48,14 @@ export interface LokiDataQuery extends common.DataQuery {
* Used to override the name of the series.
*/
legendFormat?: string;
/**
* The full query plan for split/shard queries. Encoded and sent to Loki via `X-Loki-Query-Limits-Context` header. Requires "lokiQueryLimitsContext" feature flag
*/
limitsContext?: {
expr: string;
from: number;
to: number;
};
/**
* Used to limit the number of log rows returned.
*/

View File

@@ -36,6 +36,11 @@ function getFrameKey(frame: DataFrame): string | undefined {
return frame.refId ?? frame.name;
}
/**
 * @todo add a test covering the case where the new response is an error but the current response is not
* @param currentResponse
* @param newResponse
*/
export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
if (!currentResponse) {
return cloneQueryResponse(newResponse);
@@ -65,6 +70,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
if (mergedErrors.length > 0) {
currentResponse.errors = mergedErrors;
currentResponse.state = LoadingState.Error;
}
// the `.error` attribute is obsolete now,
@@ -75,6 +81,7 @@ export function combineResponses(currentResponse: DataQueryResponse | null, newR
const mergedError = currentResponse.error ?? newResponse.error;
if (mergedError != null) {
currentResponse.error = mergedError;
currentResponse.state = LoadingState.Error;
}
const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];

View File

@@ -222,6 +222,33 @@ export function getMockFrames() {
length: 2,
};
// Mock fixture — presumably the merged result of metricFrameA and
// metricFrameB (combined timestamps/values and summed byte stats); used to
// assert the output of response merging. TODO(review): confirm values match
// those sibling fixtures.
const metricFrameAB: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1000000, 2000000, 3000000, 4000000],
},
{
name: 'Value',
type: FieldType.number,
config: {},
values: [6, 7, 5, 4],
labels: {
level: 'debug',
},
},
],
meta: {
notices: [],
type: DataFrameType.TimeSeriesMulti,
stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }],
},
length: 4,
};
const metricFrameC: DataFrame = {
refId: 'A',
name: 'some-time-series',
@@ -305,6 +332,7 @@ export function getMockFrames() {
metricFrameA,
metricFrameB,
metricFrameC,
metricFrameAB,
emptyFrame,
};
}

View File

@@ -1,6 +1,6 @@
import { of } from 'rxjs';
import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
import { DataQueryError, DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
import { config } from '@grafana/runtime';
import { LokiDatasource } from './datasource';
@@ -16,6 +16,7 @@ jest.mock('uuid', () => ({
}));
const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
const originalErr = console.error;
beforeEach(() => {
jest.spyOn(console, 'error').mockImplementation(() => {});
@@ -26,22 +27,23 @@ beforeAll(() => {
callback();
});
config.featureToggles.lokiShardSplitting = false;
config.featureToggles.lokiQueryLimitsContext = true;
});
afterAll(() => {
jest.mocked(global.setTimeout).mockReset();
config.featureToggles.lokiShardSplitting = originalShardingFlagState;
config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
console.error = originalErr;
});
describe('runSplitQuery()', () => {
let datasource: LokiDatasource;
const from = dateTime('2023-02-08T05:00:00.000Z');
const to = dateTime('2023-02-10T06:00:00.000Z');
const range = {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
raw: {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
},
from,
to,
raw: { from, to },
};
const createRequest = (targets: LokiQuery[], overrides?: Partial<DataQueryRequest<LokiQuery>>) => {
@@ -165,6 +167,19 @@ describe('runSplitQuery()', () => {
_i: 1676008800000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: undefined,
limitsContext: {
expr: 'count_over_time({a="b"}[1m])',
from: from.valueOf(),
to: to.valueOf(),
},
},
],
})
);
@@ -183,6 +198,15 @@ describe('runSplitQuery()', () => {
_i: 1676005140000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: undefined,
limitsContext: undefined,
},
],
})
);
@@ -201,6 +225,15 @@ describe('runSplitQuery()', () => {
_i: 1675918740000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: undefined,
limitsContext: undefined,
},
],
})
);
});
@@ -225,6 +258,19 @@ describe('runSplitQuery()', () => {
_i: 1676008800000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: '10s',
limitsContext: {
expr: 'count_over_time({a="b"}[1m])',
from: from.valueOf(),
to: to.valueOf(),
},
},
],
})
);
@@ -243,6 +289,15 @@ describe('runSplitQuery()', () => {
_i: 1676005190000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: '10s',
limitsContext: undefined,
},
],
})
);
@@ -261,21 +316,127 @@ describe('runSplitQuery()', () => {
_i: 1675918790000,
}),
}),
targets: [
{
expr: 'count_over_time({a="b"}[1m])',
legendFormat: undefined,
refId: 'A',
step: '10s',
limitsContext: undefined,
},
],
})
);
});
});
test('Handles and reports errors', async () => {
test('Retries 5xx errors', async () => {
const { metricFrameA, metricFrameB, metricFrameAB } = getMockFrames();
const error: DataQueryError = {
message: 'OOPSIE',
status: 518,
};
const errResponse: DataQueryResponse = {
state: LoadingState.Error,
data: [],
errors: [error],
key: 'uuid',
};
const response: DataQueryResponse = {
state: LoadingState.Done,
data: [metricFrameA],
key: 'uuid',
};
const response2: DataQueryResponse = {
state: LoadingState.Done,
data: [metricFrameB],
key: 'uuid',
};
jest
.spyOn(datasource, 'runQuery')
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
.mockReturnValueOnce(of(errResponse))
.mockReturnValueOnce(of(response))
.mockReturnValueOnce(of(response2));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
expect(values).toHaveLength(4);
expect(values[0]).toEqual(
expect.objectContaining({
data: [metricFrameAB],
key: 'uuid',
state: LoadingState.Done,
})
);
});
expect(datasource.runQuery).toHaveBeenCalledTimes(4);
});
test('Handles and reports 5xx error too many bytes', async () => {
const error: DataQueryError = {
message: 'the query would read too many bytes ...',
status: 500,
};
const response: DataQueryResponse = {
state: LoadingState.Error,
data: [],
errors: [error],
};
jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
expect(values).toHaveLength(1);
expect(values[0]).toEqual(
expect.objectContaining({ error: { refId: 'A', message: 'Error' }, state: LoadingState.Streaming })
expect.objectContaining({
errors: [error],
state: LoadingState.Error,
})
);
});
// Errors are not retried
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
test('Handles and reports 4xx errors', async () => {
const error: DataQueryError = {
message: 'BAD REQUEST',
status: 418,
};
const response: DataQueryResponse = {
state: LoadingState.Error,
data: [],
errors: [error],
};
jest.spyOn(datasource, 'runQuery').mockReturnValue(of(response));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
expect(values).toHaveLength(1);
expect(values[0]).toEqual(
expect.objectContaining({
errors: [error],
state: LoadingState.Error,
})
);
});
// Errors are not retried
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
test('Handles and reports errors (deprecated error)', async () => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(
of({
state: LoadingState.Error,
error: { refId: 'A', message: 'the query would read too many bytes ...' },
data: [],
key: 'uuid',
})
);
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
expect(values).toHaveLength(1);
expect(values[0]).toEqual(
expect.objectContaining({
error: { refId: 'A', message: 'the query would read too many bytes ...' },
state: LoadingState.Error,
})
);
});
// Errors are not retried
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
describe('Hidden and empty queries', () => {

View File

@@ -8,16 +8,18 @@ import {
DataQueryResponse,
DataTopic,
dateTime,
rangeUtil,
TimeRange,
LoadingState,
rangeUtil,
store,
TimeRange,
} from '@grafana/data';
import { config } from '@grafana/runtime';
import { LokiDatasource } from './datasource';
import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
import { combineResponses } from './mergeResponses';
import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
import { addQueryLimitsContext, isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
import { isRetriableError } from './responseUtils';
import { trackGroupedQueries } from './tracking';
import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
@@ -55,6 +57,10 @@ interface QuerySplittingOptions {
* Do not retry failed queries.
*/
disableRetry?: boolean;
/**
* The current index of all query attempts
*/
shardQueryIndex?: number;
}
/**
@@ -85,6 +91,25 @@ export function adjustTargetsFromResponseState(targets: LokiQuery[], response: D
})
.filter((target) => target.maxLines === undefined || target.maxLines > 0);
}
/**
 * Returns a copy of the grouped requests with the limits context applied to
 * each target: on the very first attempt (no splits, no shards run yet) the
 * context is added via `addQueryLimitsContext` unless the target already
 * carries one; on every later attempt it is stripped so only the first split
 * request sends the X-Loki-Query-Limits-Context header.
 */
const addLimitsToSplitRequests = (splitQueryIndex: number, shardQueryIndex: number, requests: LokiGroupedRequest[]) => {
  // requests has already been mutated
  const isFirstAttempt = splitQueryIndex === 0 && shardQueryIndex === 0;
  return requests.map((group) => ({
    ...group,
    request: {
      ...group.request,
      targets: group.request.targets.map((target) => {
        // @todo if we retry the first request, we will strip out the query limits context
        if (!isFirstAttempt) {
          return { ...target, limitsContext: undefined };
        }
        // Don't pull from request if it has already been added by `addLimitsToShardGroups`
        return target.limitsContext === undefined ? addQueryLimitsContext(target, group.request) : target;
      }),
    },
  }));
};
export function runSplitGroupedQueries(
datasource: LokiDatasource,
requests: LokiGroupedRequest[],
@@ -99,8 +124,15 @@ export function runSplitGroupedQueries(
let subquerySubscription: Subscription | null = null;
let retriesMap = new Map<string, number>();
let retryTimer: ReturnType<typeof setTimeout> | null = null;
let splitQueryIndex = 0;
const shardQueryIndex = options.shardQueryIndex ?? 0;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
if (config.featureToggles.lokiQueryLimitsContext) {
requests = addLimitsToSplitRequests(splitQueryIndex, shardQueryIndex, requests);
}
splitQueryIndex++;
let retrying = false;
if (subquerySubscription != null) {
@@ -114,7 +146,10 @@ export function runSplitGroupedQueries(
}
const done = () => {
mergedResponse.state = LoadingState.Done;
if (mergedResponse.state !== LoadingState.Error) {
mergedResponse.state = LoadingState.Done;
}
subscriber.next(mergedResponse);
subscriber.complete();
};
@@ -189,6 +224,10 @@ export function runSplitGroupedQueries(
if (!options.skipPartialUpdates) {
mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
}
if (mergedResponse.state === LoadingState.Error) {
done();
}
},
complete: () => {
if (retrying) {
@@ -301,7 +340,9 @@ export function runSplitQuery(
const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr));
request.queryGroupId = uuidv4();
const oneDayMs = 24 * 60 * 60 * 1000;
// Allow custom split durations for debugging, e.g. `localStorage.setItem('grafana.loki.querySplitInterval', 24 * 60 * 1000) // 1 hour`
const debugSplitDuration = parseInt(store.get('grafana.loki.querySplitInterval'), 10);
const oneDayMs = debugSplitDuration || 24 * 60 * 60 * 1000;
const directionPartitionedLogQueries = groupBy(logQueries, (query) =>
query.direction === LokiQueryDirection.Forward ? LokiQueryDirection.Forward : LokiQueryDirection.Backward
);

View File

@@ -1,6 +1,7 @@
import { SyntaxNode } from '@lezer/common';
import { escapeRegExp } from 'lodash';
import { DataQueryRequest } from '@grafana/data';
import {
parser,
LineFilter,
@@ -310,6 +311,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
export function requestSupportsSplitting(allQueries: LokiQuery[]) {
const queries = allQueries
.filter((query) => !query.hide)
.filter((query) => query.queryType !== LokiQueryType.Instant)
.filter((query) => !query.refId.includes('do-not-chunk'))
.filter((query) => query.expr);
@@ -425,3 +427,21 @@ export const getSelectorForShardValues = (query: string) => {
}
return '';
};
/**
 * Attaches a `limitsContext` — the query's current expression plus the request's
 * absolute time range in epoch milliseconds — to a query that is about to be
 * sharded or split.
 * Must be called after the interpolation step!
 *
 * @param lokiQuery query to decorate; its `expr` is captured as-is
 * @param request request whose range supplies the `from`/`to` timestamps
 */
export const addQueryLimitsContext = (lokiQuery: LokiQuery, request: DataQueryRequest<LokiQuery>) => {
  const { from, to } = request.range;
  return {
    ...lokiQuery,
    limitsContext: {
      expr: lokiQuery.expr,
      from: from.toDate().getTime(),
      to: to.toDate().getTime(),
    },
  };
};

View File

@@ -1,4 +1,4 @@
import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
import { DataFrame, DataQueryError, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
import { isBytesString, processLabels } from './languageUtils';
import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
@@ -134,13 +134,44 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
export function isRetriableError(errorResponse: DataQueryResponse) {
const message = errorResponse.errors
? (errorResponse.errors[0].message ?? '').toLowerCase()
: (errorResponse.error?.message ?? '');
? errorResponse.errors
.map((err) => err.message ?? '')
.join()
.toLowerCase()
: (errorResponse.error?.message ?? '').toLowerCase();
// max_query_bytes_read exceeded, currently 500 when should be 4xx
if (message.includes('the query would read too many bytes') || is4xxError(errorResponse)) {
throw new Error(message);
}
if (message.includes('timeout')) {
return true;
} else if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
}
if (errorResponse.data.length > 0 && errorResponse.data[0].fields.length > 0) {
// Error response but we're receiving data, continue querying.
return false;
}
if (is5xxError(errorResponse)) {
return true;
}
throw new Error(message);
}
/**
 * True when the response (or any of its errors) carries a 4xx HTTP status.
 *
 * Before https://github.com/grafana/grafana/pull/114201 the Loki data source always returns a 500 for every error response type in the response body, and this is what Grafana uses to populate the DataQueryError
 * Since the frontend and backend are being deployed separately now we might want to continue to check error messages for a bit until we are sure that the correct status code is always set in the data query response.
 *
 * @param errorResponse
 */
export function is4xxError(errorResponse: DataQueryResponse) {
  return isHttpErrorType(errorResponse, '4');
}

/** True when the response (or any of its errors) carries a 5xx HTTP status. */
export function is5xxError(errorResponse: DataQueryResponse) {
  return isHttpErrorType(errorResponse, '5');
}

/**
 * Checks whether `errorResponse.error` or any entry of `errorResponse.errors`
 * has an HTTP status whose first digit matches `responseType`.
 * Always returns a strict boolean (the previous version could leak truthy
 * non-boolean values such as `undefined` or the status number).
 */
function isHttpErrorType(errorResponse: DataQueryResponse, responseType: '2' | '3' | '4' | '5'): boolean {
  const isErrOfType = (err: DataQueryError): boolean =>
    err.status != null && String(err.status)[0] === responseType;
  if (errorResponse.error && isErrOfType(errorResponse.error)) {
    return true;
  }
  return errorResponse.errors?.some(isErrOfType) ?? false;
}

View File

@@ -1,6 +1,7 @@
import { of } from 'rxjs';
import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
import { config } from '@grafana/runtime';
import { LokiDatasource } from './datasource';
import { createLokiDatasource } from './mocks/datasource';
@@ -12,6 +13,8 @@ jest.mock('uuid', () => ({
v4: jest.fn().mockReturnValue('uuid'),
}));
const originalLokiQueryLimitsContextState = config.featureToggles.lokiQueryLimitsContext;
const originalLog = console.log;
const originalWarn = console.warn;
const originalErr = console.error;
@@ -20,20 +23,26 @@ beforeEach(() => {
jest.spyOn(console, 'warn').mockImplementation(() => {});
jest.spyOn(console, 'error').mockImplementation(() => {});
});
beforeAll(() => {
config.featureToggles.lokiQueryLimitsContext = true;
});
afterAll(() => {
console.log = originalLog;
console.warn = originalWarn;
console.error = originalErr;
config.featureToggles.lokiQueryLimitsContext = originalLokiQueryLimitsContextState;
});
describe('runShardSplitQuery()', () => {
let datasource: LokiDatasource;
const from = dateTime('2023-02-08T04:00:00.000Z');
const to = dateTime('2023-02-08T11:00:00.000Z');
const range = {
from: dateTime('2023-02-08T04:00:00.000Z'),
to: dateTime('2023-02-08T11:00:00.000Z'),
from,
to,
raw: {
from: dateTime('2023-02-08T04:00:00.000Z'),
to: dateTime('2023-02-08T11:00:00.000Z'),
from,
to,
},
};
@@ -139,6 +148,11 @@ describe('runShardSplitQuery()', () => {
targets: [
{
expr: '{a="b", __stream_shard__=~"20|10"}',
limitsContext: {
expr: `{a="b"}`,
from: from.valueOf(),
to: to.valueOf(),
},
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@@ -209,6 +223,11 @@ describe('runShardSplitQuery()', () => {
targets: [
{
expr: '{service_name="test", filter="true", __stream_shard__=~"20|10"}',
limitsContext: {
expr: `{service_name="test", filter="true"}`,
from: from.valueOf(),
to: to.valueOf(),
},
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@@ -241,28 +260,113 @@ describe('runShardSplitQuery()', () => {
});
});
test('Failed requests have loading state Error', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'parse error' }, data: [] }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Error);
describe('Errors', () => {
beforeEach(() => {
const querySplittingRange = {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
raw: {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
},
};
request = createRequest([{ expr: '$SELECTOR', refId: 'A', direction: LokiQueryDirection.Scan }], {
range: querySplittingRange,
});
// @ts-expect-error
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
callback();
});
});
});
test('Does not retry on other errors', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
// @ts-expect-error
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
callback();
});
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
test('Failed 4xx responses have loading state Error', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '12', '5']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValue(
of({ state: LoadingState.Error, error: { refId: 'A', message: 'client error', status: 400 }, data: [] })
);
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Error);
});
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
test('Max query bytes errors are not retried', async () => {
const errResp: DataQueryResponse = {
state: LoadingState.Error,
errors: [{ refId: 'A', message: 'the query would read too many bytes ...', status: 500 }],
data: [],
};
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(of(errResp))
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Error);
});
// Max bytes errors throw immediately and are not retried, so only the first request is issued
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
test('Failed 5xx requests are retried', async () => {
const errResp: DataQueryResponse = {
state: LoadingState.Error,
errors: [{ refId: 'A', message: 'parse error', status: 500 }],
data: [],
};
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(of(errResp))
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Done);
});
// 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(13);
});
test('Failed 5xx requests are retried (dep)', async () => {
const errResp: DataQueryResponse = {
state: LoadingState.Error,
error: { refId: 'A', message: 'parse error', status: 500 },
data: [],
};
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '4']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(of(errResp))
.mockReturnValueOnce(of({ state: LoadingState.Done, data: [], status: 200 }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Done);
});
// 5 shards, 3 groups + empty shard group, 4 requests * 3 days, 3 chunks, 3 requests + 1 retriable error = 13 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(13);
});
test('Does not retry on other errors', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(
of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })
);
// @ts-expect-error
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
callback();
});
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
});
});
test('Adjusts the group size based on errors and execution time', async () => {
@@ -425,6 +529,11 @@ describe('runShardSplitQuery()', () => {
targets: [
{
expr: '{a="b", __stream_shard__=~"20|10|9"}',
limitsContext: {
expr: `{a="b"}`,
from: from.valueOf(),
to: to.valueOf(),
},
refId: 'A',
direction: LokiQueryDirection.Scan,
},

View File

@@ -2,15 +2,20 @@ import { groupBy, partition } from 'lodash';
import { Observable, Subscriber, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
import { DataQueryRequest, DataQueryResponse, LoadingState, QueryResultMetaStat } from '@grafana/data';
import { config } from '@grafana/runtime';
import { LokiDatasource } from './datasource';
import { combineResponses, replaceResponses } from './mergeResponses';
import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils';
import {
addQueryLimitsContext,
getSelectorForShardValues,
interpolateShardingSelector,
requestSupportsSharding,
} from './queryUtils';
import { isRetriableError } from './responseUtils';
import { LokiQuery } from './types';
/**
* Query splitting by stream shards.
* Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
@@ -54,6 +59,19 @@ export function runShardSplitQuery(datasource: LokiDatasource, request: DataQuer
return splitQueriesByStreamShard(datasource, request, queries);
}
/**
 * Decorates shard groups with a query limits context on the first attempt
 * (queryIndex 0) and strips it from every subsequent attempt.
 *
 * @param queryIndex index of the current shard-query attempt
 * @param groups shard groups whose targets are decorated
 * @param request request whose range supplies the limits context timestamps
 */
const addLimitsToShardGroups = (
  queryIndex: number,
  groups: ShardedQueryGroup[],
  request: DataQueryRequest<LokiQuery>
) => {
  const isFirstAttempt = queryIndex === 0;
  return groups.map((group) => {
    const targets = group.targets.map((target) =>
      isFirstAttempt ? addQueryLimitsContext(target, request) : { ...target, limitsContext: undefined }
    );
    return { ...group, targets };
  });
};
function splitQueriesByStreamShard(
datasource: LokiDatasource,
request: DataQueryRequest<LokiQuery>,
@@ -64,8 +82,13 @@ function splitQueriesByStreamShard(
let subquerySubscription: Subscription | null = null;
let retriesMap = new Map<string, number>();
let retryTimer: ReturnType<typeof setTimeout> | null = null;
let queryIndex = 0;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, group: number, groups: ShardedQueryGroup[]) => {
if (config.featureToggles.lokiQueryLimitsContext) {
groups = addLimitsToShardGroups(queryIndex, groups, request);
}
queryIndex++;
let nextGroupSize = groups[group].groupSize;
const { shards, groupSize, cycle } = groups[group];
let retrying = false;
@@ -164,6 +187,7 @@ function splitQueriesByStreamShard(
subquerySubscription = runSplitQuery(datasource, subRequest, {
skipPartialUpdates: true,
disableRetry: true,
shardQueryIndex: queryIndex - 1,
}).subscribe({
next: (partialResponse: DataQueryResponse) => {
if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {