1 | #!/usr/bin/env python |
2 | |
3 | """ |
4 | This is a generic fuzz testing tool, see --help for more information. |
5 | """ |
6 | |
7 | import os |
8 | import sys |
9 | import random |
10 | import subprocess |
11 | import itertools |
12 | |
class TestGenerator:
    """Enumerate every single-edit fuzz test over a set of input files.

    Each test is addressed by an integer in ``range(num_tests)``; get_test()
    decodes that integer into a tuple
    ``(kind, position, insert_str, picked_position)`` where:

      kind            - 'nothing', 'delete', 'insert' or 'replace'
      position        - (input index, offset) being edited, or None
      insert_str      - string to insert/replace with, or None
      picked_position - (input index, offset) of the independently picked
                        input byte when pick_input is enabled, else None

    Attributes:
      inputs             - list of (path, contents) pairs, in argument order
      insert_strings     - list of candidate insert/replacement strings
      num_positions      - total number of bytes across all inputs
      num_insert_strings - len(insert_strings)
      num_tests          - number of valid test indices
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        # Read in binary mode and close each handle explicitly.  The fuzzed
        # and restored contents are written back with 'wb', so the data held
        # here must be the exact bytes on disk (a text-mode read may
        # translate line endings and make the restore lossy).
        self.inputs = []
        for path in inputs:
            f = open(path, 'rb')
            try:
                self.inputs.append((path, f.read()))
            finally:
                f.close()

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        self.insert_strings = list(insert_strings)

        self.num_positions = sum(len(d) for _, d in self.inputs)
        # Measure the materialized list so iterators are supported too.
        self.num_insert_strings = len(self.insert_strings)

        # Count strategies with the normalized flags so that any truthy
        # argument contributes exactly one strategy.
        num_strategies = (self.delete +
                          (self.insert + self.replace) *
                          self.num_insert_strings)
        # One extra test for the unmodified ('nothing') case.
        self.num_tests = num_strategies * self.num_positions + 1

        # Every test is additionally paired with every pickable position.
        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global byte position to an (input index, offset) pair.

        Raises ValueError if position is not below num_positions.
        """
        for i, (_, d) in enumerate(self.inputs):
            n = len(d)
            if position < n:
                return (i, position)
            position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode a test index into its (kind, position, insert_str,
        picked_position) description.

        The index must satisfy 0 <= index < num_tests.
        """
        assert 0 <= index < self.num_tests

        # The low "digit" of the index selects the picked input position.
        picked_position = None
        if self.pick_input:
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        # The next digit is the edit position; what remains selects the
        # strategy (delete first, then insert, then replace).
        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)
70 | |
class TestApplication:
    """Apply one generated test to the input files on disk, and undo it.

    tg is the generator whose ``inputs`` list holds the pristine
    (path, contents) pairs; test is a tuple as returned by its get_test():
    ``(kind, (input index, offset), insert_str, picked_position)``.
    """

    def __init__(self, tg, test):
        self.tg = tg
        self.test = test

    def apply(self):
        """Overwrite the targeted input file with its fuzzed contents.

        Does nothing for a 'nothing' test.  Raises ValueError, before
        touching any file, if the test kind is not recognized.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + self.test[2] + data[j:]
        elif kind == 'replace':
            data = data[:j] + self.test[2] + data[j+1:]
        else:
            # Wrap the tuple so it is formatted as a single value instead of
            # being unpacked as the format arguments.
            raise ValueError('Invalid test %r' % (self.test,))
        self._write(name, data)

    def revert(self):
        """Restore the targeted input file to its original contents."""
        if self.test[0] != 'nothing':
            name, data = self.tg.inputs[self.test[1][0]]
            self._write(name, data)

    def _write(self, name, data):
        # Close explicitly so the contents are on disk before the test
        # command is run against the file.
        f = open(name, 'wb')
        try:
            f.write(data)
        finally:
            f.close()
97 | |
def quote(str):
    """Return the given string wrapped in double quotes (no escaping)."""
    return ''.join(('"', str, '"'))
