1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Copyright 2014 The Prometheus Authors
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::HashMap;
use std::hash::BuildHasher;
use std::str::{self, FromStr};
use std::time::Duration;

use reqwest::blocking::Client;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, StatusCode, Url};

use crate::encoder::{Encoder, ProtobufEncoder};
use crate::errors::{Error, Result};
use crate::metrics::Collector;
use crate::proto;
use crate::registry::Registry;

/// Timeout configured on the shared `HTTP_CLIENT` (10 seconds).
const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10);

lazy_static! {
    // Blocking HTTP client shared by every push in this module; it is built
    // once, with `REQWEST_TIMEOUT_SEC` as its timeout.
    // NOTE(review): `unwrap` panics if the client cannot be built.
    static ref HTTP_CLIENT: Client = Client::builder()
        .timeout(REQWEST_TIMEOUT_SEC)
        .build()
        .unwrap();
}

/// Credentials for pushing to a Pushgateway endpoint that is protected by
/// HTTP Basic access authentication.
///
/// Pass `Some(BasicAuthentication { .. })` as the `basic_auth` argument of any
/// of the `push_*` functions in this module; pass `None` to push without
/// credentials.
///
/// NOTE(review): the derived `Debug` implementation prints `password` in clear
/// text, so avoid logging this struct with `{:?}`.
#[derive(Debug)]
pub struct BasicAuthentication {
    /// The Basic Authentication username (possibly empty string).
    pub username: String,
    /// The Basic Authentication password (possibly empty string).
    pub password: String,
}

/// `push_metrics` sends the given metric families to the Pushgateway at `url`,
/// grouped under `job` plus any additional `grouping` labels (the map may be
/// empty). See the Pushgateway documentation for the detailed implications of
/// the job and the other grouping labels.
///
/// Neither the job name nor any grouping label value may contain a "/". The
/// pushed metrics must not carry a `job` label of their own, nor any label
/// whose name is also a key of `grouping`.
///
/// `url` may be a bare `host:port` or `ip:port`, in which case `http://` is
/// prepended automatically, or it may include the scheme. Do not include the
/// `/metrics/job/...` part; it is appended by this function.
///
/// All metrics previously pushed with the same job and grouping labels are
/// replaced by the metrics pushed by this call (HTTP method `PUT` is used).
pub fn push_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    // PUT: replace everything stored for this grouping key.
    let method = "PUT";
    push(job, grouping, url, mfs, method, basic_auth)
}

/// `push_add_metrics` behaves like `push_metrics`, except that only previously
/// pushed metrics with the same name (and the same job and grouping labels)
/// are replaced. HTTP method `POST` is used for the push.
pub fn push_add_metrics<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    // POST: only same-named metrics in this group are replaced.
    let method = "POST";
    push(job, grouping, url, mfs, method, basic_auth)
}

/// Label name that pushed metrics must not carry themselves; `push` rejects
/// any metric that already has a label with this name.
const LABEL_NAME_JOB: &str = "job";

fn push<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    mfs: Vec<proto::MetricFamily>,
    method: &str,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    // Suppress clippy warning needless_pass_by_value.
    let grouping = grouping;

    let mut push_url = if url.contains("://") {
        url.to_owned()
    } else {
        format!("http://{}", url)
    };

    if push_url.ends_with('/') {
        push_url.pop();
    }

    let mut url_components = Vec::new();
    if job.contains('/') {
        return Err(Error::Msg(format!("job contains '/': {}", job)));
    }

    // TODO: escape job
    url_components.push(job.to_owned());

    for (ln, lv) in &grouping {
        // TODO: check label name
        if lv.contains('/') {
            return Err(Error::Msg(format!(
                "value of grouping label {} contains '/': {}",
                ln, lv
            )));
        }
        url_components.push(ln.to_owned());
        url_components.push(lv.to_owned());
    }

    push_url = format!("{}/metrics/job/{}", push_url, url_components.join("/"));

    let encoder = ProtobufEncoder::new();
    let mut buf = Vec::new();

    for mf in mfs {
        // Check for pre-existing grouping labels:
        for m in mf.get_metric() {
            for lp in m.get_label() {
                if lp.get_name() == LABEL_NAME_JOB {
                    return Err(Error::Msg(format!(
                        "pushed metric {} already contains a \
                         job label",
                        mf.get_name()
                    )));
                }
                if grouping.contains_key(lp.get_name()) {
                    return Err(Error::Msg(format!(
                        "pushed metric {} already contains \
                         grouping label {}",
                        mf.get_name(),
                        lp.get_name()
                    )));
                }
            }
        }
        // Ignore error, `no metrics` and `no name`.
        let _ = encoder.encode(&[mf], &mut buf);
    }

    let mut builder = HTTP_CLIENT
        .request(
            Method::from_str(method).unwrap(),
            Url::from_str(&push_url).unwrap(),
        )
        .header(CONTENT_TYPE, encoder.format_type())
        .body(buf);

    if let Some(BasicAuthentication { username, password }) = basic_auth {
        builder = builder.basic_auth(username, Some(password));
    }

    let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;

    match response.status() {
        StatusCode::ACCEPTED => Ok(()),
        StatusCode::OK => Ok(()),
        _ => Err(Error::Msg(format!(
            "unexpected status code {} while pushing to {}",
            response.status(),
            push_url
        ))),
    }
}

