-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconvert.py
167 lines (134 loc) · 5.23 KB
/
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
Convert Sentencing Commission files into CSVs.
Usage:
$ python3 convert.py [LIST OF FILES TO CONVERT]
@author Kevin H. Wilson <[email protected]>
"""
import csv
import os
import sys
import tempfile
import zipfile
from pathlib import Path
from typing import Dict, List, Union
import click
def read_columns(filename: Union[str, Path]) -> List[Dict[str, Union[bool, int, str]]]:
"""
Read the column names from the file with the passed name.
Assumes it's in a SAS format that begins with INPUT and ends with
a semicolon.
Args:
filename: The file name to read
Returns:
The list of column names where each element is a dict with key/vals:
* name (str): The column name
* is_char (bool): Whether the column is a string column
* start (int): The (1-delimited) starting position of the column
* end (int): The (1-delimited and inclusive) ending position of the column
"""
columns = []
with open(filename, "rt") as f:
# Search for the line that starts with INPUT
for line in f:
if line.startswith("INPUT"):
break
for line in f:
# Kill all the extra whitespace
line = line.strip()
# Is this the last line?
if line.endswith(";"):
# If so, strip the ; and the extra whitespace
last_line = True
line = line[:-1].strip()
else:
last_line = False
# Parse row into column names
i = 0
sline = line.split()
while i < len(sline):
col_name = sline[i]
i += 1
if sline[i] == "$":
is_char = True
i += 1
else:
is_char = False
field_range = sline[i]
i += 1
# Field ranges are formatted either as # or #-#
sfield_range = field_range.split("-")
if len(sfield_range) == 1:
sfield_range = (sfield_range[0], sfield_range[0])
# Write out the column to the list
columns.append(
{
"name": col_name,
"is_char": is_char,
"start": int(sfield_range[0]),
"end": int(sfield_range[1]),
}
)
if last_line:
break
return columns
def convert_file(filename: Union[str, Path]):
"""
Convert a file from the Sentencing Commission format into a CSV.
Assumes the file is a ZIP file containing at least the following:
- .sas: A file with the same name as `filename` except ending in .sas
- .dat: A file with the same name as `filename` except ending in .dat
The .dat file is a fixed-width file whose columns are described by the .sas
file. If you're looking at the .sas file, search for INPUT and LENGTH to
see the two main parts of the file. There are a _lot_ of columns.
Args:
filename: The name of the file to convert
"""
filename = Path(filename)
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# Unzip the contents of the file
with zipfile.ZipFile(filename, "r") as thefile:
thefile.extractall(tmpdir)
# Read in the column names from the .sas file
sasfilename = filename.with_suffix(".sas").name
saspath = tmpdir / sasfilename
columns = read_columns(saspath)
# Setup the path to the .dat file
datfilename = filename.with_suffix(".dat").name
datpath = tmpdir / datfilename
# Open the output file
outfilename = filename.with_suffix(".csv")
badlines = []
with open(outfilename, "wt") as outfile:
# Write the column headers
writer = csv.writer(outfile)
writer.writerow([col["name"] for col in columns])
# Read in the data
with click.progressbar(length=os.stat(datpath).st_size) as bar:
with open(datpath, "rb") as infile:
for line in infile:
bar.update(len(line))
line = line.decode("latin1")
# Read in a single row
readrow = []
for col in columns:
val = line[col["start"] - 1 : col["end"]].strip()
# If it's numeric and not missing, format it nicely
if val and not col["is_char"]:
if "." in val:
val = float(val)
else:
val = int(float(val)) # Handle 6e+10
readrow.append(val)
# Write out the row
writer.writerow(readrow)
if badlines:
badfilename = filename.with_suffix(".bad")
with open(badfilename, "wb") as f:
for line in badlines:
f.write(line)
def main():
for filename in sys.argv[1:]:
convert_file(filename)
if __name__ == "__main__":
main()