Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["datadog", "dogstatsd", "client"]

[dependencies]
chrono = { version = "0.4", default-features = false, features = ["clock"] }
rand = { version = "0.8.5", default-features = false, features = ["std", "std_rng"] }

[features]
unstable = []
75 changes: 69 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub type DogstatsdResult = Result<(), DogstatsdError>;

const DEFAULT_FROM_ADDR: &str = "0.0.0.0:0";
const DEFAULT_TO_ADDR: &str = "127.0.0.1:8125";
const DEFAULT_SAMPLING_RATE: f32 = 1.0;

/// The struct that represents the options available for the Dogstatsd client.
#[derive(Debug, PartialEq)]
Expand All @@ -111,6 +112,8 @@ pub struct Options {
pub namespace: String,
/// Default tags to include with every request.
pub default_tags: Vec<String>,
/// The sample rate to use for all metrics.
pub sampling_rate: f32,
}

impl Default for Options {
Expand All @@ -128,7 +131,8 @@ impl Default for Options {
/// from_addr: "0.0.0.0:0".into(),
/// to_addr: "127.0.0.1:8125".into(),
/// namespace: String::new(),
/// default_tags: vec!()
/// default_tags: vec!(),
/// sampling_rate: 1.0,
/// },
/// options
/// )
Expand All @@ -139,12 +143,13 @@ impl Default for Options {
to_addr: DEFAULT_TO_ADDR.into(),
namespace: String::new(),
default_tags: vec![],
sampling_rate: 1.0,
}
}
}

impl Options {
/// Create a new options struct by supplying values for all fields.
/// Create a new options struct by supplying values for all fields but the sampling rate.
///
/// # Examples
///
Expand All @@ -159,6 +164,32 @@ impl Options {
to_addr: to_addr.into(),
namespace: namespace.into(),
default_tags,
sampling_rate: 1.0,
}
}

/// Create a new options struct by supplying values for all fields.
///
/// # Examples
///
/// /// ```
/// use dogstatsd::Options;
///
/// let options = Options::new("127.0.0.1:9000", "127.0.0.1:9001", "", vec!(String::new()), 0.5);
/// ```
pub fn with_sampling_rate(
from_addr: &str,
to_addr: &str,
namespace: &str,
default_tags: Vec<String>,
sampling_rate: f32,
) -> Self {
Options {
from_addr: from_addr.into(),
to_addr: to_addr.into(),
namespace: namespace.into(),
default_tags,
sampling_rate,
}
}
}
Expand All @@ -174,6 +205,8 @@ pub struct OptionsBuilder {
namespace: Option<String>,
/// Default tags to include with every request.
default_tags: Vec<String>,
/// The sample rate to use for all metrics.
sampling_rate: Option<f32>,
}

