Skip to content
This repository has been archived by the owner on Mar 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #39 from RaduBerinde/logevent-limit
Browse files Browse the repository at this point in the history
Add option for limiting the number of logs in a Span
  • Loading branch information
bensigelman authored Sep 29, 2016
2 parents 017835e + 723bb40 commit 1b32af2
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 11 deletions.
70 changes: 64 additions & 6 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type spanImpl struct {
event func(SpanEvent)
sync.Mutex // protects the fields below
raw RawSpan
// The number of logs dropped because of MaxLogsPerSpan.
numDroppedLogs int
}

var spanPool = &sync.Pool{New: func() interface{} {
Expand Down Expand Up @@ -98,6 +100,21 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
s.LogFields(fields...)
}

func (s *spanImpl) appendLog(lr opentracing.LogRecord) {
maxLogs := s.tracer.options.MaxLogsPerSpan
if maxLogs == 0 || len(s.raw.Logs) < maxLogs {
s.raw.Logs = append(s.raw.Logs, lr)
return
}

// We have too many logs. We don't touch the first numOld logs; we treat the
// rest as a circular buffer and overwrite the oldest log among those.
numOld := (maxLogs - 1) / 2
numNew := maxLogs - numOld
s.raw.Logs[numOld+s.numDroppedLogs%numNew] = lr
s.numDroppedLogs++
}

func (s *spanImpl) LogFields(fields ...log.Field) {
lr := opentracing.LogRecord{
Fields: fields,
Expand All @@ -111,7 +128,7 @@ func (s *spanImpl) LogFields(fields ...log.Field) {
if lr.Timestamp.IsZero() {
lr.Timestamp = time.Now()
}
s.raw.Logs = append(s.raw.Logs, lr)
s.appendLog(lr)
}

func (s *spanImpl) LogEvent(event string) {
Expand Down Expand Up @@ -139,13 +156,30 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

func (s *spanImpl) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}

// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
// the end (i.e. pos circular left shifts).
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
// This algorithm is described in:
// http://www.cplusplus.com/reference/algorithm/rotate
for first, middle, next := 0, pos, pos; first != middle; {
buf[first], buf[next] = buf[next], buf[first]
first++
next++
if next == len(buf) {
next = middle
} else if first == middle {
middle = next
}
}
}

func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
finishTime := opts.FinishTime
if finishTime.IsZero() {
Expand All @@ -155,18 +189,42 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {

s.Lock()
defer s.Unlock()
if opts.LogRecords != nil {
s.raw.Logs = append(s.raw.Logs, opts.LogRecords...)

for _, lr := range opts.LogRecords {
s.appendLog(lr)
}
for _, ld := range opts.BulkLogData {
s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

if s.numDroppedLogs > 0 {
// We dropped some log events, which means that we used part of Logs as a
// circular buffer (see appendLog). De-circularize it.
numOld := (len(s.raw.Logs) - 1) / 2
numNew := len(s.raw.Logs) - numOld
rotateLogBuffer(s.raw.Logs[numOld:], s.numDroppedLogs%numNew)

// Replace the log in the middle (the oldest "new" log) with information
// about the dropped logs. This means that we are effectively dropping one
// more "new" log.
numDropped := s.numDroppedLogs + 1
s.raw.Logs[numOld] = opentracing.LogRecord{
// Keep the timestamp of the last dropped event.
Timestamp: s.raw.Logs[numOld].Timestamp,
Fields: []log.Field{
log.String("event", "dropped Span logs"),
log.Int("dropped_log_count", numDropped),
log.String("component", "basictracer"),
},
}
}

s.raw.Duration = duration

s.onFinish(s.raw)
s.tracer.options.Recorder.RecordSpan(s.raw)

// Last chance to get options before the span is possbily reset.
// Last chance to get options before the span is possibly reset.
poolEnabled := s.tracer.options.EnableSpanPool
if s.tracer.options.DebugAssertUseAfterFinish {
// This makes it much more likely to catch a panic on any subsequent
Expand Down
56 changes: 56 additions & 0 deletions span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package basictracer

import (
"reflect"
"strconv"
"testing"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -166,3 +167,58 @@ func TestSpan_DropAllLogs(t *testing.T) {
// Only logs are dropped
assert.Equal(t, 0, len(spans[0].Logs))
}

func TestSpan_MaxLogSperSpan(t *testing.T) {
for _, limit := range []int{1, 2, 3, 5, 10, 15, 20} {
for _, numLogs := range []int{1, 2, 3, 5, 10, 15, 20, 30, 40, 50} {
recorder := NewInMemoryRecorder()
// Tracer that only retains the last <limit> logs.
tracer := NewWithOptions(Options{
Recorder: recorder,
ShouldSample: func(traceID uint64) bool { return true }, // always sample
MaxLogsPerSpan: limit,
})

span := tracer.StartSpan("x")
for i := 0; i < numLogs; i++ {
span.LogKV("eventIdx", i)
}
span.Finish()

spans := recorder.GetSpans()
assert.Equal(t, 1, len(spans))
assert.Equal(t, "x", spans[0].Operation)

logs := spans[0].Logs
var firstLogs, lastLogs []opentracing.LogRecord
if numLogs <= limit {
assert.Equal(t, numLogs, len(logs))
firstLogs = logs
} else {
assert.Equal(t, limit, len(logs))
if len(logs) > 0 {
numOld := (len(logs) - 1) / 2
firstLogs = logs[:numOld]
lastLogs = logs[numOld+1:]

fv := NewLogFieldValidator(t, logs[numOld].Fields)
fv = fv.ExpectNextFieldEquals("event", reflect.String, "dropped Span logs")
fv = fv.ExpectNextFieldEquals(
"dropped_log_count", reflect.Int, strconv.Itoa(numLogs-limit+1),
)
fv.ExpectNextFieldEquals("component", reflect.String, "basictracer")
}
}

for i, lr := range firstLogs {
fv := NewLogFieldValidator(t, lr.Fields)
fv.ExpectNextFieldEquals("eventIdx", reflect.Int, strconv.Itoa(i))
}

for i, lr := range lastLogs {
fv := NewLogFieldValidator(t, lr.Fields)
fv.ExpectNextFieldEquals("eventIdx", reflect.Int, strconv.Itoa(numLogs-len(lastLogs)+i))
}
}
}
}
12 changes: 8 additions & 4 deletions testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package basictracer
import (
"fmt"
"reflect"
"runtime"
"testing"

"github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -41,7 +42,8 @@ func NewLogFieldValidator(t *testing.T, fields []log.Field) *LogFieldValidator {
// []Field slices.
func (fv *LogFieldValidator) ExpectNextFieldEquals(key string, kind reflect.Kind, valAsString string) *LogFieldValidator {
if len(fv.fields) < fv.fieldIdx {
fv.t.Errorf("Expecting more than the %v Fields we have", len(fv.fields))
_, file, line, _ := runtime.Caller(1)
fv.t.Errorf("%s:%d Expecting more than the %v Fields we have", file, line, len(fv.fields))
}
fv.nextKey = key
fv.nextKind = kind
Expand Down Expand Up @@ -107,15 +109,17 @@ func (fv *LogFieldValidator) EmitLazyLogger(value log.LazyLogger) {
}

func (fv *LogFieldValidator) validateNextField(key string, actualKind reflect.Kind, value interface{}) {
// Reference the ExpectNextField caller in error messages.
_, file, line, _ := runtime.Caller(4)
if fv.nextKey != key {
fv.t.Errorf("Bad key: expected %q, found %q", fv.nextKey, key)
fv.t.Errorf("%s:%d Bad key: expected %q, found %q", file, line, fv.nextKey, key)
}
if fv.nextKind != actualKind {
fv.t.Errorf("Bad reflect.Kind: expected %v, found %v", fv.nextKind, actualKind)
fv.t.Errorf("%s:%d Bad reflect.Kind: expected %v, found %v", file, line, fv.nextKind, actualKind)
return
}
if fv.nextValAsString != fmt.Sprint(value) {
fv.t.Errorf("Bad value: expected %q, found %q", fv.nextValAsString, fmt.Sprint(value))
fv.t.Errorf("%s:%d Bad value: expected %q, found %q", file, line, fv.nextValAsString, fmt.Sprint(value))
}
// All good.
}
13 changes: 12 additions & 1 deletion tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ type Options struct {
// DropAllLogs turns log events on all Spans into no-ops.
// If NewSpanEventListener is set, the callbacks will still fire.
DropAllLogs bool
// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
// value). If a span has more logs than this value, logs are dropped as
// necessary (and replaced with a log describing how many were dropped).
//
// About half of the MaxLogPerSpan logs kept are the oldest logs, and about
// half are the newest logs.
//
// If NewSpanEventListener is set, the callbacks will still fire for all log
// events. This value is ignored if DropAllLogs is true.
MaxLogsPerSpan int
// DebugAssertSingleGoroutine internally records the ID of the goroutine
// creating each Span and verifies that no operation is carried out on
// it on a different goroutine.
Expand Down Expand Up @@ -87,7 +97,8 @@ type Options struct {
// returned object with a Tracer.
func DefaultOptions() Options {
return Options{
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
MaxLogsPerSpan: 100,
}
}

Expand Down

0 comments on commit 1b32af2

Please sign in to comment.