-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathjsonq
executable file
·161 lines (143 loc) · 5.63 KB
/
jsonq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/python
# Check out the README:
# https://github.com/edmund-huber/jsonq/blob/master/README.md
import copy
from optparse import OptionParser
import re
import sys
try:
# simplejson is typically faster than standard-library json
import simplejson as json
except ImportError:
import json
class FilteredJSONObject(object):
    """Represents an incrementally built query-filtered JSON object.

    Every `add_map` / `add_list` call descends one level by creating a new
    single-slot container; `finish` fills the innermost slot and hands back
    the outermost container.

    >>> f = FilteredJSONObject()
    >>> _ = f.add_map('narf').add_list().add_map('zoop')
    >>> f.finish(1)
    {'narf': [{'zoop': 1}]}
    """

    def __init__(self):
        # this is the object we're building
        self.result = None
        # next place to assign is always self.tip[self.last_subscript]
        self.tip = None
        self.last_subscript = None

    def _descend(self, container, subscript):
        """Store `container` in the pending slot and make it the new tip.

        `subscript` names the slot inside `container` that the next call
        will fill. Returns self so that calls can be chained.
        """
        if self.result is None:
            # Nothing built yet: the container becomes the root.
            self.result = container
        else:
            self.tip[self.last_subscript] = container
        self.tip = container
        self.last_subscript = subscript
        return self

    def add_map(self, key):
        """Set the current object tip to be a dictionary key.

        Returns self.
        """
        return self._descend({}, key)

    def add_list(self):
        """Set the current object tip to be an array of length 1.

        The single slot (index 0) starts out as None. Returns self.
        """
        return self._descend([None], 0)

    def finish(self, val):
        """Set the current object tip to be an arbitrary value and
        return the skeleton that we traversed.

        When nothing has been added yet, `val` itself is the result.
        """
        if self.result is None:
            self.result = val
        else:
            self.tip[self.last_subscript] = val
        return self.result
def parse_queries(query_strs):
    """Turn the query strings (the command-line arguments) into parsed queries.

    Every parsed query is a list of ``(key, index)`` tuples, one per path
    step, in which exactly one element is not None:

    * ``.name`` becomes ``('name', None)`` -- a dictionary key;
    * ``[12]``  becomes ``(None, 12)``     -- an array index;
    * ``[*]``   becomes ``(None, True)``   -- every element of an array.

    Raises ValueError, carrying the complete offending query string, as
    soon as part of a query matches none of these forms.
    """
    # dictionary keys may contain anything except ., [, or ] (there is no
    # backslash escaping); array indices are plain digits.
    step_re = re.compile(r'((?P<select>\.[^.\[\]]+)|(?P<index>\[\d+\])|(?P<star>\[\*\]))')
    parsed = []
    for text in query_strs:
        steps = []
        pos = 0
        # Consume one step at a time until the whole string has been eaten.
        while pos < len(text):
            found = step_re.match(text, pos)
            if found is None:
                raise ValueError("Could not parse the query", text)
            pos = found.end()
            key_text = found.group('select')
            index_text = found.group('index')
            if key_text:
                # drop the leading '.'
                step = (key_text[1:], None)
            elif index_text:
                # drop the surrounding brackets
                step = (None, int(index_text[1:-1]))
            else:
                # the only alternative left is [*]
                step = (None, True)
            steps.append(step)
        parsed.append(steps)
    return parsed
def query(queries, filter_data, newlines):
    """Run `queries` against the data on stdin.

    Each non-blank line of stdin is decoded as one JSON document. Every
    query is applied to it in turn and each match is written to stdout as
    JSON, followed by a newline (when `newlines` is true) or a space.
    One extra newline is written after all results for an input line.

    queries     -- list of parsed queries; each is a list of (key, index)
                   tuples where exactly one element is not None and an
                   index of True means "every element".
    filter_data -- when true, print the skeleton of the original object
                   leading to the match instead of the bare matched value.
    newlines    -- when true, delimit results with newlines, not spaces.

    Output is written with sys.stdout.write so that the function works on
    both Python 2 and Python 3; the bytes produced are the same as those
    of the Python 2 ``print x,`` statements this replaces.

    NOTE(review): the original nesting of the final newline was lost in
    the source this was restored from; it is assumed to be emitted once
    per input line -- confirm.
    """
    def q(steps, tip, filtered):
        """Return the list of (value, filtered_result) matches for `steps`.

        `tip` is the part of the document reached so far and `filtered` is
        the FilteredJSONObject skeleton mirroring the path taken; it is
        consumed (mutated) by this call.
        """
        if not steps:
            return [(tip, filtered.finish(tip))]
        s, i = steps[0]
        rest = steps[1:]
        if s is not None and i is None:
            # this query item is a dictionary key
            if isinstance(tip, dict) and s in tip:
                return q(rest, tip[s], filtered.add_map(s))
            return []
        if s is None and i is not None:
            if not isinstance(tip, list):
                return []
            if i is True:
                # a 'return all values' query, [*]: every element continues
                # from its own copy of the skeleton so branches stay
                # independent of one another.
                matches = []
                for element in tip:
                    matches.extend(q(rest, element, copy.deepcopy(filtered).add_list()))
                return matches
            # this query item is an array subscript
            if len(tip) > i:
                return q(rest, tip[i], filtered.add_list())
            return []
        # Neither a key nor an index: the parsed query is malformed. Raised
        # explicitly so that the check is not stripped under -O.
        raise AssertionError("malformed query step: %r" % (steps[0],))

    write = sys.stdout.write
    delimiter = '\n' if newlines else ' '
    for line in sys.stdin:
        if not line.strip():
            # A blank line holds no JSON document; skip it rather than
            # letting json.loads abort the whole run.
            continue
        # The document is only read, never mutated, so all queries can
        # share it without copying.
        json_obj = json.loads(line)
        # Mirrors the Python 2 print "soft space": after an item that ends
        # in a space, the next item is preceded by one more space.
        pending_space = False
        for parsed in queries:
            for value, filtered in q(parsed, json_obj, FilteredJSONObject()):
                text = json.dumps(filtered if filter_data else value)
                if pending_space:
                    write(' ')
                write(text + delimiter)
                pending_space = not newlines
        write('\n')
def main():
    """Parse the command line and run the given queries against stdin.

    Positional arguments are the query strings; -n/--newlines and
    -f/--filter are boolean flags, both off by default. Returns whatever
    query() returns.
    """
    parser = OptionParser(usage="%prog [options] queries")
    parser.add_option(
        '-n', '--newlines',
        action='store_true', dest='newlines', default=False,
        help="Use a newline instead of a space to delimit results.")
    parser.add_option(
        '-f', '--filter',
        action='store_true', dest='filter', default=False,
        help="Show the original object filtered by the query, not just the result.")
    opts, positional = parser.parse_args()
    parsed = parse_queries(positional)
    return query(parsed, opts.filter, opts.newlines)
# Script entry point: whatever main() returns is handed to sys.exit().
if __name__ == '__main__':
    status = main()
    sys.exit(status)