Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sample rate to statsd to throttle overexuberant stats (#403) #404

Merged
merged 1 commit into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions src/filecache.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ static void get_fresh_fd(filecache_t *cache,
char old_filename[PATH_MAX];
const char *sz;
bool unlink_old = false;
float samplerate = 1.0; // always sample stat

if (pdata == NULL) {
*pdatap = calloc(1, sizeof(struct filecache_pdata));
Expand Down Expand Up @@ -654,7 +655,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_xlg_timing);
count = FETCH(filecache_get_xlg_count);
sz = "XLG";
stats_counter("large-gets", 1);
stats_counter("large-gets", 1, samplerate);
stats_timer("large-get-latency", elapsed_time);
}
else if (st.st_size > LG) {
Expand All @@ -663,7 +664,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_lg_timing);
count = FETCH(filecache_get_lg_count);
sz = "LG";
stats_counter("large-gets", 1);
stats_counter("large-gets", 1, samplerate);
stats_timer("large-get-latency", elapsed_time);
}
else if (st.st_size > MED) {
Expand All @@ -672,7 +673,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_med_timing);
count = FETCH(filecache_get_med_count);
sz = "MED";
stats_counter("large-gets", 1);
stats_counter("large-gets", 1, samplerate);
stats_timer("large-get-latency", elapsed_time);
}
else if (st.st_size > SM) {
Expand All @@ -681,7 +682,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_sm_timing);
count = FETCH(filecache_get_sm_count);
sz = "SM";
stats_counter("small-gets", 1);
stats_counter("small-gets", 1, samplerate);
stats_timer("small-get-latency", elapsed_time);
}
else if (st.st_size > XSM) {
Expand All @@ -690,7 +691,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_xsm_timing);
count = FETCH(filecache_get_xsm_count);
sz = "XSM";
stats_counter("small-gets", 1);
stats_counter("small-gets", 1, samplerate);
stats_timer("small-get-latency", elapsed_time);
}
else {
Expand All @@ -699,7 +700,7 @@ static void get_fresh_fd(filecache_t *cache,
latency = FETCH(filecache_get_xxsm_timing);
count = FETCH(filecache_get_xxsm_count);
sz = "XXSM";
stats_counter("small-gets", 1);
stats_counter("small-gets", 1, samplerate);
stats_timer("small-get-latency", elapsed_time);
}
log_print(LOG_DEBUG, SECTION_FILECACHE_OPEN, "put_fresh_fd: GET on size %s (%lu) for %s -- Current:Average latency %lu :: %lu",
Expand All @@ -708,13 +709,13 @@ static void get_fresh_fd(filecache_t *cache,
if (st.st_size >= LG && elapsed_time > large_time_allotment) {
log_print(LOG_WARNING, SECTION_FILECACHE_OPEN, "put_fresh_fd: large (%lu) GET for %s exceeded time allotment %lu with %lu",
st.st_size, path, large_time_allotment, elapsed_time);
stats_counter("exceeded-time-large-GET-count", 1);
stats_counter("exceeded-time-large-GET-count", 1, samplerate);
stats_timer("exceeded-time-large-GET-latency", elapsed_time);
}
else if (st.st_size < LG && elapsed_time > small_time_allotment) {
log_print(LOG_WARNING, SECTION_FILECACHE_OPEN, "put_fresh_fd: small (%lu) GET for %s exceeded time allotment %lu with %lu",
st.st_size, path, small_time_allotment, elapsed_time);
stats_counter("exceeded-time-small-GET-count", 1);
stats_counter("exceeded-time-small-GET-count", 1, samplerate);
stats_timer("exceeded-time-small-GET-latency", elapsed_time);
}
}
Expand Down Expand Up @@ -1015,8 +1016,9 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
CURLcode res = CURLE_OK;
// Not to exceed time for operation, else it's an error. Allow large files a longer time
// Somewhat arbitrary
static const unsigned small_time_allotment = 2000; // 2 seconds
static const unsigned small_time_allotment = 4000; // 4 seconds
static const unsigned large_time_allotment = 8000; // 8 seconds
float samplerate = 1.0; // always sample stats

BUMP(filecache_return_etag);

Expand Down Expand Up @@ -1140,7 +1142,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_xlg_timing);
count = FETCH(filecache_put_xlg_count);
sz = "XLG";
stats_counter("large-puts", 1);
stats_counter("large-puts", 1, samplerate);
stats_timer("large-put-latency", elapsed_time);
}
else if (st.st_size > LG) {
Expand All @@ -1149,7 +1151,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_lg_timing);
count = FETCH(filecache_put_lg_count);
sz = "LG";
stats_counter("large-puts", 1);
stats_counter("large-puts", 1, samplerate);
stats_timer("large-put-latency", elapsed_time);
}
else if (st.st_size > MED) {
Expand All @@ -1158,7 +1160,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_med_timing);
count = FETCH(filecache_put_med_count);
sz = "MED";
stats_counter("large-puts", 1);
stats_counter("large-puts", 1, samplerate);
stats_timer("large-put-latency", elapsed_time);
}
else if (st.st_size > SM) {
Expand All @@ -1167,7 +1169,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_sm_timing);
count = FETCH(filecache_put_sm_count);
sz = "SM";
stats_counter("small-puts", 1);
stats_counter("small-puts", 1, samplerate);
stats_timer("small-put-latency", elapsed_time);
}
else if (st.st_size > XSM) {
Expand All @@ -1176,7 +1178,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_xsm_timing);
count = FETCH(filecache_put_xsm_count);
sz = "XSM";
stats_counter("small-puts", 1);
stats_counter("small-puts", 1, samplerate);
stats_timer("small-put-latency", elapsed_time);
}
else {
Expand All @@ -1185,7 +1187,7 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
latency = FETCH(filecache_put_xxsm_timing);
count = FETCH(filecache_put_xxsm_count);
sz = "XXSM";
stats_counter("small-puts", 1);
stats_counter("small-puts", 1, samplerate);
stats_timer("small-put-latency", elapsed_time);
}
log_print(LOG_DEBUG, SECTION_FILECACHE_OPEN, "%s: PUT on size %s (%lu) for %s -- Current:Average latency %lu :: %lu",
Expand All @@ -1194,13 +1196,13 @@ static void put_return_etag(const char *path, int fd, char *etag, GError **gerr)
if (st.st_size >= LG && elapsed_time > large_time_allotment) {
log_print(LOG_WARNING, SECTION_FILECACHE_OPEN, "%s: large (%lu) PUT for %s exceeded time allotment %lu with %lu",
funcname, st.st_size, path, large_time_allotment, elapsed_time);
stats_counter("exceeded-time-large-PUT-count", 1);
stats_counter("exceeded-time-large-PUT-count", 1, samplerate);
stats_timer("exceeded-time-large-PUT-latency", elapsed_time);
}
else if (st.st_size < LG && elapsed_time > small_time_allotment) {
log_print(LOG_WARNING, SECTION_FILECACHE_OPEN, "%s: small (%lu) PUT for %s exceeded time allotment %lu with %lu",
funcname, st.st_size, path, small_time_allotment, elapsed_time);
stats_counter("exceeded-time-small-PUT-count", 1);
stats_counter("exceeded-time-small-PUT-count", 1, samplerate);
stats_timer("exceeded-time-small-PUT-latency", elapsed_time);
}
}
Expand Down
93 changes: 67 additions & 26 deletions src/fusedav-statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,20 @@ int stats_close(void) {
return 0;
}

