-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpenTracing #249
base: master
Are you sure you want to change the base?
OpenTracing #249
Changes from all commits
a77a269
6cc8b81
b018434
11dba5b
283542d
461204d
b99e5fa
d9efa11
08dadbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
defmodule Annon.Monitoring.Latencies do
  @moduledoc false

  # Latency measurements collected for a single request.
  # Every field defaults to `nil` until the caller fills it in.
  defstruct [:client_request, :upstream, :gateway]
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
defmodule Annon.Monitoring.MetricsCollector do
  @moduledoc """
  This module provides helper functions to persist meaningful metrics to StatsD or DogstatsD servers.

  All reporting is delegated to the functions imported from `DogStat`; the `opts`
  argument of every helper is forwarded to them unchanged.

  Code is based on [Statix](https://github.com/lexmag/statix) library.
  """
  import DogStat
  alias Annon.Monitoring.Latencies

  @doc """
  Counts an incoming request and, when `content_length` is not `nil`, also reports
  it as the `request_size` histogram.

  Returns the result of the last `DogStat` call that was made.
  """
  def track_request(_request_id, content_length, opts) do
    counted = increment("request_count", 1, opts)

    case content_length do
      nil -> counted
      size -> histogram("request_size", size, opts)
    end
  end

  @doc """
  Reports the client, upstream and gateway latencies stored in a
  `Annon.Monitoring.Latencies` struct as three separate histograms.

  Raises `MatchError` when `latencies` is not a `Latencies` struct.
  """
  def track_response(_request_id, latencies, opts) do
    # Assert the struct type up front, before any metric is emitted.
    %Latencies{} = latencies

    histogram("latencies_client", latencies.client_request, opts)
    histogram("latencies_upstream", latencies.upstream, opts)
    histogram("latencies_gateway", latencies.gateway, opts)
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
defmodule Annon.Monitoring.Trace do
  @moduledoc """
  Span of a distributed trace, together with helpers that open and close the span
  that describes a gateway request.

  The field names are camelCase because the struct is serialized as-is by the trace
  collector client; NOTE(review): the layout looks like the Zipkin v1 span model —
  confirm against the collector that is actually deployed.
  """
  alias Annon.Monitoring.Trace.BinaryAnnotation
  alias Plug.Conn

  defstruct traceId: nil,   # Randomly generated, unique for a trace, set on all spans within it. 16-32 chars
            name: nil,      # Span name in lowercase (e.g. rpc method)
            parentId: nil,  # Parent span id. 8-byte identifier encoded as 16 lowercase hex characters.
                            # Can be omitted or set to nil if span is the root span of a trace.
            id: nil,        # Id of current span, unique in context of traceId.
                            # 8-byte identifier encoded as 16 lowercase hex characters.
            timestamp: nil, # Epoch **microseconds** of the start of this span,
                            # possibly absent if this an incomplete span.
            duration: nil,  # Duration in **microseconds** of the critical path, if known.
                            # Durations of less than one are rounded up.
            debug: false,
            annotations: [],
            binaryAnnotations: []

  @doc """
  Opens a span for the request handled by `conn`.

  The trace id is the first `x-request-id` response header of `conn`; when the header
  is absent a random 32-character lowercase hex id is generated instead. The span id
  is always a fresh 16-character lowercase hex id and `timestamp` is the current
  system time in epoch microseconds.

  ## Options

    * `:annotations` - enumerable of `{key, value}` pairs stored in
      `binaryAnnotations` (default: `[]`).
  """
  def start_span(%Conn{} = conn, opts \\ []) do
    %__MODULE__{
      traceId: get_request_id(conn),
      name: "gateway request",
      id: generate_id(8),
      timestamp: now_microseconds(),
      binaryAnnotations: opts |> Keyword.get(:annotations, []) |> to_binary_annotations()
    }
  end

  @doc """
  Closes `trace` by setting its `duration` (microseconds elapsed since
  `trace.timestamp`, never less than 1).

  ## Options

    * `:annotations` - enumerable of `{key, value}` pairs appended to the
      `binaryAnnotations` already present on the span (default: `[]`).
  """
  def end_span(%__MODULE__{} = trace, opts \\ []) do
    # `max/2` implements the "durations of less than one are rounded up" rule and also
    # protects against a system clock that stepped backwards while the span was open.
    duration = max(now_microseconds() - trace.timestamp, 1)
    added = opts |> Keyword.get(:annotations, []) |> to_binary_annotations()

    %{trace | duration: duration, binaryAnnotations: trace.binaryAnnotations ++ added}
  end

  # Wall-clock time in epoch microseconds, as required by the `timestamp` field.
  defp now_microseconds, do: System.system_time(:microsecond)

  # Wraps `{key, value}` pairs into binary annotations. No endpoint information is
  # attached, so the struct's default endpoint is explicitly replaced with `nil`.
  defp to_binary_annotations(pairs) do
    Enum.map(pairs, fn {key, value} ->
      %BinaryAnnotation{key: key, value: value, endpoint: nil}
    end)
  end

  # Uses the request id already present on the connection as the trace id, so the
  # trace can be correlated with it; falls back to a random 16-byte hex identifier.
  defp get_request_id(conn) do
    case Conn.get_resp_header(conn, "x-request-id") do
      [] -> generate_id(16)
      [id | _] -> id
    end
  end

  # Random identifier of `bytes` bytes, encoded as lowercase hex characters.
  defp generate_id(bytes) do
    bytes
    |> :crypto.strong_rand_bytes()
    |> Base.encode16(case: :lower)
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
defmodule Annon.Monitoring.Trace.Annotation do
  @moduledoc """
  Annotation attached to a trace span.

  Holds a `timestamp`, a `value` and the `endpoint` the annotation refers to.
  `endpoint` defaults to an empty `Annon.Monitoring.Trace.Endpoint` struct.
  """

  defstruct timestamp: nil,
            value: nil,
            endpoint: %Annon.Monitoring.Trace.Endpoint{}
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
defmodule Annon.Monitoring.Trace.BinaryAnnotation do
  @moduledoc """
  Key/value annotation attached to a trace span.

  Holds a `key`, a `value` and the `endpoint` the annotation refers to.
  `endpoint` defaults to an empty `Annon.Monitoring.Trace.Endpoint` struct.
  """

  defstruct key: nil,
            value: nil,
            endpoint: %Annon.Monitoring.Trace.Endpoint{}
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
defmodule Annon.Monitoring.Trace.Endpoint do
  @moduledoc """
  Network endpoint referenced by the annotations of a trace span.

  Every field defaults to `nil`.
  """

  defstruct [
    # Classifier of this endpoint in lowercase, such as "acme-front-end"
    serviceName: nil,
    # The text representation of a IPv4 address associated with this endpoint. Ex. 192.168.99.100
    ipv4: nil,
    # The text representation of a IPv6 address associated with this endpoint. Ex. 2001:db8::c001
    ipv6: nil,
    port: nil
  ]
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
defmodule Annon.Monitoring.TraceCollector do
  @moduledoc """
  Sends request spans to a trace collector over HTTP as a JSON array.
  """
  alias Annon.Monitoring.Trace

  # Collector endpoint used when the caller does not pass a `:url` option.
  @default_url "http://localhost:9411/api/v1/spans"

  @headers [
    {"content-type", "application/json"},
    {"accept", "application/json"}
  ]

  @doc """
  Builds a span for `conn`, closes it immediately and posts it to the collector.

  ## Options

    * `:url` - collector endpoint (default: `#{@default_url}`).

  Returns the response of `HTTPoison.post!/3`.

  NOTE(review): `HTTPoison.post!/3` and `Poison.encode!/1` raise on failure, so an
  unreachable collector fails the calling process — confirm this is intended for a
  monitoring side channel.
  """
  def send_span(conn, opts \\ []) do
    url = Keyword.get(opts, :url, @default_url)

    span =
      conn
      |> Trace.start_span()
      |> Trace.end_span()

    # The collector endpoint receives a list of spans, hence the single-element list.
    payload = Poison.encode!([span])

    HTTPoison.post!(url, payload, @headers)
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ defmodule Annon.Plugins.Monitoring do | |
""" | ||
use Annon.Plugin, plugin_name: :monitoring | ||
alias Plug.Conn | ||
alias Annon.Monitoring.MetricsCollector | ||
alias Annon.Monitoring.Latencies | ||
|
||
def validate_settings(changeset), | ||
do: changeset | ||
|
@@ -16,75 +18,86 @@ defmodule Annon.Plugins.Monitoring do | |
do: %{} | ||
|
||
def execute(%Conn{} = conn, %{api: api, start_time: request_start_time}, _settings) do | ||
api_tags = tags(conn, api) | ||
content_length = get_content_length(conn, nil) | ||
|
||
conn | ||
|> get_request_size() | ||
|> ExStatsD.histogram("request_size", tags: api_tags) | ||
sample_rate = | ||
:annon_api | ||
|> Application.get_env(:metrics_collector) | ||
|> Keyword.get(:sample_rate, 1) | ||
|> Confex.process_env() | ||
|
||
collector_opts = [ | ||
tags: tags(conn, api), | ||
sample_rate: sample_rate | ||
] | ||
|
||
ExStatsD.increment("request_count", tags: api_tags) | ||
request_id = get_request_id(conn, nil) | ||
|
||
MetricsCollector.track_request(request_id, content_length, collector_opts) | ||
|
||
conn | ||
|> Conn.register_before_send(&write_metrics(&1, api)) | ||
|> Conn.register_before_send(&assign_latencies(&1, request_start_time)) | ||
|> Conn.register_before_send(&track_latencies(&1, request_id, request_start_time, collector_opts)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use a function call when a pipeline is only one function long |
||
end | ||
|
||
defp assign_latencies(conn, request_start_time) do | ||
defp track_latencies(conn, request_id, request_start_time, collector_opts) do | ||
request_end_time = System.monotonic_time() | ||
latencies_client = System.convert_time_unit(request_end_time - request_start_time, :native, :micro_seconds) | ||
request_duration = latencies_client - Map.get(conn.assigns, :latencies_upstream, 0) | ||
|
||
conn | ||
|> Conn.assign(:latencies_gateway, request_duration) | ||
|> Conn.assign(:latencies_client, latencies_client) | ||
end | ||
latencies_upstream = Map.get(conn.assigns, :latencies_upstream, 0) | ||
latencies_gateway = latencies_client - latencies_upstream | ||
|
||
defp write_metrics(%Conn{} = conn, api) do | ||
api_tags = tags(conn, api) ++ ["http_status:#{to_string conn.status}"] | ||
ExStatsD.timer(conn.assigns.latencies_client, "latency", tags: api_tags) | ||
ExStatsD.increment("response_count", tags: api_tags) | ||
conn | ||
end | ||
latencies = %Latencies{ | ||
client_request: latencies_client, | ||
upstream: latencies_upstream, | ||
gateway: latencies_gateway | ||
} | ||
|
||
defp tags(%Conn{host: host, method: method, port: port} = conn, api), | ||
do: ["http_host:#{to_string host}", | ||
"http_method:#{to_string method}", | ||
"http_port:#{to_string port}"] ++ api_tags(api) ++ get_request_id(conn) | ||
status = conn |> get_conn_status(0) |> Integer.to_string() | ||
|
||
defp api_tags(%{name: api_name, id: api_id}), | ||
do: ["api_name:#{to_string api_name}", "api_id:#{to_string api_id}"] | ||
defp api_tags(_), | ||
do: ["api_name:unknown", "api_id:unknown"] | ||
MetricsCollector.track_response(request_id, latencies, [ | ||
tags: ["http.status:#{status}"] ++ collector_opts[:tags], | ||
sample_rate: collector_opts[:sample_rate] | ||
]) | ||
|
||
defp get_request_id(conn) do | ||
id = conn | ||
|> Conn.get_resp_header("x-request-id") | ||
|> Enum.at(0) | ||
Annon.Monitoring.TraceCollector.send_span(conn) | ||
|
||
["request_id:#{to_string id}"] | ||
conn | ||
|> Conn.assign(:latencies_gateway, latencies_gateway) | ||
|> Conn.assign(:latencies_client, latencies_client) | ||
|> Conn.assign(:latencies, latencies) | ||
end | ||
|
||
defp get_request_size(conn) do | ||
get_headers_size(conn) + get_body_size(conn) + get_query_string_size(conn) | ||
defp tags(%Conn{host: host, method: method, port: port} = conn, nil) do | ||
port = Integer.to_string(port) | ||
request_id = get_request_id(conn, "unknown") | ||
|
||
["http.host:#{host}", "http.method:#{method}", "http.port:#{port}", | ||
"api.name:unknown", "api.id:unknown", "request.id:#{request_id}"] | ||
end | ||
defp tags(%Conn{host: host, method: method, port: port} = conn, api) do | ||
port = Integer.to_string(port) | ||
request_id = get_request_id(conn, "unknown") | ||
%{id: api_id, name: api_name} = api | ||
|
||
defp get_headers_size(%Conn{req_headers: req_headers}) do | ||
req_headers | ||
|> Enum.map(&Tuple.to_list(&1)) | ||
|> List.flatten | ||
|> Enum.join | ||
|> byte_size | ||
["http.host:#{host}", "http.method:#{method}", "http.port:#{port}", | ||
"api.name:#{api_name}", "api.id:#{api_id}", "request.id:#{request_id}"] | ||
end | ||
|
||
defp get_body_size(conn) do | ||
conn | ||
|> Conn.read_body | ||
|> elem(1) | ||
|> byte_size | ||
# Returns the first `x-request-id` response header of `conn`,
# or `default` when the header is not set.
defp get_request_id(conn, default) do
  conn
  |> Conn.get_resp_header("x-request-id")
  |> Enum.at(0, default)
end
|
||
defp get_query_string_size(%Conn{query_string: query_string}) do | ||
query_string | ||
|> byte_size | ||
# Returns the size announced by the request's `content-length` header as a
# non-negative integer, or `default` when the header is missing or malformed.
#
# The value is read from the *request* headers: this helper is used to report the
# size of the incoming request, which is not described by the response headers.
defp get_content_length(conn, default) do
  with [value | _] <- Conn.get_req_header(conn, "content-length"),
       {length, ""} when length >= 0 <- Integer.parse(value) do
    length
  else
    # Missing header, trailing garbage or a negative number: nothing usable to report.
    _ -> default
  end
end
|
||
# Returns the `:status` of the given map/struct, or `default` when the status is `nil`.
# Raises `FunctionClauseError` when the first argument has no `:status` key.
def get_conn_status(%{status: status}, default) do
  case status do
    nil -> default
    code -> code
  end
end
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Large numbers should be written with underscores: 32_768