100 | |
def run_one_test(test_application, index, input_files, args):
    """Run the test command once and report whether it passed.

    Each string in args is %-interpolated with the variables 'index' and
    'inputs' (plus 'picked_input', 'picked_input_pos', 'picked_input_line'
    and 'picked_input_col' when the test carries a picked position), and the
    result is executed as a subprocess.

    Reads the module-level ``opts``: verbose, succinct, log_dir, log_all,
    expected_exit_code and extra_exit_codes.  When opts.log_dir is set the
    command's stdout/stderr are captured to '<index>.out' / '<index>.err' in
    that directory; the logs of passing tests are removed unless
    opts.log_all is set.

    Returns True if the exit code is the expected one or one of the extra
    accepted codes, False otherwise.
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    picked = test[3]
    if picked is not None:
        file_index, pos = picked
        options['picked_input'] = input_files[file_index]
        options['picked_input_pos'] = pos
        # Compute the 1-based line and column of pos: the line is one more
        # than the number of newlines before pos, the column is the distance
        # from the last such newline (rfind gives -1 when there is none).
        file_data = test_application.tg.inputs[file_index][1]
        options['picked_input_line'] = file_data.count('\n', 0, pos) + 1
        options['picked_input_col'] = pos - file_data.rfind('\n', 0, pos)

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child shares our stdout; flush so output stays ordered.
            sys.stdout.flush()
        p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
        p.communicate()
        exit_code = p.wait()
    finally:
        # Always release the log handles, even if the command could not be
        # started.
        for log_file in (stdout, stderr):
            if log_file is not None:
                log_file.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout is not None and not opts.log_all and test_result:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)
    return test_result
160 | |
def main():
    """Command-line driver: parse options, build the test generator and run
    the selected fuzz tests.

    The parsed options are stored in the module-level ``opts`` so that
    run_one_test() can read them.  For every selected test index the fuzzed
    input is written in place, the test command is run, and the input is
    restored unless reverting has been disabled ('--no-revert', or a failure
    with '--stop-on-fail', which also exits with status 1).
    """
    global opts
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

$ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIABLE is one of:

'index' - the test index being run
'inputs' - the full list of test inputs
'picked_input' - (with --pick-input) the selected input file
'picked_input_pos' - (with --pick-input) the selected input position
'picked_input_line' - (with --pick-input) the selected input line
'picked_input_col' - (with --pick-input) the selected input column

By default, the script will run forever continually picking new tests to
run. You can limit the number of tests that are run with '--max-tests <number>',
and you can run a particular test with '--test <index>'.

You can specify '--stop-on-fail' to stop the script on the first failure
without reverting the changes.

""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct", help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type=int, dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type=int, action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type=str, dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type=str, action="append", dest="input_files", default=[])
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type=str, action="append", dest="filelists", default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    # The backslash is escaped explicitly; the value is unchanged.
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    group.add_option("", "--stop-on-fail", help="Stop on first failure",
                     action='store_true', dest="stop_on_fail", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type=int, dest="test", default=None, metavar="INDEX")
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type=int, dest="max_tests", default=None, metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument is the test command.
    parser.disable_interspersed_args()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        f = open(filelist)
        try:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
        finally:
            f.close()
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir:
        if not os.path.exists(opts.log_dir):
            try:
                os.mkdir(opts.log_dir)
            except OSError:
                print("%s: error: log directory couldn't be created!" % (
                    sys.argv[0],))
                raise SystemExit(1)

    # Get the list of insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        f = open(replacement_list)
        try:
            for ln in f:
                # Drop only the line terminator; the last line of the file
                # may not have one, and other whitespace is significant.
                if ln.endswith('\n'):
                    ln = ln[:-1]
                if ln:
                    replacements.append(ln)
        finally:
            f.close()

    # Unique and order the replacement list.
    replacements = list(set(replacements))
    replacements.sort()

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))
    if opts.test is not None:
        # Report a bad index as a usage error instead of a traceback.
        if not 0 <= opts.test < tg.num_tests:
            parser.error("Invalid test index %d (%d total tests)" % (
                opts.test, tg.num_tests))
        it = [opts.test]
    elif opts.max_tests is not None:
        # A bounded stream of random test indices.
        it = (random.randrange(n)
              for n in itertools.repeat(tg.num_tests, opts.max_tests))
    else:
        # An endless stream of random test indices.
        it = (random.randrange(n) for n in itertools.repeat(tg.num_tests))
    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            test_result = run_one_test(ta, test, input_files, args)
            if not test_result and opts.stop_on_fail:
                # Leave the failing input in place for inspection.
                opts.revert = False
                sys.exit(1)
        finally:
            if opts.revert:
                ta.revert()

        sys.stdout.flush()
348 | |
# Run the command-line driver only when this file is executed as a script.
if __name__ == '__main__':
    main()
351 | |