/*
 * Decide whether a stat with the given sample rate should be sent this time.
 *
 * samplerate: a rate of 1.0 or higher always sends and makes no random draw;
 *             a lower rate sends only when it is greater than a random draw
 *             in [0, 1), so a rate of 0 or less never sends.
 *
 * Returns 1 when the stat should be sent, 0 when it should be dropped.
 */
static int sample(float samplerate) {
    const char *funcname = "sample";
    // POSIX specifies random() as returning 0 .. 2^31 - 1. RAND_MAX describes
    // rand(), not random(), and is only guaranteed to be >= 32767, so scale
    // by 2^31 explicitly instead of dividing by RAND_MAX.
    const double random_range = 2147483648.0;
    double draw;
    int send;

    // Written as a negated "<" so that a NaN rate still falls through to
    // "always send", exactly as the original if/else did.
    if (!(samplerate < 1.0)) {
        return 1;
    }

    // Named "draw" rather than "sample" to avoid shadowing this function.
    draw = (double)random() / random_range;
    send = samplerate > draw;
    log_print(LOG_DEBUG, SECTION_STATS_DEFAULT, "%s: sample: %.2f, rate: %.2f, send: %d",
        funcname, draw, samplerate, send);
    return send;
}

/* Send the stat. */
static int stats_send(const char *statmsg) {
int bytes;
Expand Down Expand Up @@ -242,52 +256,77 @@ static int stats_send(const char *statmsg) {
return 0;
}

static int compose_message(const char *statname, const signed int value, char *type, char *msg, const char *cluster, const char *node) {
if (node) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.server-%s.%s:%d|%s\n",
server.stats_prefix, cluster, node, statname, value, type);
} else if (cluster) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.%s:%d|%s\n",
server.stats_prefix, cluster, statname, value, type);
} else {
return snprintf(msg, STATS_MSG_LEN, "%s.%s:%d|%s\n",
server.stats_prefix, statname, value, type);
static int compose_message(const char *statname, const signed int value, char *type, char *msg,
const char *cluster, const char *node, float samplerate) {

// If this stat is not selected by the sampling check, return 0. To the caller,
// 0 means there is nothing to send: either the stat was sampled out, or the
// string generated by snprintf below had 0 bytes
if (!sample(samplerate)) {
return 0;
}


if (samplerate < 1.0) {
if (node) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.server-%s.%s:%d|%s|@%.2f\n",
server.stats_prefix, cluster, node, statname, value, type, samplerate);
} else if (cluster) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.%s:%d|%s|@%.2f\n",
server.stats_prefix, cluster, statname, value, type, samplerate);
} else {
return snprintf(msg, STATS_MSG_LEN, "%s.%s:%d|%s|@%.2f\n",
server.stats_prefix, statname, value, type, samplerate);
}

} else {
if (node) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.server-%s.%s:%d|%s\n",
server.stats_prefix, cluster, node, statname, value, type);
} else if (cluster) {
return snprintf(msg, STATS_MSG_LEN, "%s.%s.%s:%d|%s\n",
server.stats_prefix, cluster, statname, value, type);
} else {
return snprintf(msg, STATS_MSG_LEN, "%s.%s:%d|%s\n",
server.stats_prefix, statname, value, type);
}
}
}

