Pull request 2501: AGDNS-3314-imp-querylog-cognit

Squashed commit of the following:

commit 77ebcf06d7e4efe4e4e0e1b4d74ba8ccc5c292e1
Merge: 2c4ad98d8 cf6096b7a
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Oct 16 16:33:33 2025 +0300

    Merge branch 'master' into AGDNS-3314-imp-querylog-cognit

commit 2c4ad98d86
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Wed Oct 15 10:13:54 2025 +0300

    querylog: imp docs

commit 412d186336
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Tue Oct 14 15:48:34 2025 +0300

    querylog: imp cognit
This commit is contained in:
Stanislav Chzhen
2025-10-16 16:49:44 +03:00
parent cf6096b7af
commit b76d100406
4 changed files with 171 additions and 102 deletions

View File

@@ -120,85 +120,146 @@ func (q *qLogFile) seekTS(
return 0, 0, err
}
// Define the search scope.
// Start of the search interval (position in the file).
start := int64(0)
// End of the search interval (position in the file).
end := fileInfo.Size()
// Probe is the approximate index of the line we'll try to check.
probe := (end - start) / 2
var line string
// Index of the probe line in the file.
var lineIdx int64
var lineEndIdx int64
// Index of the last probe line.
var lastProbeLineIdx int64
lastProbeLineIdx = -1
fileSize := fileInfo.Size()
st := &tsSearchState{
start: 0,
end: fileSize,
probe: fileSize / 2,
lastProbeLineIdx: -1,
fileSize: fileSize,
}
// Count seek depth in order to detect mistakes. If depth is too large,
// we should stop the search.
for {
// Get the line at the specified position.
line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
var found bool
found, err = q.seekTSStep(ctx, logger, timestamp, st)
if err != nil {
return 0, depth, err
return 0, st.depth, err
}
// Check if the line index if invalid.
err = q.validateQLogLineIdx(lineIdx, lastProbeLineIdx, timestamp, fileInfo.Size())
if err != nil {
return 0, depth, err
}
// Save the last found idx.
lastProbeLineIdx = lineIdx
// Get the timestamp from the query log record.
ts := readQLogTimestamp(ctx, logger, line)
if ts == 0 {
return 0, depth, fmt.Errorf(
"looking up timestamp %d in %q: record %q has empty timestamp",
timestamp,
q.file.Name(),
line,
)
}
if ts == timestamp {
// Hurray, returning the result.
if found {
break
}
// Narrow the scope and repeat the search.
if ts > timestamp {
// If the timestamp we're looking for is OLDER than what we found,
// then the line is somewhere on the LEFT side from the current
// probe position.
end = lineIdx
} else {
// If the timestamp we're looking for is NEWER than what we found,
// then the line is somewhere on the RIGHT side from the current
// probe position.
start = lineEndIdx
}
probe = start + (end-start)/2
depth++
if depth >= 100 {
return 0, depth, fmt.Errorf(
"looking up timestamp %d in %q: depth %d too high: %w",
timestamp,
q.file.Name(),
depth,
errTSNotFound,
)
}
}
q.position = lineIdx + int64(len(line))
return q.position, depth, nil
q.position = st.lineIdx + int64(len(st.line))
return q.position, st.depth, nil
}
// tsSearchState contains the state of the binary-search loop.
type tsSearchState struct {
// line holds the contents of the matched line.
line string
// end is the end of the current search interval.
end int64
// fileSize is the total file size in bytes.
fileSize int64
// lastProbeLineIdx is the byte offset of the last probed line start.
lastProbeLineIdx int64
// lineIdx is the byte offset of the current line start.
lineIdx int64
// probe is the byte offset to probe.
probe int64
// start is the start of current search interval.
start int64
// depth is the number of search iterations performed.
depth int
}
// maxSearchDepth is the maximum number of search iterations.
const maxSearchDepth = 100
// seekTSStep performs one iteration of the binary search over the qlog file. l
// and st must not be nil.
func (q *qLogFile) seekTSStep(
ctx context.Context,
l *slog.Logger,
timestamp int64,
st *tsSearchState,
) (found bool, err error) {
line, lineIdx, lineEndIdx, err := q.readAndValidateProbe(st, timestamp)
if err != nil {
// Don't wrap the error, because it's informative enough as is.
return false, err
}
// Get the timestamp from the query log record.
ts := readQLogTimestamp(ctx, l, line)
if ts == 0 {
return false, fmt.Errorf(
"looking up timestamp %d in %q: record %q has empty timestamp",
timestamp,
q.file.Name(),
line,
)
}
if ts == timestamp {
// Found the target record.
st.line = line
st.lineIdx = lineIdx
return true, nil
}
// Narrow the scope and repeat the search.
if ts > timestamp {
// If the timestamp we're looking for is OLDER than what we found, then
// the line is somewhere on the LEFT side from the current probe
// position.
st.end = lineIdx
} else {
// If the timestamp we're looking for is NEWER than what we found, then
// the line is somewhere on the RIGHT side from the current probe
// position.
st.start = lineEndIdx
}
st.probe = st.start + (st.end-st.start)/2
st.depth++
if st.depth >= maxSearchDepth {
return false, fmt.Errorf(
"looking up timestamp %d in %q: depth %d too high: %w",
timestamp,
q.file.Name(),
st.depth,
errTSNotFound,
)
}
return false, nil
}
// readAndValidateProbe reads the probe line and validates the line index.
func (q *qLogFile) readAndValidateProbe(
st *tsSearchState,
timestamp int64,
) (line string, lineIdx, lineEndIdx int64, err error) {
// Get the line at the specified position.
line, lineIdx, lineEndIdx, err = q.readProbeLine(st.probe)
if err != nil {
return "", 0, 0, fmt.Errorf("reading probe line: %w", err)
}
// Check if the line index is invalid.
err = q.validateQLogLineIdx(lineIdx, st.lastProbeLineIdx, timestamp, st.fileSize)
if err != nil {
return "", 0, 0, fmt.Errorf("validating line index: %w", err)
}
// Save the last probed index.
st.lastProbeLineIdx = lineIdx
return line, lineIdx, lineEndIdx, nil
}
// SeekStart changes the current position to the end of the file. Please note,