impl OptionsBuilder {
Expand Down Expand Up @@ -246,6 +279,19 @@ impl OptionsBuilder {
self
}

/// Will allow the builder to generate an `Options` struct with the provided value.
///
/// # Examples
/// ```
/// use dogstatsd::OptionsBuilder;
///
/// let options_builder = OptionsBuilder::new().sampling_rate(0.5);
/// ```
pub fn sampling_rate(&mut self, sampling_rate: f32) -> &mut OptionsBuilder {
self.sampling_rate = Some(sampling_rate);
self
}

/// Will construct an `Options` with all of the provided values and fall back to the default values if they aren't provided.
///
/// # Examples
Expand All @@ -254,20 +300,21 @@ impl OptionsBuilder {
/// use dogstatsd::OptionsBuilder;
/// use dogstatsd::Options;
///
/// let options = OptionsBuilder::new().namespace(String::from("mynamespace")).default_tag(String::from("tag1:tav1val")).build();
/// let options = OptionsBuilder::new().namespace(String::from("mynamespace")).default_tag(String::from("tag1:tav1val")).sampling_rate(0.75).build();
///
/// assert_eq!(
/// Options {
/// from_addr: "0.0.0.0:0".into(),
/// to_addr: "127.0.0.1:8125".into(),
/// namespace: String::from("mynamespace"),
/// default_tags: vec!(String::from("tag1:tav1val"))
/// default_tags: vec!(String::from("tag1:tav1val")),
/// sampling_rate: 0.75,
/// },
/// options
/// )
/// ```
pub fn build(&self) -> Options {
Options::new(
Options::with_sampling_rate(
self.from_addr
.as_ref()
.unwrap_or(&String::from(DEFAULT_FROM_ADDR)),
Expand All @@ -276,6 +323,7 @@ impl OptionsBuilder {
.unwrap_or(&String::from(DEFAULT_TO_ADDR)),
self.namespace.as_ref().unwrap_or(&String::default()),
self.default_tags.to_vec(),
self.sampling_rate.unwrap_or(DEFAULT_SAMPLING_RATE),
)
}
}
Expand All @@ -288,6 +336,7 @@ pub struct Client {
to_addr: String,
namespace: String,
default_tags: Vec<u8>,
sampling_rate: f32,
}

impl PartialEq for Client {
Expand Down Expand Up @@ -317,6 +366,7 @@ impl Client {
to_addr: options.to_addr,
namespace: options.namespace,
default_tags: options.default_tags.join(",").into_bytes(),
sampling_rate: options.sampling_rate,
})
}

Expand Down Expand Up @@ -651,7 +701,16 @@ impl Client {
M: Metric,
S: AsRef<str>,
{
let formatted_metric = format_for_send(metric, &self.namespace, tags, &self.default_tags);
if self.sampling_rate < 1.0 && rand::random::<f32>() > self.sampling_rate {
return Ok(());
}
let formatted_metric = format_for_send(
metric,
&self.namespace,
tags,
self.sampling_rate,
&self.default_tags,
);
self.socket
.send_to(formatted_metric.as_slice(), &self.to_addr)?;
Ok(())
Expand Down Expand Up @@ -697,12 +756,14 @@ mod tests {
.to_addr("127.0.0.2:8125".into())
.namespace("mynamespace".into())
.default_tag(String::from("tag1:tag1val"))
.sampling_rate(0.75)
.build();
let expected_options = Options {
from_addr: "127.0.0.2:0".into(),
to_addr: "127.0.0.2:8125".into(),
namespace: "mynamespace".into(),
default_tags: vec!["tag1:tag1val".into()].to_vec(),
sampling_rate: 0.75,
};

assert_eq!(expected_options, options);
Expand All @@ -717,6 +778,7 @@ mod tests {
to_addr: DEFAULT_TO_ADDR.into(),
namespace: String::new(),
default_tags: String::new().into_bytes(),
sampling_rate: 1.0,
};

assert_eq!(expected_client, client)
Expand All @@ -737,6 +799,7 @@ mod tests {
to_addr: DEFAULT_TO_ADDR.into(),
namespace: String::new(),
default_tags: String::from("tag1:tag1val").into_bytes(),
sampling_rate: 1.0,
};

assert_eq!(expected_client, client)
Expand Down
29 changes: 28 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub fn format_for_send<M, I, S>(
in_metric: &M,
in_namespace: &str,
tags: I,
sampling: f32,
default_tags: &Vec<u8>,
) -> Vec<u8>
where
Expand All @@ -26,6 +27,11 @@ where

buf.extend_from_slice(metric.as_bytes());

if sampling < 1.0 {
buf.extend_from_slice(b"|@");
buf.extend_from_slice(sampling.to_string().as_bytes());
}

let mut tags_iter = tags.into_iter();
let mut next_tag = tags_iter.next();
let has_tags = next_tag.is_some();
Expand Down Expand Up @@ -379,6 +385,7 @@ mod tests {
&CountMetric::Incr("foo"),
"namespace",
&[] as &[String],
1.0,
&String::default().into_bytes()
)[..]
)
Expand All @@ -392,6 +399,7 @@ mod tests {
&CountMetric::Incr("foo"),
"",
&["tag:1", "tag:2"],
1.0,
&String::default().into_bytes()
)[..]
)
Expand All @@ -405,6 +413,7 @@ mod tests {
&CountMetric::Incr("foo"),
"namespace",
&["tag:1", "tag:2"],
1.0,
&String::from("defaultag:3,seconddefault:4").into_bytes()
)[..]
)
Expand All @@ -418,6 +427,21 @@ mod tests {
&CountMetric::Incr("foo"),
"namespace",
&["tag:1", "tag:2"],
1.0,
&String::from("defaultag:3,seconddefault:4").into_bytes()
)[..]
)
}

#[test]
fn test_format_for_send_sampling() {
assert_eq!(
&b"namespace.foo:1|c|@0.33|#tag:1,tag:2,defaultag:3,seconddefault:4"[..],
&format_for_send(
&CountMetric::Incr("foo"),
"namespace",
&["tag:1", "tag:2"],
0.33,
&String::from("defaultag:3,seconddefault:4").into_bytes()
)[..]
)
Expand All @@ -431,6 +455,7 @@ mod tests {
&Event::new("title".into(), "text".into()),
"namespace",
&["tag:1", "tag:2"],
1.0,
&String::default().into_bytes()
)[..]
)
Expand All @@ -444,6 +469,7 @@ mod tests {
&CountMetric::Incr("foo"),
"namespace",
&[] as &[String],
1.0,
&String::from("defaultag:3,seconddefault:4").into_bytes()
)[..]
)
Expand Down Expand Up @@ -619,9 +645,10 @@ mod bench {
#[bench]
fn bench_format_for_send(b: &mut Bencher) {
let metric = NullMetric;
let tags = String::default().into_bytes();

b.iter(|| {
format_for_send(&metric, "foo", &["bar", "baz"]);
format_for_send(&metric, "foo", &["bar", "baz"], 1.0, &tags);
})
}

Expand Down