593 lines
15 KiB
Go
593 lines
15 KiB
Go
|
package sentry
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"crypto/tls"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/getsentry/sentry-go/internal/ratelimit"
|
||
|
)
|
||
|
|
||
|
const defaultBufferSize = 30
|
||
|
const defaultTimeout = time.Second * 30
|
||
|
|
||
|
// maxDrainResponseBytes is the maximum number of bytes that transport
|
||
|
// implementations will read from response bodies when draining them.
|
||
|
//
|
||
|
// Sentry's ingestion API responses are typically short and the SDK doesn't need
|
||
|
// the contents of the response body. However, the net/http HTTP client requires
|
||
|
// response bodies to be fully drained (and closed) for TCP keep-alive to work.
|
||
|
//
|
||
|
// maxDrainResponseBytes strikes a balance between reading too much data (if the
|
||
|
// server is misbehaving) and reusing TCP connections.
|
||
|
const maxDrainResponseBytes = 16 << 10
|
||
|
|
||
|
// Transport is used by the Client to deliver events to remote server.
|
||
|
type Transport interface {
|
||
|
Flush(timeout time.Duration) bool
|
||
|
Configure(options ClientOptions)
|
||
|
SendEvent(event *Event)
|
||
|
}
|
||
|
|
||
|
func getProxyConfig(options ClientOptions) func(*http.Request) (*url.URL, error) {
|
||
|
if options.HTTPSProxy != "" {
|
||
|
return func(*http.Request) (*url.URL, error) {
|
||
|
return url.Parse(options.HTTPSProxy)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if options.HTTPProxy != "" {
|
||
|
return func(*http.Request) (*url.URL, error) {
|
||
|
return url.Parse(options.HTTPProxy)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return http.ProxyFromEnvironment
|
||
|
}
|
||
|
|
||
|
func getTLSConfig(options ClientOptions) *tls.Config {
|
||
|
if options.CaCerts != nil {
|
||
|
// #nosec G402 -- We should be using `MinVersion: tls.VersionTLS12`,
|
||
|
// but we don't want to break peoples code without the major bump.
|
||
|
return &tls.Config{
|
||
|
RootCAs: options.CaCerts,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func getRequestBodyFromEvent(event *Event) []byte {
|
||
|
body, err := json.Marshal(event)
|
||
|
if err == nil {
|
||
|
return body
|
||
|
}
|
||
|
|
||
|
msg := fmt.Sprintf("Could not encode original event as JSON. "+
|
||
|
"Succeeded by removing Breadcrumbs, Contexts and Extra. "+
|
||
|
"Please verify the data you attach to the scope. "+
|
||
|
"Error: %s", err)
|
||
|
// Try to serialize the event, with all the contextual data that allows for interface{} stripped.
|
||
|
event.Breadcrumbs = nil
|
||
|
event.Contexts = nil
|
||
|
event.Extra = map[string]interface{}{
|
||
|
"info": msg,
|
||
|
}
|
||
|
body, err = json.Marshal(event)
|
||
|
if err == nil {
|
||
|
Logger.Println(msg)
|
||
|
return body
|
||
|
}
|
||
|
|
||
|
// This should _only_ happen when Event.Exception[0].Stacktrace.Frames[0].Vars is unserializable
|
||
|
// Which won't ever happen, as we don't use it now (although it's the part of public interface accepted by Sentry)
|
||
|
// Juuust in case something, somehow goes utterly wrong.
|
||
|
Logger.Println("Event couldn't be marshaled, even with stripped contextual data. Skipping delivery. " +
|
||
|
"Please notify the SDK owners with possibly broken payload.")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func transactionEnvelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMessage) (*bytes.Buffer, error) {
|
||
|
var b bytes.Buffer
|
||
|
enc := json.NewEncoder(&b)
|
||
|
|
||
|
// Construct the trace envelope header
|
||
|
var trace = map[string]string{}
|
||
|
if dsc := event.sdkMetaData.dsc; dsc.HasEntries() {
|
||
|
for k, v := range dsc.Entries {
|
||
|
trace[k] = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Envelope header
|
||
|
err := enc.Encode(struct {
|
||
|
EventID EventID `json:"event_id"`
|
||
|
SentAt time.Time `json:"sent_at"`
|
||
|
Dsn string `json:"dsn"`
|
||
|
Sdk map[string]string `json:"sdk"`
|
||
|
Trace map[string]string `json:"trace,omitempty"`
|
||
|
}{
|
||
|
EventID: event.EventID,
|
||
|
SentAt: sentAt,
|
||
|
Trace: trace,
|
||
|
Dsn: dsn.String(),
|
||
|
Sdk: map[string]string{
|
||
|
"name": event.Sdk.Name,
|
||
|
"version": event.Sdk.Version,
|
||
|
},
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// Item header
|
||
|
err = enc.Encode(struct {
|
||
|
Type string `json:"type"`
|
||
|
Length int `json:"length"`
|
||
|
}{
|
||
|
Type: transactionType,
|
||
|
Length: len(body),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
// payload
|
||
|
err = enc.Encode(body)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &b, nil
|
||
|
}
|
||
|
|
||
|
func getRequestFromEvent(event *Event, dsn *Dsn) (r *http.Request, err error) {
|
||
|
defer func() {
|
||
|
if r != nil {
|
||
|
r.Header.Set("User-Agent", userAgent)
|
||
|
}
|
||
|
}()
|
||
|
body := getRequestBodyFromEvent(event)
|
||
|
if body == nil {
|
||
|
return nil, errors.New("event could not be marshaled")
|
||
|
}
|
||
|
if event.Type == transactionType {
|
||
|
b, err := transactionEnvelopeFromBody(event, dsn, time.Now(), body)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return http.NewRequest(
|
||
|
http.MethodPost,
|
||
|
dsn.EnvelopeAPIURL().String(),
|
||
|
b,
|
||
|
)
|
||
|
}
|
||
|
return http.NewRequest(
|
||
|
http.MethodPost,
|
||
|
dsn.StoreAPIURL().String(),
|
||
|
bytes.NewReader(body),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
func categoryFor(eventType string) ratelimit.Category {
|
||
|
switch eventType {
|
||
|
case "":
|
||
|
return ratelimit.CategoryError
|
||
|
case transactionType:
|
||
|
return ratelimit.CategoryTransaction
|
||
|
default:
|
||
|
return ratelimit.Category(eventType)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ================================
|
||
|
// HTTPTransport
|
||
|
// ================================
|
||
|
|
||
|
// A batch groups items that are processed sequentially.
|
||
|
type batch struct {
|
||
|
items chan batchItem
|
||
|
started chan struct{} // closed to signal items started to be worked on
|
||
|
done chan struct{} // closed to signal completion of all items
|
||
|
}
|
||
|
|
||
|
type batchItem struct {
|
||
|
request *http.Request
|
||
|
category ratelimit.Category
|
||
|
}
|
||
|
|
||
|
// HTTPTransport is the default, non-blocking, implementation of Transport.
|
||
|
//
|
||
|
// Clients using this transport will enqueue requests in a buffer and return to
|
||
|
// the caller before any network communication has happened. Requests are sent
|
||
|
// to Sentry sequentially from a background goroutine.
|
||
|
type HTTPTransport struct {
|
||
|
dsn *Dsn
|
||
|
client *http.Client
|
||
|
transport http.RoundTripper
|
||
|
|
||
|
// buffer is a channel of batches. Calling Flush terminates work on the
|
||
|
// current in-flight items and starts a new batch for subsequent events.
|
||
|
buffer chan batch
|
||
|
|
||
|
start sync.Once
|
||
|
|
||
|
// Size of the transport buffer. Defaults to 30.
|
||
|
BufferSize int
|
||
|
// HTTP Client request timeout. Defaults to 30 seconds.
|
||
|
Timeout time.Duration
|
||
|
|
||
|
mu sync.RWMutex
|
||
|
limits ratelimit.Map
|
||
|
}
|
||
|
|
||
|
// NewHTTPTransport returns a new pre-configured instance of HTTPTransport.
|
||
|
func NewHTTPTransport() *HTTPTransport {
|
||
|
transport := HTTPTransport{
|
||
|
BufferSize: defaultBufferSize,
|
||
|
Timeout: defaultTimeout,
|
||
|
limits: make(ratelimit.Map),
|
||
|
}
|
||
|
return &transport
|
||
|
}
|
||
|
|
||
|
// Configure is called by the Client itself, providing it it's own ClientOptions.
|
||
|
func (t *HTTPTransport) Configure(options ClientOptions) {
|
||
|
dsn, err := NewDsn(options.Dsn)
|
||
|
if err != nil {
|
||
|
Logger.Printf("%v\n", err)
|
||
|
return
|
||
|
}
|
||
|
t.dsn = dsn
|
||
|
|
||
|
// A buffered channel with capacity 1 works like a mutex, ensuring only one
|
||
|
// goroutine can access the current batch at a given time. Access is
|
||
|
// synchronized by reading from and writing to the channel.
|
||
|
t.buffer = make(chan batch, 1)
|
||
|
t.buffer <- batch{
|
||
|
items: make(chan batchItem, t.BufferSize),
|
||
|
started: make(chan struct{}),
|
||
|
done: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
if options.HTTPTransport != nil {
|
||
|
t.transport = options.HTTPTransport
|
||
|
} else {
|
||
|
t.transport = &http.Transport{
|
||
|
Proxy: getProxyConfig(options),
|
||
|
TLSClientConfig: getTLSConfig(options),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if options.HTTPClient != nil {
|
||
|
t.client = options.HTTPClient
|
||
|
} else {
|
||
|
t.client = &http.Client{
|
||
|
Transport: t.transport,
|
||
|
Timeout: t.Timeout,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
t.start.Do(func() {
|
||
|
go t.worker()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// SendEvent assembles a new packet out of Event and sends it to remote server.
|
||
|
func (t *HTTPTransport) SendEvent(event *Event) {
|
||
|
if t.dsn == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
category := categoryFor(event.Type)
|
||
|
|
||
|
if t.disabled(category) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
request, err := getRequestFromEvent(event, t.dsn)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for headerKey, headerValue := range t.dsn.RequestHeaders() {
|
||
|
request.Header.Set(headerKey, headerValue)
|
||
|
}
|
||
|
|
||
|
// <-t.buffer is equivalent to acquiring a lock to access the current batch.
|
||
|
// A few lines below, t.buffer <- b releases the lock.
|
||
|
//
|
||
|
// The lock must be held during the select block below to guarantee that
|
||
|
// b.items is not closed while trying to send to it. Remember that sending
|
||
|
// on a closed channel panics.
|
||
|
//
|
||
|
// Note that the select block takes a bounded amount of CPU time because of
|
||
|
// the default case that is executed if sending on b.items would block. That
|
||
|
// is, the event is dropped if it cannot be sent immediately to the b.items
|
||
|
// channel (used as a queue).
|
||
|
b := <-t.buffer
|
||
|
|
||
|
select {
|
||
|
case b.items <- batchItem{
|
||
|
request: request,
|
||
|
category: category,
|
||
|
}:
|
||
|
var eventType string
|
||
|
if event.Type == transactionType {
|
||
|
eventType = "transaction"
|
||
|
} else {
|
||
|
eventType = fmt.Sprintf("%s event", event.Level)
|
||
|
}
|
||
|
Logger.Printf(
|
||
|
"Sending %s [%s] to %s project: %s",
|
||
|
eventType,
|
||
|
event.EventID,
|
||
|
t.dsn.host,
|
||
|
t.dsn.projectID,
|
||
|
)
|
||
|
default:
|
||
|
Logger.Println("Event dropped due to transport buffer being full.")
|
||
|
}
|
||
|
|
||
|
t.buffer <- b
|
||
|
}
|
||
|
|
||
|
// Flush waits until any buffered events are sent to the Sentry server, blocking
|
||
|
// for at most the given timeout. It returns false if the timeout was reached.
|
||
|
// In that case, some events may not have been sent.
|
||
|
//
|
||
|
// Flush should be called before terminating the program to avoid
|
||
|
// unintentionally dropping events.
|
||
|
//
|
||
|
// Do not call Flush indiscriminately after every call to SendEvent. Instead, to
|
||
|
// have the SDK send events over the network synchronously, configure it to use
|
||
|
// the HTTPSyncTransport in the call to Init.
|
||
|
func (t *HTTPTransport) Flush(timeout time.Duration) bool {
|
||
|
toolate := time.After(timeout)
|
||
|
|
||
|
// Wait until processing the current batch has started or the timeout.
|
||
|
//
|
||
|
// We must wait until the worker has seen the current batch, because it is
|
||
|
// the only way b.done will be closed. If we do not wait, there is a
|
||
|
// possible execution flow in which b.done is never closed, and the only way
|
||
|
// out of Flush would be waiting for the timeout, which is undesired.
|
||
|
var b batch
|
||
|
for {
|
||
|
select {
|
||
|
case b = <-t.buffer:
|
||
|
select {
|
||
|
case <-b.started:
|
||
|
goto started
|
||
|
default:
|
||
|
t.buffer <- b
|
||
|
}
|
||
|
case <-toolate:
|
||
|
goto fail
|
||
|
}
|
||
|
}
|
||
|
|
||
|
started:
|
||
|
// Signal that there won't be any more items in this batch, so that the
|
||
|
// worker inner loop can end.
|
||
|
close(b.items)
|
||
|
// Start a new batch for subsequent events.
|
||
|
t.buffer <- batch{
|
||
|
items: make(chan batchItem, t.BufferSize),
|
||
|
started: make(chan struct{}),
|
||
|
done: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
// Wait until the current batch is done or the timeout.
|
||
|
select {
|
||
|
case <-b.done:
|
||
|
Logger.Println("Buffer flushed successfully.")
|
||
|
return true
|
||
|
case <-toolate:
|
||
|
goto fail
|
||
|
}
|
||
|
|
||
|
fail:
|
||
|
Logger.Println("Buffer flushing reached the timeout.")
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (t *HTTPTransport) worker() {
|
||
|
for b := range t.buffer {
|
||
|
// Signal that processing of the current batch has started.
|
||
|
close(b.started)
|
||
|
|
||
|
// Return the batch to the buffer so that other goroutines can use it.
|
||
|
// Equivalent to releasing a lock.
|
||
|
t.buffer <- b
|
||
|
|
||
|
// Process all batch items.
|
||
|
for item := range b.items {
|
||
|
if t.disabled(item.category) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
response, err := t.client.Do(item.request)
|
||
|
if err != nil {
|
||
|
Logger.Printf("There was an issue with sending an event: %v", err)
|
||
|
continue
|
||
|
}
|
||
|
t.mu.Lock()
|
||
|
t.limits.Merge(ratelimit.FromResponse(response))
|
||
|
t.mu.Unlock()
|
||
|
// Drain body up to a limit and close it, allowing the
|
||
|
// transport to reuse TCP connections.
|
||
|
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
|
||
|
response.Body.Close()
|
||
|
}
|
||
|
|
||
|
// Signal that processing of the batch is done.
|
||
|
close(b.done)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *HTTPTransport) disabled(c ratelimit.Category) bool {
|
||
|
t.mu.RLock()
|
||
|
defer t.mu.RUnlock()
|
||
|
disabled := t.limits.IsRateLimited(c)
|
||
|
if disabled {
|
||
|
Logger.Printf("Too many requests for %q, backing off till: %v", c, t.limits.Deadline(c))
|
||
|
}
|
||
|
return disabled
|
||
|
}
|
||
|
|
||
|
// ================================
|
||
|
// HTTPSyncTransport
|
||
|
// ================================
|
||
|
|
||
|
// HTTPSyncTransport is a blocking implementation of Transport.
|
||
|
//
|
||
|
// Clients using this transport will send requests to Sentry sequentially and
|
||
|
// block until a response is returned.
|
||
|
//
|
||
|
// The blocking behavior is useful in a limited set of use cases. For example,
|
||
|
// use it when deploying code to a Function as a Service ("Serverless")
|
||
|
// platform, where any work happening in a background goroutine is not
|
||
|
// guaranteed to execute.
|
||
|
//
|
||
|
// For most cases, prefer HTTPTransport.
|
||
|
type HTTPSyncTransport struct {
|
||
|
dsn *Dsn
|
||
|
client *http.Client
|
||
|
transport http.RoundTripper
|
||
|
|
||
|
mu sync.Mutex
|
||
|
limits ratelimit.Map
|
||
|
|
||
|
// HTTP Client request timeout. Defaults to 30 seconds.
|
||
|
Timeout time.Duration
|
||
|
}
|
||
|
|
||
|
// NewHTTPSyncTransport returns a new pre-configured instance of HTTPSyncTransport.
|
||
|
func NewHTTPSyncTransport() *HTTPSyncTransport {
|
||
|
transport := HTTPSyncTransport{
|
||
|
Timeout: defaultTimeout,
|
||
|
limits: make(ratelimit.Map),
|
||
|
}
|
||
|
|
||
|
return &transport
|
||
|
}
|
||
|
|
||
|
// Configure is called by the Client itself, providing it it's own ClientOptions.
|
||
|
func (t *HTTPSyncTransport) Configure(options ClientOptions) {
|
||
|
dsn, err := NewDsn(options.Dsn)
|
||
|
if err != nil {
|
||
|
Logger.Printf("%v\n", err)
|
||
|
return
|
||
|
}
|
||
|
t.dsn = dsn
|
||
|
|
||
|
if options.HTTPTransport != nil {
|
||
|
t.transport = options.HTTPTransport
|
||
|
} else {
|
||
|
t.transport = &http.Transport{
|
||
|
Proxy: getProxyConfig(options),
|
||
|
TLSClientConfig: getTLSConfig(options),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if options.HTTPClient != nil {
|
||
|
t.client = options.HTTPClient
|
||
|
} else {
|
||
|
t.client = &http.Client{
|
||
|
Transport: t.transport,
|
||
|
Timeout: t.Timeout,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SendEvent assembles a new packet out of Event and sends it to remote server.
|
||
|
func (t *HTTPSyncTransport) SendEvent(event *Event) {
|
||
|
if t.dsn == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if t.disabled(categoryFor(event.Type)) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
request, err := getRequestFromEvent(event, t.dsn)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for headerKey, headerValue := range t.dsn.RequestHeaders() {
|
||
|
request.Header.Set(headerKey, headerValue)
|
||
|
}
|
||
|
|
||
|
var eventType string
|
||
|
if event.Type == transactionType {
|
||
|
eventType = "transaction"
|
||
|
} else {
|
||
|
eventType = fmt.Sprintf("%s event", event.Level)
|
||
|
}
|
||
|
Logger.Printf(
|
||
|
"Sending %s [%s] to %s project: %s",
|
||
|
eventType,
|
||
|
event.EventID,
|
||
|
t.dsn.host,
|
||
|
t.dsn.projectID,
|
||
|
)
|
||
|
|
||
|
response, err := t.client.Do(request)
|
||
|
if err != nil {
|
||
|
Logger.Printf("There was an issue with sending an event: %v", err)
|
||
|
return
|
||
|
}
|
||
|
t.mu.Lock()
|
||
|
t.limits.Merge(ratelimit.FromResponse(response))
|
||
|
t.mu.Unlock()
|
||
|
|
||
|
// Drain body up to a limit and close it, allowing the
|
||
|
// transport to reuse TCP connections.
|
||
|
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
|
||
|
response.Body.Close()
|
||
|
}
|
||
|
|
||
|
// Flush is a no-op for HTTPSyncTransport. It always returns true immediately.
|
||
|
func (t *HTTPSyncTransport) Flush(_ time.Duration) bool {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (t *HTTPSyncTransport) disabled(c ratelimit.Category) bool {
|
||
|
t.mu.Lock()
|
||
|
defer t.mu.Unlock()
|
||
|
disabled := t.limits.IsRateLimited(c)
|
||
|
if disabled {
|
||
|
Logger.Printf("Too many requests for %q, backing off till: %v", c, t.limits.Deadline(c))
|
||
|
}
|
||
|
return disabled
|
||
|
}
|
||
|
|
||
|
// ================================
|
||
|
// noopTransport
|
||
|
// ================================
|
||
|
|
||
|
// noopTransport is an implementation of Transport interface which drops all the events.
|
||
|
// Only used internally when an empty DSN is provided, which effectively disables the SDK.
|
||
|
type noopTransport struct{}
|
||
|
|
||
|
var _ Transport = noopTransport{}
|
||
|
|
||
|
func (noopTransport) Configure(ClientOptions) {
|
||
|
Logger.Println("Sentry client initialized with an empty DSN. Using noopTransport. No events will be delivered.")
|
||
|
}
|
||
|
|
||
|
func (noopTransport) SendEvent(*Event) {
|
||
|
Logger.Println("Event dropped due to noopTransport usage.")
|
||
|
}
|
||
|
|
||
|
func (noopTransport) Flush(time.Duration) bool {
|
||
|
return true
|
||
|
}
|