static int stats_counter_common(const char *statname, const int value, const char *cluster, const char *node) {
static int stats_counter_common(const char *statname, const int value, const char *cluster, const char *node, float samplerate) {
int error;
int res;
char msg[STATS_MSG_LEN];
char type[] = "c";
res = compose_message(statname, value, type, msg, cluster, node);
if (res < 0) {
res = compose_message(statname, value, type, msg, cluster, node, samplerate);
// < 0 means an error; 0 means either no bytes, or didn't pass sampling rate
if (res <= 0) {
return res;
}
error = stats_send(msg);
return error;
}

int stats_counter(const char *statname, const int value) {
return stats_counter_common(statname, value, get_filesystem_cluster(), get_nodeaddr());
int stats_counter(const char *statname, const int value, float samplerate) {
return stats_counter_common(statname, value, get_filesystem_cluster(), get_nodeaddr(), samplerate);
}

int stats_counter_cluster(const char *statname, const int value) {
return stats_counter_common(statname, value, get_filesystem_cluster(), NULL);
int stats_counter_cluster(const char *statname, const int value, float samplerate) {
return stats_counter_common(statname, value, get_filesystem_cluster(), NULL, samplerate);
}

int stats_counter_local(const char *statname, const int value) {
return stats_counter_common(statname, value, NULL, NULL);
int stats_counter_local(const char *statname, const int value, float samplerate) {
return stats_counter_common(statname, value, NULL, NULL, samplerate);
}

static int stats_gauge_common(const char *statname, const int value, const char *cluster, const char *node) {
int error;
int res;
char msg[STATS_MSG_LEN];
char type[] = "g";
res = compose_message(statname, value, type, msg, cluster, node);
if (res < 0) {
float samplerate = 1.0; // No sampling
// < 0 means an error; 0 means either no bytes, or didn't pass sampling rate
res = compose_message(statname, value, type, msg, cluster, node, samplerate);
if (res <= 0) {
return res;
}
error = stats_send(msg);
Expand All @@ -311,8 +350,10 @@ static int stats_timer_common(const char *statname, const int value, const char
int res;
char msg[STATS_MSG_LEN];
char type[] = "ms";
res = compose_message(statname, value, type, msg, cluster, node);
if (res < 0) {
float samplerate = 1.0; // No sampling
// < 0 means an error; 0 means either no bytes, or didn't pass sampling rate
res = compose_message(statname, value, type, msg, cluster, node, samplerate);
if (res <= 0) {
return res;
}
error = stats_send(msg);
Expand All @@ -332,7 +373,7 @@ int stats_timer_local(const char *statname, const int value) {
}

// Not really a histogram, but keeps separate stats for each value
int stats_histo(const char *statname, const int value, const int max) {
int stats_histo(const char *statname, const int value, const int max, float samplerate) {
char *value_str = NULL;
char *attempts_str = NULL;
int ret;
Expand All @@ -343,10 +384,10 @@ int stats_histo(const char *statname, const int value, const int max) {
asprintf(&value_str, "gt_%d_%s", max, statname);
}
// Is this the first, second, or third failure for this request?
stats_counter(value_str, 1);
stats_counter(value_str, 1, samplerate);
free(value_str);
asprintf(&attempts_str, "%s_attempts", statname);
ret = stats_counter(attempts_str, 1);
ret = stats_counter(attempts_str, 1, samplerate);
free(attempts_str);
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/fusedav-statsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
***/

// Special sample rate for propfinds so as not to overwhelm statsd transmission
extern float pfsamplerate;

int stats_init(const char *domain, const char *port);
int stats_close(void);
int stats_counter(const char *statname, const int value);
int stats_counter_cluster(const char *statname, const int value);
int stats_counter_local(const char *statname, const int value);
int stats_counter(const char *statname, const int value, float samplerate);
int stats_counter_cluster(const char *statname, const int value, float samplerate);
int stats_counter_local(const char *statname, const int value, float samplerate);
int stats_gauge(const char *statname, const int value);
int stats_gauge_cluster(const char *statname, const int value);
int stats_gauge_local(const char *statname, const int value);
int stats_timer(const char *statname, const int value);
int stats_timer_cluster(const char *statname, const int value);
int stats_timer_local(const char *statname, const int value);
int stats_histo(const char *statname, const int value, const int max);
int stats_histo(const char *statname, const int value, const int max, float samplerate);

#endif
Loading