Skip to content

Commit

Permalink
Unmarshal JSON numbers as ints where possible
Browse files Browse the repository at this point in the history
The change modifies the unmarshaling logic to try parsing the numbers
in an int64 first, and only on error parse to a float64. In practice,
this means that e.g. 1 will become an int64 and 1.0 will become a
float64.

Fixes elastic#2038. Includes a system test to verify that ticket.
  • Loading branch information
Tudor Golubenco committed Jul 25, 2016
1 parent c5c51dd commit 382997f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 72 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Fix potential data loss between filebeat restarts, reporting unpublished lines as published. {issue}2041[2041]

- Fix potential data loss between filebeat restarts, reporting unpublished lines as published. {issue}2041[2041]
- Fix open file handler issue {issue}2028[2028] {pull}2020[2020]
- Fix filtering of JSON events when using integers in conditions {issue}2038[2038]

*Winlogbeat*
- Fix potential data loss between winlogbeat restarts, reporting unpublished lines as published. {issue}2041[2041]
Expand Down
60 changes: 58 additions & 2 deletions filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reader

import (
"bytes"
"encoding/json"
"fmt"

Expand All @@ -25,8 +26,9 @@ func NewJSON(r Reader, cfg *JSONConfig) *JSON {
// decodeJSON unmarshals the text parameter into a MapStr and
// returns the new text column if one was requested.
func (r *JSON) decodeJSON(text []byte) ([]byte, common.MapStr) {
var jsonFields common.MapStr
err := json.Unmarshal(text, &jsonFields)
var jsonFields map[string]interface{}

err := unmarshal(text, &jsonFields)
if err != nil {
logp.Err("Error decoding JSON: %v", err)
if r.cfg.AddErrorKey {
Expand Down Expand Up @@ -58,6 +60,60 @@ func (r *JSON) decodeJSON(text []byte) ([]byte, common.MapStr) {
return []byte(textString), jsonFields
}

// unmarshal is equivalent with json.Unmarshal but it converts numbers
// to int64 where possible, instead of using always float64.
func unmarshal(text []byte, fields *map[string]interface{}) error {
dec := json.NewDecoder(bytes.NewReader(text))
dec.UseNumber()
err := dec.Decode(fields)
if err != nil {
return err
}
transformNumbersDict(*fields)
return nil
}

// transformNumbersDict walks a json decoded tree an replaces json.Number
// with int64, float64, or string, in this order of preference (i.e. if it
// parses as an int, use int. if it parses as a float, use float. etc).
func transformNumbersDict(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (Message, error) {
reader, err := r.reader.Next()
Expand Down
128 changes: 59 additions & 69 deletions filebeat/harvester/reader/json_test.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,80 @@
// +build !integration

package reader

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

func TestDecodeJSON(t *testing.T) {
func TestUnmarshal(t *testing.T) {
type io struct {
Text string
Config JSONConfig
ExpectedText string
ExpectedMap common.MapStr
Name string
Input string
Output map[string]interface{}
}

var tests = []io{
{
Text: `{"message": "test", "value": 1}`,
Config: JSONConfig{MessageKey: "message"},
ExpectedText: "test",
ExpectedMap: common.MapStr{"message": "test", "value": float64(1)},
},
{
Text: `{"message": "test", "value": 1}`,
Config: JSONConfig{MessageKey: "message1"},
ExpectedText: "",
ExpectedMap: common.MapStr{"message": "test", "value": float64(1)},
},
{
Text: `{"message": "test", "value": 1}`,
Config: JSONConfig{MessageKey: "value"},
ExpectedText: "",
ExpectedMap: common.MapStr{"message": "test", "value": float64(1)},
tests := []io{
io{
Name: "Top level int, float, string, bool",
Input: `{"a": 3, "b": 2.0, "c": "hello", "d": true}`,
Output: map[string]interface{}{
"a": int64(3),
"b": float64(2),
"c": "hello",
"d": true,
},
},
{
Text: `{"message": "test", "value": "1"}`,
Config: JSONConfig{MessageKey: "value"},
ExpectedText: "1",
ExpectedMap: common.MapStr{"message": "test", "value": "1"},
io{
Name: "Nested objects with ints",
Input: `{"a": 3, "b": {"c": {"d": 5}}}`,
Output: map[string]interface{}{
"a": int64(3),
"b": map[string]interface{}{
"c": map[string]interface{}{
"d": int64(5),
},
},
},
},
{
// in case of JSON decoding errors, the text is passed as is
Text: `{"message": "test", "value": "`,
Config: JSONConfig{MessageKey: "value"},
ExpectedText: `{"message": "test", "value": "`,
ExpectedMap: nil,
io{
Name: "Array of floats",
Input: `{"a": 3, "b": {"c": [4.0, 4.1, 4.2]}}`,
Output: map[string]interface{}{
"a": int64(3),
"b": map[string]interface{}{
"c": []interface{}{
float64(4.0), float64(4.1), float64(4.2),
},
},
},
},
{
// Add key error helps debugging this
Text: `{"message": "test", "value": "`,
Config: JSONConfig{MessageKey: "value", AddErrorKey: true},
ExpectedText: `{"message": "test", "value": "`,
ExpectedMap: common.MapStr{"json_error": "Error decoding JSON: unexpected end of JSON input"},
io{
Name: "Array of mixed ints and floats",
Input: `{"a": 3, "b": {"c": [4, 4.1, 4.2]}}`,
Output: map[string]interface{}{
"a": int64(3),
"b": map[string]interface{}{
"c": []interface{}{
int64(4), float64(4.1), float64(4.2),
},
},
},
},
{
// If the text key is not found, put an error
Text: `{"message": "test", "value": "1"}`,
Config: JSONConfig{MessageKey: "hello", AddErrorKey: true},
ExpectedText: ``,
ExpectedMap: common.MapStr{"message": "test", "value": "1", "json_error": "Key 'hello' not found"},
},
{
// If the text key is found, but not a string, put an error
Text: `{"message": "test", "value": 1}`,
Config: JSONConfig{MessageKey: "value", AddErrorKey: true},
ExpectedText: ``,
ExpectedMap: common.MapStr{"message": "test", "value": float64(1), "json_error": "Value of key 'value' is not a string"},
},
{
// Without a text key, simple return the json and an empty text
Text: `{"message": "test", "value": 1}`,
Config: JSONConfig{AddErrorKey: true},
ExpectedText: ``,
ExpectedMap: common.MapStr{"message": "test", "value": float64(1)},
io{
Name: "Negative values",
Input: `{"a": -3, "b": -1.0}`,
Output: map[string]interface{}{
"a": int64(-3),
"b": float64(-1),
},
},
}

for _, test := range tests {

var p JSON
p.cfg = &test.Config
text, map_ := p.decodeJSON([]byte(test.Text))
assert.Equal(t, test.ExpectedText, string(text))
assert.Equal(t, test.ExpectedMap, map_)
t.Logf("Running test %s", test.Name)
var output map[string]interface{}
err := unmarshal([]byte(test.Input), &output)
assert.NoError(t, err)
assert.Equal(t, test.Output, output)
}
}
2 changes: 2 additions & 0 deletions filebeat/tests/files/logs/json_int.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"http_user_agent": "ELB-HealthChecker/1.0", "status": 200}
{"http_user_agent": "ELB-HealthChecker/1.0", "status": 404}
31 changes: 31 additions & 0 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,34 @@ def test_with_generic_filtering_remove_headers(self):
assert "res" not in o
assert o["method"] == "GET"
assert o["message"] == "Sent response: "

def test_integer_condition(self):
"""
It should work to drop JSON event based on an integer
value by using a simple `equal` condition. See #2038.
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
keys_under_root=True,
),
processors=[{
"drop_event": {
"when": "equals.status: 200",
},
}]
)
os.mkdir(self.working_dir + "/log/")
self.copy_files(["logs/json_int.log"],
source_dir="../files",
target_dir="log")

proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()
assert len(output) == 1
assert output[0]["status"] == 404

0 comments on commit 382997f

Please sign in to comment.