/// Registers every collector in a fresh, function-local registry, gathers the
/// resulting metric families and hands them to `push` with the given HTTP
/// `method`.
///
/// Returns the first registration error, if any; otherwise the result of
/// `push`.
fn push_from_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    method: &str,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    let scratch = Registry::new();
    // Stops at the first collector that fails to register.
    collectors
        .into_iter()
        .try_for_each(|collector| scratch.register(collector))?;

    push(job, grouping, url, scratch.gather(), method, basic_auth)
}

/// `push_collector` pushes the metrics collected from the provided collectors.
/// It is a convenient way to push only a few metrics without building the
/// metric families by hand. HTTP method `PUT` is used for the push.
pub fn push_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    let method = "PUT";
    push_from_collector(job, grouping, url, collectors, method, basic_auth)
}

/// `push_add_collector` works like `push_add_metrics`, but collects the
/// metrics from the provided collectors first. It is a convenient way to push
/// only a few metrics. HTTP method `POST` is used for the push.
pub fn push_add_collector<S: BuildHasher>(
    job: &str,
    grouping: HashMap<String, String, S>,
    url: &str,
    collectors: Vec<Box<dyn Collector>>,
    basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
    let method = "POST";
    push_from_collector(job, grouping, url, collectors, method, basic_auth)
}

/// Label name (`.0`) and fallback label value (`.1`) used by
/// `hostname_grouping_key`.
const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");

/// `hostname_grouping_key` returns a label map with the only entry
/// {instance="<hostname>"}. This can be conveniently used as the grouping
/// parameter if metrics should be pushed with the hostname as label. The
/// returned map is created upon each call so that the caller is free to add more
/// labels to the map.
///
/// Note: This function returns `instance = "unknown"` in Windows.
#[cfg(not(target_os = "windows"))]
pub fn hostname_grouping_key() -> HashMap<String, String> {
    // Host names are limited to 255 bytes.
    //   ref: http://pubs.opengroup.org/onlinepubs/7908799/xns/gethostname.html
    let max_len = 256;
    let mut name = vec![0u8; max_len];
    match unsafe {
        libc::gethostname(
            name.as_mut_ptr() as *mut libc::c_char,
            max_len as libc::size_t,
        )
    } {
        0 => {
            let last_char = name.iter().position(|byte| *byte == 0).unwrap_or(max_len);
            labels! {
                DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => str::from_utf8(&name[..last_char])
                                            .unwrap_or(DEFAULT_GROUP_LABEL_PAIR.1).to_owned(),
            }
        }
        _ => {
            labels! {DEFAULT_GROUP_LABEL_PAIR.0.to_owned() => DEFAULT_GROUP_LABEL_PAIR.1.to_owned(),}
        }
    }
}

/// `hostname_grouping_key` returns a label map whose only entry is
/// {instance="unknown"}: on Windows the hostname is not looked up. A new map is
/// built on every call, so the caller is free to add further labels.
#[cfg(target_os = "windows")]
pub fn hostname_grouping_key() -> HashMap<String, String> {
    let (label, fallback) = DEFAULT_GROUP_LABEL_PAIR;
    labels! {
        label.to_owned() => fallback.to_owned(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto;

    /// The grouping key must always contain at least one label.
    #[test]
    fn test_hostname_grouping_key() {
        let grouping = hostname_grouping_key();
        assert!(!grouping.is_empty());
    }

    /// Metrics that already carry the `job` label or a grouping label must be
    /// rejected before any request is sent.
    #[test]
    fn test_push_bad_label_name() {
        // (offending label name, fragment expected in the error message)
        let cases = vec![
            // Error message: "pushed metric {} already contains a job label"
            (LABEL_NAME_JOB, "job label"),
            // Error message: "pushed metric {} already contains grouping label {}"
            (DEFAULT_GROUP_LABEL_PAIR.0, "grouping label"),
        ];

        for (label_name, expected_fragment) in cases {
            let mut label = proto::LabelPair::new();
            label.set_name(label_name.to_owned());

            let mut metric = proto::Metric::new();
            metric.set_label(from_vec!(vec![label]));

            let mut family = proto::MetricFamily::new();
            family.set_metric(from_vec!(vec![metric]));

            let err = push_metrics(
                "test",
                hostname_grouping_key(),
                "mockurl",
                vec![family],
                None,
            )
            .unwrap_err();
            assert!(format!("{}", err).contains(expected_fragment));
        }
    }
}