Class: Datadog::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/writer.rb

Overview

Processor that sends traces and metadata to the agent

Constant Summary collapse

DEPRECATION_WARN_ONLY_ONCE =
Datadog::Utils::OnlyOnce.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/ddtrace/writer.rb', line 25

# Builds a writer. No worker is started here: +@worker+ stays +nil+ until
# #start runs.
#
# Recognised +options+ keys (all optional):
# * +:buffer_size+ - stored in +@buff_size+; defaults to
#   +Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE+.
# * +:flush_interval+ - stored in +@flush_interval+; defaults to
#   +Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL+.
# * +:transport_options+ - Hash forwarded as keyword arguments to
#   +Transport::HTTP.default+. The hash is copied, never modified.
# * +:agent_settings+ - when the key is present, added to the transport
#   options under +:agent_settings+.
# * +:priority_sampler+ - when truthy, kept as the priority sampler and the
#   transport API version defaults to +Transport::HTTP::API::V4+.
# * +:transport+ - a ready-made transport; when given, no default transport
#   is built.
#
# @param options [Hash] writer configuration, see the list above
def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)

  # Work on a shallow copy: the keys added below must not leak into a hash
  # owned by the caller, and must not fail when that hash is frozen.
  transport_options = options.fetch(:transport_options, {}).dup

  transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

  # priority sampling
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]
    # An explicitly configured API version wins over the V4 default.
    transport_options[:api_version] ||= Transport::HTTP::API::V4
  end

  # transport and buffers: the default transport is only built when the
  # caller did not supply one
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(**transport_options)
  end

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil

  # Once stopped, this writer instance cannot be restarted.
  # This allows for a graceful shutdown, while preventing
  # the host application from inadvertently starting new
  # threads during shutdown.
  @stopped = false
end

Instance Attribute Details

#priority_sampler ⇒ Object (readonly)

Returns the value of attribute priority_sampler.



20
21
22
# File 'lib/ddtrace/writer.rb', line 20

# Read-only accessor for the priority sampler.
#
# The constructor stores the +:priority_sampler+ option here when that option
# is truthy; otherwise the instance variable is never assigned there and this
# returns +nil+.
def priority_sampler
  @priority_sampler
end

#transport ⇒ Object (readonly)

Returns the value of attribute transport.



20
21
22
# File 'lib/ddtrace/writer.rb', line 20

# Read-only accessor for the transport configured for this writer.
#
# The constructor sets it to the +:transport+ option when one is given, and
# otherwise to the result of +Transport::HTTP.default+ called with the
# transport options.
def transport
  @transport
end

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



20
21
22
# File 'lib/ddtrace/writer.rb', line 20

# Read-only accessor for the current trace worker.
#
# The constructor initialises it to +nil+, and #write treats +nil+ as
# "not started yet".
# NOTE(review): presumably assigned by +start_worker+ and cleared by
# +stop_worker+, which are not visible here - confirm.
def worker
  @worker
end

Instance Method Details

#send_spans(traces, transport) ⇒ Object

Flushes spans to the trace agent; handles spans only.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ddtrace/writer.rb', line 111

# Flushes +traces+ through +transport+ and records the outcome.
#
# @param traces the traces to send; an empty collection is a no-op
# @param transport object responding to +send_traces(traces)+ and returning
#   a collection of responses
# @return [Boolean] +true+ when there was nothing to send or when no response
#   reported a server error, +false+ otherwise
def send_spans(traces, transport)
  return true if traces.empty?

  # Hostname injection is opt-in through the global configuration.
  inject_hostname!(traces) if Datadog.configuration.report_hostname

  responses = transport.send_traces(traces)

  # Only responses without an internal or server error count as flushed.
  responses.each do |response|
    next if response.internal_error? || response.server_error?

    @traces_flushed += response.trace_count
  end

  # The most recent response is the one handed to the priority sampler update.
  update_priority_sampler(responses.last)

  record_environment_information!(responses)

  # Successful only when no response reported a server error.
  responses.none?(&:server_error?)
end

#start ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/ddtrace/writer.rb', line 60

# Starts the worker for the current process, under +@mutex_after_fork+.
#
# @return [Boolean, nil] +false+ when the writer has been stopped, +nil+ when
#   a worker already exists for this process id, +true+ after a worker start
#   was requested
def start
  @mutex_after_fork.synchronize do
    # A stopped writer never starts workers again.
    next false if @stopped

    current_pid = Process.pid

    # A worker already exists for this very process: nothing to do.
    next if @worker && @pid == current_pid

    # Remember which process owns the worker so a later fork is detected.
    @pid = current_pid

    start_worker
    true
  end
end

#stats ⇒ Object

stats returns a dictionary of stats about the writer.



172
173
174
175
176
177
# File 'lib/ddtrace/writer.rb', line 172

# Reports counters about this writer.
#
# @return [Hash] +:traces_flushed+ holds the writer's flushed-trace counter,
#   +:transport+ holds whatever the transport's own +stats+ returns
def stats
  flushed = @traces_flushed
  transport_stats = @transport.stats

  { traces_flushed: flushed, transport: transport_stats }
end

#stop ⇒ Object

Gracefully shuts down this writer.

Once stopped, method calls won't fail, but no internal work will be performed.

It is not possible to restart a stopped writer instance.



93
94
95
# File 'lib/ddtrace/writer.rb', line 93

# Stops the worker while holding +@mutex_after_fork+, the same lock #start
# takes, so a stop cannot interleave with a start.
#
# @return whatever +stop_worker+ returns
def stop
  @mutex_after_fork.synchronize do
    stop_worker
  end
end

#write(trace, services = nil) ⇒ Object

enqueue the trace for submission to the API



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/ddtrace/writer.rb', line 135

# Enqueues +trace+ on the worker for submission to the API.
#
# @param trace the trace to enqueue; when runtime metrics are enabled and it
#   is not empty, its first element is associated with runtime metrics
# @param services deprecated and ignored; any non-nil value only triggers a
#   deprecation warning (guarded by +DEPRECATION_WARN_ONLY_ONCE+)
# @return the result of the worker's +enqueue_trace+ when a worker exists
def write(trace, services = nil)
  unless services.nil?
    DEPRECATION_WARN_ONLY_ONCE.run do
      Datadog.logger.warn(%(
        write: Writing services has been deprecated and no longer need to be provided.
        write(traces, services) can be updated to write(traces)
      ))
    end
  end

  # In multiprocess environments the main process creates the +Writer+ and, when
  # it forks (e.g. a web server like Unicorn or Puma with multiple workers), the
  # children share that same +Writer+ until their first write (COW). From then on
  # each process owns its own copy of the +@buffer+, so the +AsyncTransport+
  # inherited from the parent would not send data to the trace agent.
  #
  # Calling #start whenever there is no worker or the recorded pid differs from
  # the current one re-initializes the async worker, once per process.
  start if @worker.nil? || @pid != Process.pid

  # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
  #       instead of working through the trace writer.
  # Associate root span with runtime metrics
  metrics_enabled = Datadog.configuration.runtime_metrics.enabled
  Datadog.runtime_metrics.associate_with_span(trace.first) if metrics_enabled && !trace.empty?

  # Read the worker once so a concurrent change of +@worker+ cannot slip in
  # between the check and the enqueue.
  current_worker = @worker
  return current_worker.enqueue_trace(trace) if current_worker

  # No worker: only worth a log line when the writer was not deliberately stopped.
  Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete') unless @stopped
end