// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
	"context"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/internal/global"
	"go.opentelemetry.io/otel/sdk/internal/env"
	"go.opentelemetry.io/otel/trace"
)

// Defaults for BatchSpanProcessorOptions.
//
// DefaultScheduleDelay and DefaultExportTimeout are expressed in
// milliseconds; the two size defaults are span counts.
const (
	DefaultMaxQueueSize       = 2048
	DefaultScheduleDelay      = 5000
	DefaultExportTimeout      = 30000
	DefaultMaxExportBatchSize = 512
)

// BatchSpanProcessorOption is a function that mutates a
// BatchSpanProcessorOptions value; it is how callers customize a batch span
// processor at construction time.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions holds the configuration settings of a batch span
// processor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
	// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available spans when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there is more than one batch worth of spans then it processes multiple batches
	// of spans one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull makes OnEnd block while the queue is full instead of
	// dropping the span. (OnStart never enqueues anything, so it is unaffected.)
	// Blocking option should be used carefully as it can severely affect the performance of an
	// application.
	BlockOnQueueFull bool
}

// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// spans and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
	// e is the exporter batches are handed to; it may be nil, in which case
	// spans are not enqueued at all.
	e SpanExporter
	// o is the effective configuration.
	o BatchSpanProcessorOptions

	// queue buffers ended spans until the processing goroutine picks them up.
	queue chan ReadOnlySpan
	// dropped counts spans discarded because the queue was full; it is
	// accessed with atomic operations.
	dropped uint32

	// batch accumulates spans for the next export; guarded by batchMutex.
	batch []ReadOnlySpan
	batchMutex sync.Mutex
	// timer triggers an export when BatchTimeout elapses.
	timer *time.Timer
	// stopWait tracks the processing goroutine so Shutdown can wait for it.
	stopWait sync.WaitGroup
	// stopOnce makes Shutdown effective only once.
	stopOnce sync.Once
	// stopCh is closed by Shutdown to stop the processing goroutine.
	stopCh chan struct{}
}

var _ SpanProcessor = (*batchSpanProcessor)(nil)

// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize) maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize) if maxExportBatchSize > maxQueueSize { if DefaultMaxExportBatchSize > maxQueueSize { maxExportBatchSize = maxQueueSize } else { maxExportBatchSize = DefaultMaxExportBatchSize } } o := BatchSpanProcessorOptions{ BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond, ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond, MaxQueueSize: maxQueueSize, MaxExportBatchSize: maxExportBatchSize, } for _, opt := range options { opt(&o) } bsp := &batchSpanProcessor{ e: exporter, o: o, batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize), timer: time.NewTimer(o.BatchTimeout), queue: make(chan ReadOnlySpan, o.MaxQueueSize), stopCh: make(chan struct{}), } bsp.stopWait.Add(1) go func() { defer bsp.stopWait.Done() bsp.processQueue() bsp.drainQueue() }() return bsp } // OnStart method does nothing. func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} // OnEnd method enqueues a ReadOnlySpan for later processing. func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) { // Do not enqueue spans if we are just going to drop them. if bsp.e == nil { return } bsp.enqueue(s) } // Shutdown flushes the queue and waits until all spans are processed. // It only executes once. Subsequent call does nothing. 
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { var err error bsp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { close(bsp.stopCh) bsp.stopWait.Wait() if bsp.e != nil { if err := bsp.e.Shutdown(ctx); err != nil { otel.Handle(err) } } close(wait) }() // Wait until the wait group is done or the context is cancelled select { case <-wait: case <-ctx.Done(): err = ctx.Err() } }) return err } type forceFlushSpan struct { ReadOnlySpan flushed chan struct{} } func (f forceFlushSpan) SpanContext() trace.SpanContext { return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) } // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { flushCh := make(chan struct{}) if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) { select { case <-flushCh: // Processed any items in queue prior to ForceFlush being called case <-ctx.Done(): return ctx.Err() } } wait := make(chan error) go func() { wait <- bsp.exportSpans(ctx) close(wait) }() // Wait until the export is finished or the context is cancelled/timed out select { case err = <-wait: case <-ctx.Done(): err = ctx.Err() } } return err } func WithMaxQueueSize(size int) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.MaxQueueSize = size } } func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.MaxExportBatchSize = size } } func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.BatchTimeout = delay } } func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.ExportTimeout = timeout } } func WithBlocking() BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.BlockOnQueueFull = true } } // exportSpans is a subroutine of 
processing and draining the queue. func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { bsp.timer.Reset(bsp.o.BatchTimeout) bsp.batchMutex.Lock() defer bsp.batchMutex.Unlock() if bsp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) defer cancel() } if l := len(bsp.batch); l > 0 { global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. // // It is up to the exporter to implement any type of retry logic if a batch is failing // to be exported, since it is specific to the protocol and backend being sent to. bsp.batch = bsp.batch[:0] if err != nil { return err } } return nil } // processQueue removes spans from the `queue` channel until processor // is shut down. It calls the exporter in batches of up to MaxExportBatchSize // waiting up to BatchTimeout to form a batch. func (bsp *batchSpanProcessor) processQueue() { defer bsp.timer.Stop() ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { select { case <-bsp.stopCh: return case <-bsp.timer.C: if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } case sd := <-bsp.queue: if ffs, ok := sd.(forceFlushSpan); ok { close(ffs.flushed) continue } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize bsp.batchMutex.Unlock() if shouldExport { if !bsp.timer.Stop() { <-bsp.timer.C } if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } } } } } // drainQueue awaits the any caller that had added to bsp.stopWait // to finish the enqueue, then exports the final batch. 
func (bsp *batchSpanProcessor) drainQueue() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { select { case sd := <-bsp.queue: if sd == nil { if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } return } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize bsp.batchMutex.Unlock() if shouldExport { if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } } default: close(bsp.queue) } } } func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) } func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool { if !sd.SpanContext().IsSampled() { return false } // This ensures the bsp.queue<- below does not panic as the // processor shuts down. defer func() { x := recover() switch err := x.(type) { case nil: return case runtime.Error: if err.Error() == "send on closed channel" { return } } panic(x) }() select { case <-bsp.stopCh: return false default: } if block { select { case bsp.queue <- sd: return true case <-ctx.Done(): return false } } select { case bsp.queue <- sd: return true default: atomic.AddUint32(&bsp.dropped, 1) } return false } // MarshalLog is the marshaling function used by the logging system to represent this exporter. func (bsp *batchSpanProcessor) MarshalLog() interface{} { return struct { Type string SpanExporter SpanExporter Config BatchSpanProcessorOptions }{ Type: "BatchSpanProcessor", SpanExporter: bsp.e, Config: bsp.o, } }