diff --git a/plugins/in_kmsg/in_kmsg.c b/plugins/in_kmsg/in_kmsg.c index b34735f5646..3999ec8ba9b 100644 --- a/plugins/in_kmsg/in_kmsg.c +++ b/plugins/in_kmsg/in_kmsg.c @@ -166,6 +166,8 @@ static inline int process_line(char *line, strncpy(msg, p, line_len); msg[line_len] = '\0'; + flb_input_buf_write_start(i_ins); + /* * Store the new data into the MessagePack buffer, * we handle this as a list of maps. @@ -174,26 +176,28 @@ static inline int process_line(char *line, msgpack_pack_uint64(&i_ins->mp_pck, ts); msgpack_pack_map(&i_ins->mp_pck, 5); - msgpack_pack_bin(&i_ins->mp_pck, 8); - msgpack_pack_bin_body(&i_ins->mp_pck, "priority", 8); + msgpack_pack_str(&i_ins->mp_pck, 8); + msgpack_pack_str_body(&i_ins->mp_pck, "priority", 8); msgpack_pack_char(&i_ins->mp_pck, priority); - msgpack_pack_bin(&i_ins->mp_pck, 8); - msgpack_pack_bin_body(&i_ins->mp_pck, "sequence", 8); + msgpack_pack_str(&i_ins->mp_pck, 8); + msgpack_pack_str_body(&i_ins->mp_pck, "sequence", 8); msgpack_pack_uint64(&i_ins->mp_pck, sequence); - msgpack_pack_bin(&i_ins->mp_pck, 3); - msgpack_pack_bin_body(&i_ins->mp_pck, "sec", 3); + msgpack_pack_str(&i_ins->mp_pck, 3); + msgpack_pack_str_body(&i_ins->mp_pck, "sec", 3); msgpack_pack_uint64(&i_ins->mp_pck, tv.tv_sec); - msgpack_pack_bin(&i_ins->mp_pck, 4); - msgpack_pack_bin_body(&i_ins->mp_pck, "usec", 4); + msgpack_pack_str(&i_ins->mp_pck, 4); + msgpack_pack_str_body(&i_ins->mp_pck, "usec", 4); msgpack_pack_uint64(&i_ins->mp_pck, tv.tv_usec); - msgpack_pack_bin(&i_ins->mp_pck, 3); - msgpack_pack_bin_body(&i_ins->mp_pck, "msg", 3); - msgpack_pack_bin(&i_ins->mp_pck, line_len); - msgpack_pack_bin_body(&i_ins->mp_pck, p, line_len); + msgpack_pack_str(&i_ins->mp_pck, 3); + msgpack_pack_str_body(&i_ins->mp_pck, "msg", 3); + msgpack_pack_str(&i_ins->mp_pck, line_len); + msgpack_pack_str_body(&i_ins->mp_pck, p, line_len); + + flb_input_buf_write_end(i_ins); flb_trace("[in_kmsg] pri=%i seq=%" PRIu64 " ts=%ld sec=%ld usec=%ld '%s'", priority,