mirror of
https://github.com/AdguardTeam/AdGuardHome.git
synced 2025-12-27 12:23:34 +08:00
Pull request 2501: AGDNS-3314-imp-querylog-cognit
Squashed commit of the following: commit 77ebcf06d7e4efe4e4e0e1b4d74ba8ccc5c292e1 Merge:2c4ad98d8cf6096b7aAuthor: Stanislav Chzhen <s.chzhen@adguard.com> Date: Thu Oct 16 16:33:33 2025 +0300 Merge branch 'master' into AGDNS-3314-imp-querylog-cognit commit2c4ad98d86Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Wed Oct 15 10:13:54 2025 +0300 querylog: imp docs commit412d186336Author: Stanislav Chzhen <s.chzhen@adguard.com> Date: Tue Oct 14 15:48:34 2025 +0300 querylog: imp cognit
This commit is contained in:
@@ -120,85 +120,146 @@ func (q *qLogFile) seekTS(
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
// Define the search scope.
|
||||
|
||||
// Start of the search interval (position in the file).
|
||||
start := int64(0)
|
||||
// End of the search interval (position in the file).
|
||||
end := fileInfo.Size()
|
||||
// Probe is the approximate index of the line we'll try to check.
|
||||
probe := (end - start) / 2
|
||||
|
||||
var line string
|
||||
// Index of the probe line in the file.
|
||||
var lineIdx int64
|
||||
var lineEndIdx int64
|
||||
// Index of the last probe line.
|
||||
var lastProbeLineIdx int64
|
||||
lastProbeLineIdx = -1
|
||||
fileSize := fileInfo.Size()
|
||||
st := &tsSearchState{
|
||||
start: 0,
|
||||
end: fileSize,
|
||||
probe: fileSize / 2,
|
||||
lastProbeLineIdx: -1,
|
||||
fileSize: fileSize,
|
||||
}
|
||||
|
||||
// Count seek depth in order to detect mistakes. If depth is too large,
|
||||
// we should stop the search.
|
||||
for {
|
||||
// Get the line at the specified position.
|
||||
line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
|
||||
var found bool
|
||||
found, err = q.seekTSStep(ctx, logger, timestamp, st)
|
||||
if err != nil {
|
||||
return 0, depth, err
|
||||
return 0, st.depth, err
|
||||
}
|
||||
|
||||
// Check if the line index if invalid.
|
||||
err = q.validateQLogLineIdx(lineIdx, lastProbeLineIdx, timestamp, fileInfo.Size())
|
||||
if err != nil {
|
||||
return 0, depth, err
|
||||
}
|
||||
|
||||
// Save the last found idx.
|
||||
lastProbeLineIdx = lineIdx
|
||||
|
||||
// Get the timestamp from the query log record.
|
||||
ts := readQLogTimestamp(ctx, logger, line)
|
||||
if ts == 0 {
|
||||
return 0, depth, fmt.Errorf(
|
||||
"looking up timestamp %d in %q: record %q has empty timestamp",
|
||||
timestamp,
|
||||
q.file.Name(),
|
||||
line,
|
||||
)
|
||||
}
|
||||
|
||||
if ts == timestamp {
|
||||
// Hurray, returning the result.
|
||||
if found {
|
||||
break
|
||||
}
|
||||
|
||||
// Narrow the scope and repeat the search.
|
||||
if ts > timestamp {
|
||||
// If the timestamp we're looking for is OLDER than what we found,
|
||||
// then the line is somewhere on the LEFT side from the current
|
||||
// probe position.
|
||||
end = lineIdx
|
||||
} else {
|
||||
// If the timestamp we're looking for is NEWER than what we found,
|
||||
// then the line is somewhere on the RIGHT side from the current
|
||||
// probe position.
|
||||
start = lineEndIdx
|
||||
}
|
||||
probe = start + (end-start)/2
|
||||
|
||||
depth++
|
||||
if depth >= 100 {
|
||||
return 0, depth, fmt.Errorf(
|
||||
"looking up timestamp %d in %q: depth %d too high: %w",
|
||||
timestamp,
|
||||
q.file.Name(),
|
||||
depth,
|
||||
errTSNotFound,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
q.position = lineIdx + int64(len(line))
|
||||
return q.position, depth, nil
|
||||
q.position = st.lineIdx + int64(len(st.line))
|
||||
|
||||
return q.position, st.depth, nil
|
||||
}
|
||||
|
||||
// tsSearchState contains the state of the binary-search loop.
|
||||
type tsSearchState struct {
|
||||
// line holds the contents of the matched line.
|
||||
line string
|
||||
|
||||
// end is the end of the current search interval.
|
||||
end int64
|
||||
|
||||
// fileSize is the total file size in bytes.
|
||||
fileSize int64
|
||||
|
||||
// lastProbeLineIdx is the byte offset of the last probed line start.
|
||||
lastProbeLineIdx int64
|
||||
|
||||
// lineIdx is the byte offset of the current line start.
|
||||
lineIdx int64
|
||||
|
||||
// probe is the byte offset to probe.
|
||||
probe int64
|
||||
|
||||
// start is the start of current search interval.
|
||||
start int64
|
||||
|
||||
// depth is the number of search iterations performed.
|
||||
depth int
|
||||
}
|
||||
|
||||
// maxSearchDepth is the maximum number of search iterations.
|
||||
const maxSearchDepth = 100
|
||||
|
||||
// seekTSStep performs one iteration of the binary search over the qlog file. l
|
||||
// and st must not be nil.
|
||||
func (q *qLogFile) seekTSStep(
|
||||
ctx context.Context,
|
||||
l *slog.Logger,
|
||||
timestamp int64,
|
||||
st *tsSearchState,
|
||||
) (found bool, err error) {
|
||||
line, lineIdx, lineEndIdx, err := q.readAndValidateProbe(st, timestamp)
|
||||
if err != nil {
|
||||
// Don't wrap the error, because it's informative enough as is.
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Get the timestamp from the query log record.
|
||||
ts := readQLogTimestamp(ctx, l, line)
|
||||
if ts == 0 {
|
||||
return false, fmt.Errorf(
|
||||
"looking up timestamp %d in %q: record %q has empty timestamp",
|
||||
timestamp,
|
||||
q.file.Name(),
|
||||
line,
|
||||
)
|
||||
}
|
||||
|
||||
if ts == timestamp {
|
||||
// Found the target record.
|
||||
st.line = line
|
||||
st.lineIdx = lineIdx
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Narrow the scope and repeat the search.
|
||||
if ts > timestamp {
|
||||
// If the timestamp we're looking for is OLDER than what we found, then
|
||||
// the line is somewhere on the LEFT side from the current probe
|
||||
// position.
|
||||
st.end = lineIdx
|
||||
} else {
|
||||
// If the timestamp we're looking for is NEWER than what we found, then
|
||||
// the line is somewhere on the RIGHT side from the current probe
|
||||
// position.
|
||||
st.start = lineEndIdx
|
||||
}
|
||||
st.probe = st.start + (st.end-st.start)/2
|
||||
|
||||
st.depth++
|
||||
if st.depth >= maxSearchDepth {
|
||||
return false, fmt.Errorf(
|
||||
"looking up timestamp %d in %q: depth %d too high: %w",
|
||||
timestamp,
|
||||
q.file.Name(),
|
||||
st.depth,
|
||||
errTSNotFound,
|
||||
)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// readAndValidateProbe reads the probe line and validates the line index.
|
||||
func (q *qLogFile) readAndValidateProbe(
|
||||
st *tsSearchState,
|
||||
timestamp int64,
|
||||
) (line string, lineIdx, lineEndIdx int64, err error) {
|
||||
// Get the line at the specified position.
|
||||
line, lineIdx, lineEndIdx, err = q.readProbeLine(st.probe)
|
||||
if err != nil {
|
||||
return "", 0, 0, fmt.Errorf("reading probe line: %w", err)
|
||||
}
|
||||
|
||||
// Check if the line index is invalid.
|
||||
err = q.validateQLogLineIdx(lineIdx, st.lastProbeLineIdx, timestamp, st.fileSize)
|
||||
if err != nil {
|
||||
return "", 0, 0, fmt.Errorf("validating line index: %w", err)
|
||||
}
|
||||
|
||||
// Save the last probed index.
|
||||
st.lastProbeLineIdx = lineIdx
|
||||
|
||||
return line, lineIdx, lineEndIdx, nil
|
||||
}
|
||||
|
||||
// SeekStart changes the current position to the end of the file. Please note,
|
||||
|
||||
@@ -121,23 +121,33 @@ func TestQLogFile_ReadNext(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, expPos, pos)
|
||||
|
||||
var read int
|
||||
var line string
|
||||
for err == nil {
|
||||
line, err = q.ReadNext()
|
||||
if err == nil {
|
||||
assert.NotEmpty(t, line)
|
||||
read++
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, io.EOF, err)
|
||||
|
||||
read := readAllLines(t, q)
|
||||
assert.Equal(t, tc.linesNum, read)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// readAllLines is a helper function that reads entries until EOF and returns
|
||||
// the number of lines read.
|
||||
func readAllLines(tb testing.TB, q *qLogFile) (n int) {
|
||||
tb.Helper()
|
||||
|
||||
for {
|
||||
line, err := q.ReadNext()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
require.NoError(tb, err)
|
||||
|
||||
assert.NotEmpty(tb, line)
|
||||
|
||||
n++
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func TestQLogFile_SeekTS_good(t *testing.T) {
|
||||
linesCases := []struct {
|
||||
name string
|
||||
|
||||
@@ -130,24 +130,24 @@ func (r *qLogReader) ReadNext() (string, error) {
|
||||
for r.currentFile >= 0 {
|
||||
q := r.qFiles[r.currentFile]
|
||||
line, err := q.ReadNext()
|
||||
if err != nil {
|
||||
// Shift to the older file.
|
||||
r.currentFile--
|
||||
if r.currentFile < 0 {
|
||||
break
|
||||
}
|
||||
|
||||
q = r.qFiles[r.currentFile]
|
||||
|
||||
// Set its position to the start right away.
|
||||
_, err = q.SeekStart()
|
||||
// This is unexpected, return an error right away.
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
if err == nil {
|
||||
return line, nil
|
||||
}
|
||||
|
||||
// Shift to the older file.
|
||||
r.currentFile--
|
||||
if r.currentFile < 0 {
|
||||
break
|
||||
}
|
||||
|
||||
q = r.qFiles[r.currentFile]
|
||||
|
||||
// Set its position to the start right away.
|
||||
_, err = q.SeekStart()
|
||||
// This is unexpected, return an error right away.
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing to read anymore.
|
||||
|
||||
@@ -217,13 +217,11 @@ func (l *queryLog) readEntries(
|
||||
) (entries []*logEntry, oldestNano int64, total int) {
|
||||
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
|
||||
ent, ts, rErr := l.readNextEntry(ctx, r, params, cache)
|
||||
if rErr != nil {
|
||||
if rErr == io.EOF {
|
||||
oldestNano = 0
|
||||
|
||||
break
|
||||
}
|
||||
if rErr == io.EOF {
|
||||
oldestNano = 0
|
||||
|
||||
break
|
||||
} else if rErr != nil {
|
||||
l.logger.ErrorContext(ctx, "reading next entry", slogutil.KeyError, rErr)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user