Class: Datadog::Workers::AsyncTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/workers.rb

Overview

Asynchronous worker that executes a +Send()+ operation after given seconds. Under the hood, it uses +Concurrent::TimerTask+ so that the thread will perform a task at regular intervals. The thread can be stopped with the +stop()+ method and can start with the +start()+ method.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
DEFAULT_FLUSH_INTERVAL =
1
DEFAULT_TIMEOUT =
5
BACK_OFF_RATIO =
1.2
BACK_OFF_MAX =
5
SHUTDOWN_TIMEOUT =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ AsyncTransport

Returns a new instance of AsyncTransport.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/ddtrace/workers.rb', line 27

def initialize(options = {})
  @transport = options[:transport]

  # Callbacks
  @trace_task = options[:on_trace]

  # Intervals
  interval = options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
  @flush_interval = interval
  @back_off = interval

  # Buffers
  buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  @trace_buffer = TraceBuffer.new(buffer_size)

  # Threading
  @shutdown = ConditionVariable.new
  @mutex = Mutex.new
  @worker = nil
  @run = false
end

Instance Attribute Details

#trace_bufferObject (readonly)

Returns the value of attribute trace_buffer.



24
25
26
# File 'lib/ddtrace/workers.rb', line 24

def trace_buffer
  @trace_buffer
end

Instance Method Details

#callback_tracesObject Also known as: flush_data

Callback function that process traces and executes the +send_traces()+ method.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/ddtrace/workers.rb', line 50

def callback_traces
  return true if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop
    traces = Pipeline.process!(traces)
    @trace_task.call(traces, @transport) unless @trace_task.nil?
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog.logger.error(
      "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{Array(e.backtrace).first}"
    )
  end
end

#enqueue_trace(trace) ⇒ Object

Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the +TraceBuffer+ data structure.



102
103
104
# File 'lib/ddtrace/workers.rb', line 102

def enqueue_trace(trace)
  @trace_buffer.push(trace)
end

#joinObject

Block until executor shutdown is complete or until timeout seconds have passed.



96
97
98
# File 'lib/ddtrace/workers.rb', line 96

def join
  @worker.join(SHUTDOWN_TIMEOUT)
end

#startObject

Start the timer execution.



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ddtrace/workers.rb', line 68

def start
  @mutex.synchronize do
    return if @run

    @run = true
    Datadog.logger.debug { "Starting thread for: #{self}" }
    @worker = Thread.new { perform }
    @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')

    nil
  end
end

#stopObject

Closes all available queues and waits for the trace buffer to flush



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/ddtrace/workers.rb', line 82

def stop
  @mutex.synchronize do
    return unless @run

    @trace_buffer.close
    @run = false
    @shutdown.signal
  end

  join
  true
end