View File

@@ -121,23 +121,33 @@ func TestQLogFile_ReadNext(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, expPos, pos)
var read int
var line string
for err == nil {
line, err = q.ReadNext()
if err == nil {
assert.NotEmpty(t, line)
read++
}
}
require.Equal(t, io.EOF, err)
read := readAllLines(t, q)
assert.Equal(t, tc.linesNum, read)
})
}
}
// readAllLines is a helper function that reads entries until EOF and returns
// the number of lines read.
func readAllLines(tb testing.TB, q *qLogFile) (n int) {
tb.Helper()
for {
line, err := q.ReadNext()
if err == io.EOF {
break
}
require.NoError(tb, err)
assert.NotEmpty(tb, line)
n++
}
return n
}
func TestQLogFile_SeekTS_good(t *testing.T) {
linesCases := []struct {
name string

View File

@@ -130,24 +130,24 @@ func (r *qLogReader) ReadNext() (string, error) {
for r.currentFile >= 0 {
q := r.qFiles[r.currentFile]
line, err := q.ReadNext()
if err != nil {
// Shift to the older file.
r.currentFile--
if r.currentFile < 0 {
break
}
q = r.qFiles[r.currentFile]
// Set its position to the start right away.
_, err = q.SeekStart()
// This is unexpected, return an error right away.
if err != nil {
return "", err
}
} else {
if err == nil {
return line, nil
}
// Shift to the older file.
r.currentFile--
if r.currentFile < 0 {
break
}
q = r.qFiles[r.currentFile]
// Set its position to the start right away.
_, err = q.SeekStart()
// This is unexpected, return an error right away.
if err != nil {
return "", err
}
}
// Nothing to read anymore.

View File

@@ -217,13 +217,11 @@ func (l *queryLog) readEntries(
) (entries []*logEntry, oldestNano int64, total int) {
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
ent, ts, rErr := l.readNextEntry(ctx, r, params, cache)
if rErr != nil {
if rErr == io.EOF {
oldestNano = 0
break
}
if rErr == io.EOF {
oldestNano = 0
break
} else if rErr != nil {
l.logger.ErrorContext(ctx, "reading next entry", slogutil.KeyError, rErr)
}