use std::collections::HashMap;
use std::hash::BuildHasher;
use std::str::{self, FromStr};
use std::time::Duration;
use reqwest::blocking::Client;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, StatusCode, Url};
use crate::encoder::{Encoder, ProtobufEncoder};
use crate::errors::{Error, Result};
use crate::metrics::Collector;
use crate::proto;
use crate::registry::Registry;
/// Timeout (10 seconds) configured on the shared `HTTP_CLIENT`.
const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10);
lazy_static! {
// Blocking HTTP client shared by every push in this module. It is built
// lazily on first use; the `unwrap` means a failure to build the client
// panics at that point instead of surfacing as an `Error`.
static ref HTTP_CLIENT: Client = Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}
/// Credentials attached to a push request as HTTP basic authentication.
///
/// When passed as `Some(..)` to one of the push functions, `username` and
/// `password` are handed to the request builder's `basic_auth`.
// NOTE(review): the derived `Debug` impl prints `password` in clear text;
// take care when logging values of this type.
#[derive(Debug)]
pub struct BasicAuthentication {
/// User name sent with the request.
pub username: String,
/// Password sent with the request.
pub password: String,
}
/// Pushes the given metric families to the gateway at `url` with HTTP `PUT`.
///
/// `job` and every entry of `grouping` become part of the request path
/// (`<url>/metrics/job/<job>/<label>/<value>/...`). If `url` has no scheme,
/// `http://` is assumed. Per the Pushgateway API, a `PUT` replaces all metrics
/// previously pushed under the same job and grouping labels.
///
/// # Errors
///
/// Returns an error when `job` or a grouping value contains `/`, when a pushed
/// metric already carries a `job` label or one of the grouping labels, when
/// the request cannot be sent, or when the response status is neither
/// `200 OK` nor `202 Accepted`.
pub fn push_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    const HTTP_METHOD: &str = "PUT";
    push(job, grouping, url, mfs, HTTP_METHOD, basic_auth)
}
/// Pushes the given metric families to the gateway at `url` with HTTP `POST`.
///
/// Behaves like [`push_metrics`] except for the HTTP method. Per the
/// Pushgateway API, a `POST` only replaces previously pushed metrics that
/// share a name with the newly pushed ones (same job and grouping labels).
///
/// # Errors
///
/// Returns an error when `job` or a grouping value contains `/`, when a pushed
/// metric already carries a `job` label or one of the grouping labels, when
/// the request cannot be sent, or when the response status is neither
/// `200 OK` nor `202 Accepted`.
pub fn push_add_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    const HTTP_METHOD: &str = "POST";
    push(job, grouping, url, mfs, HTTP_METHOD, basic_auth)
}
/// Label name reserved for the job: `push` rejects any metric that already
/// carries a label with this name.
const LABEL_NAME_JOB: &str = "job";
fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let grouping = grouping;
let mut push_url = if url.contains("://") {
url.to_owned()
} else {
format!("http://{}", url)
};
if push_url.ends_with('/') {
push_url.pop();
}
let mut url_components = Vec::new();
if job.contains('/') {
return Err(Error::Msg(format!("job contains '/': {}", job)));
}
url_components.push(job.to_owned());
for (ln, lv) in &grouping {
if lv.contains('/') {
return Err(Error::Msg(format!(
"value of grouping label {} contains '/': {}",
ln, lv
)));
}
url_components.push(ln.to_owned());
url_components.push(lv.to_owned());
}
push_url = format!("{}/metrics/job/{}", push_url, url_components.join("/"));
let encoder = ProtobufEncoder::new();
let mut buf = Vec::new();
for mf in mfs {
for m in mf.get_metric() {
for lp in m.get_label() {
if lp.get_name() == LABEL_NAME_JOB {
return Err(Error::Msg(format!(
"pushed metric {} already contains a \
job label",
mf.get_name()
)));
}
if grouping.contains_key(lp.get_name()) {
return Err(Error::Msg(format!(
"pushed metric {} already contains \
grouping label {}",
mf.get_name(),
lp.get_name()
)));
}
}
}
let _ = encoder.encode(&[mf], &mut buf);
}
let mut builder = HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Url::from_str(&push_url).unwrap(),
)
.header(CONTENT_TYPE, encoder.format_type())
.body(buf);
if let Some(BasicAuthentication { username, password }) = basic_auth {
builder = builder.basic_auth(username, Some(password));
}
let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
match response.status() {
StatusCode::ACCEPTED => Ok(()),
StatusCode::OK => Ok(()),
_ => Err(Error::Msg(format!(
"unexpected status code {} while pushing to {}",
response.status(),
push_url
))),
}
}
/// Gathers metrics from `collectors` and pushes them with the given `method`.
///
/// Every collector is registered, in order, in a fresh `Registry`; the first
/// registration failure aborts the call and is returned. The gathered metric
/// families are then forwarded to `push` together with the other arguments.
fn push_from_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    method: &str,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    let scratch = Registry::new();
    collectors
        .into_iter()
        .try_for_each(|collector| scratch.register(collector))?;

    let families = scratch.gather();
    push(job, grouping, url, families, method, basic_auth)
}
/// Gathers metrics from `collectors` and pushes them with HTTP `PUT`.
///
/// The collectors are registered in a temporary registry, gathered, and the
/// result is sent to `<url>/metrics/job/<job>/...` built from `job` and
/// `grouping`. Per the Pushgateway API, a `PUT` replaces all metrics
/// previously pushed under the same job and grouping labels.
///
/// # Errors
///
/// Returns an error when a collector cannot be registered, or for any of the
/// validation, transport, or status failures reported by the underlying push.
pub fn push_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    const HTTP_METHOD: &str = "PUT";
    push_from_collector(job, grouping, url, collectors, HTTP_METHOD, basic_auth)
}
/// Gathers metrics from `collectors` and pushes them with HTTP `POST`.
///
/// Behaves like [`push_collector`] except for the HTTP method. Per the
/// Pushgateway API, a `POST` only replaces previously pushed metrics that
/// share a name with the newly pushed ones (same job and grouping labels).
///
/// # Errors
///
/// Returns an error when a collector cannot be registered, or for any of the
/// validation, transport, or status failures reported by the underlying push.
pub fn push_add_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    const HTTP_METHOD: &str = "POST";
    push_from_collector(job, grouping, url, collectors, HTTP_METHOD, basic_auth)
}
/// `(label name, fallback value)` used by `hostname_grouping_key`: the name is
/// always the map key, the value is used when no hostname can be obtained.
const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
#[cfg(not(target_os = "windows"))]
pub fn hostname_grouping_key() -> HashMap<String, String> {
let max_len = 256;
let mut name = vec![0u8; max_len];
match unsafe {
libc::gethostname(
name.as_mut_ptr() as *mut libc::c_char,
max_len as libc::size_t,
)
} {
0 => {
let last_char = name.iter().position(|byte| *byte == 0).unwrap_or(max_len);
labels! {
DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => str::from_utf8(&name[..last_char])
.unwrap_or(DEFAULT_GROUP_LABEL_PAIR.1).to_owned(),
}
}
_ => {
labels! {DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => DEFAULT_GROUP_LABEL_PAIR.1.to_owned(),}
}
}
}
/// Builds a single-entry grouping map keyed by `instance`.
///
/// On Windows no host name lookup is performed; the value is always the
/// fallback `unknown`.
#[cfg(target_os = "windows")]
pub fn hostname_grouping_key() -> HashMap<String, String> {
    let (label_name, fallback) = DEFAULT_GROUP_LABEL_PAIR;
    labels! {
        label_name.to_owned() => fallback.to_owned(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto;

    /// The grouping key always contains at least one label.
    #[test]
    fn test_hostname_grouping_key() {
        let grouping = hostname_grouping_key();
        assert!(!grouping.is_empty());
    }

    /// Metrics that already carry the `job` label or a grouping label are
    /// rejected before any request is sent.
    #[test]
    fn test_push_bad_label_name() {
        // (offending label name, fragment expected in the error message)
        let cases = [
            (LABEL_NAME_JOB, "job label"),
            (DEFAULT_GROUP_LABEL_PAIR.0, "grouping label"),
        ];

        for &(label_name, expected_fragment) in &cases {
            let mut label = proto::LabelPair::new();
            label.set_name(label_name.to_owned());

            let mut metric = proto::Metric::new();
            metric.set_label(from_vec!(vec![label]));

            let mut family = proto::MetricFamily::new();
            family.set_metric(from_vec!(vec![metric]));

            let outcome = push_metrics(
                "test",
                hostname_grouping_key(),
                "mockurl",
                vec![family],
                None,
            );
            let message = outcome.unwrap_err().to_string();
            assert!(message.contains(expected_fragment));
        }
    }
}