|
@@ -1,205 +0,0 @@
|
|
|
-#!/usr/bin/env python
|
|
|
-import optparse
|
|
|
-import os
|
|
|
-import sys
|
|
|
-import zipfile
|
|
|
-
|
|
|
-"""
|
|
|
-Take a MaxMind GeoLite Country database as input and replace A1 entries
|
|
|
-with the country code and name of the preceding entry iff the preceding
|
|
|
-(subsequent) entry ends (starts) directly before (after) the A1 entry and
|
|
|
-both preceding and subsequent entries contain the same country code.
|
|
|
-
|
|
|
-Then apply manual changes, either replacing A1 entries that could not be
|
|
|
-replaced automatically or overriding previously made automatic changes.
|
|
|
-"""
|
|
|
-
|
|
|
def main():
    """Run the whole conversion: read input, fix A1 entries, write outputs."""
    opts = parse_options()

    # Stage 1: automatic A1 replacement on the MaxMind input.
    automatic = apply_automatic_changes(read_file(opts.in_maxmind))
    write_file(opts.out_automatic, automatic)

    # Stage 2: manual changes on top of the automatic ones.  The manual
    # file is read with must_exist=False, so it may be absent.
    manual = read_file(opts.in_manual, must_exist=False)
    final = apply_manual_changes(automatic, manual)

    # The final result is written twice: once in the default (long) format
    # and once with long_format=False.
    write_file(opts.out_manual, final)
    write_file(opts.out_geoip, final, long_format=False)
|
|
|
-
|
|
|
def parse_options():
    """Parse the command line and return the resulting options object.

    All five options take a FILE argument and have a default, so the
    script can be run without any arguments.
    """
    # (flag, destination attribute, default value, help text)
    option_specs = (
        ('-i', 'in_maxmind', 'GeoIPCountryCSV.zip',
         'use the specified MaxMind GeoLite Country .zip or .csv '
         'file as input [default: %default]'),
        ('-g', 'in_manual', 'geoip-manual',
         'use the specified .csv file for manual changes or to '
         'override automatic changes [default: %default]'),
        ('-a', 'out_automatic', 'AutomaticGeoIPCountryWhois.csv',
         'write full input file plus automatic changes to the '
         'specified .csv file [default: %default]'),
        ('-m', 'out_manual', 'ManualGeoIPCountryWhois.csv',
         'write full input file plus automatic and manual '
         'changes to the specified .csv file [default: %default]'),
        ('-o', 'out_geoip', 'geoip',
         'write full input file plus automatic and manual '
         'changes to the specified .csv file that can be shipped '
         'with tor [default: %default]'),
    )
    parser = optparse.OptionParser()
    for flag, dest, default, help_text in option_specs:
        parser.add_option(flag, action='store', dest=dest,
                          default=default, metavar='FILE', help=help_text)
    # Positional arguments are accepted by optparse but not used.
    options, _ = parser.parse_args()
    return options
|
|
|
-
|
|
|
def read_file(path, must_exist=True):
    """Read assignment lines from a .csv file or a MaxMind .zip archive.

    Blank lines and lines starting with '#' are dropped; every other line
    is returned with surrounding whitespace stripped.

    Args:
        path: input file.  A path ending in '.zip' is opened as a zip
            archive and its member 'GeoIPCountryWhois.csv' is read;
            anything else is read as a plain file.
        must_exist: if true, a missing file prints an error message and
            exits the process with status 1.  If false, a missing file
            makes the function return None.

    Returns:
        A list of stripped, non-empty, non-comment lines, or None if the
        file does not exist and must_exist is false.
    """
    if not os.path.exists(path):
        if must_exist:
            print('File %s does not exist. Exiting.' % (path, ))
            sys.exit(1)
        return None

    # Context managers close the handles even if reading fails.
    if path.endswith('.zip'):
        with zipfile.ZipFile(path) as zip_file:
            csv_content = zip_file.read('GeoIPCountryWhois.csv')
    else:
        with open(path, 'rb') as csv_file:
            csv_content = csv_file.read()

    if not isinstance(csv_content, str):
        # On Python 3 the data read above is bytes and must be decoded
        # before it can be split on '\n'; on Python 2 it already is str.
        # NOTE(review): latin-1 is used because it decodes every byte
        # without error -- confirm the real encoding of the input files.
        csv_content = csv_content.decode('latin-1')

    stripped_lines = (line.strip() for line in csv_content.split('\n'))
    return [line for line in stripped_lines
            if line and not line.startswith('#')]
|
|
|
-
|
|
|
def apply_automatic_changes(assignments):
    """Return the assignments with A1 runs passed through process_a1_lines.

    Consecutive lines containing '"A1"' are collected into a run.  Each run
    is handed to process_a1_lines together with the last non-A1 line before
    it and the first non-A1 line after it (None if there is none), and
    whatever that function returns takes the run's place in the output.
    All other lines are copied unchanged.

    Args:
        assignments: list of assignment lines in file order.

    Returns:
        A new list of lines; the input list is not modified.
    """
    # Parenthesised single-argument print works on Python 2 and 3 alike.
    print('\nApplying automatic changes...')
    result_lines = []
    prev_line = None  # last non-A1 line seen so far
    a1_lines = []     # current run of A1 lines awaiting a following line
    for line in assignments:
        if '"A1"' in line:
            a1_lines.append(line)
            continue
        if a1_lines:
            result_lines.extend(
                process_a1_lines(prev_line, a1_lines, line))
            a1_lines = []
        result_lines.append(line)
        prev_line = line
    if a1_lines:
        # The input ended with an A1 run: there is no following line.
        result_lines.extend(process_a1_lines(prev_line, a1_lines, None))
    return result_lines
|
|
|
-
|
|
|
def process_a1_lines(prev_line, a1_lines, next_line):
    """Try to re-assign a single A1 line to its neighbours' country.

    The A1 line is rewritten only if all of the following hold:
      * there is both a preceding and a following line,
      * the run consists of exactly one A1 line,
      * the preceding entry ends directly before the A1 entry starts,
      * the A1 entry ends directly before the following entry starts,
      * preceding and following entries have the same country code.

    Args:
        prev_line: last non-A1 line before the run, or None.
        a1_lines: list of consecutive A1 lines.
        next_line: first non-A1 line after the run, or None.

    Returns:
        A one-element list with the rewritten line if the rewrite applies,
        otherwise a1_lines unchanged.
    """
    if not prev_line or not next_line:
        return a1_lines  # Can't merge first or last line in file.
    if len(a1_lines) > 1:
        return a1_lines  # Can't merge more than 1 line at once.

    a1_line = a1_lines[0].strip()
    prev_entry = parse_line(prev_line)
    a1_entry = parse_line(a1_line)
    next_entry = parse_line(next_line)

    touches_prev_entry = (int(prev_entry['end_num']) + 1 ==
                          int(a1_entry['start_num']))
    touches_next_entry = (int(a1_entry['end_num']) + 1 ==
                          int(next_entry['start_num']))
    same_country_code = (prev_entry['country_code'] ==
                         next_entry['country_code'])
    if not (touches_prev_entry and touches_next_entry and
            same_country_code):
        return a1_lines

    new_line = format_line_with_other_country(a1_entry, prev_entry)
    # Parenthesised single-argument print works on Python 2 and 3 alike.
    print('-%s\n+%s' % (a1_line, new_line, ))
    return [new_line]
|
|
|
-
|
|
|
def parse_line(line):
    """Split one assignment line into a dict of its fields.

    All double quotes are removed from the line before it is split on
    commas.  Fields are assigned, in order, to the keys 'start_str',
    'end_str', 'start_num', 'end_num', 'country_code' and 'country_name'.
    A line with fewer fields yields a dict with only the leading keys.

    The split is limited to the number of keys, so commas inside the last
    field stay part of the country name (e.g. 'Korea, Republic of')
    instead of cutting it short.

    Args:
        line: a single assignment line, or None / empty string.

    Returns:
        A dict mapping field names to string values, or None if line is
        empty or None.
    """
    if not line:
        return None
    keys = ['start_str', 'end_str', 'start_num', 'end_num',
            'country_code', 'country_name']
    stripped_line = line.replace('"', '').strip()
    # At most len(keys) parts: everything after the fifth comma belongs to
    # the country name.
    parts = stripped_line.split(',', len(keys) - 1)
    return dict(zip(keys, parts))
|
|
|
-
|
|
|
def format_line_with_other_country(original_entry, other_entry):
    """Build a quoted six-field line from two parsed entries.

    The address range (start/end strings and numbers) comes from
    original_entry; the country code and name come from other_entry.
    """
    fields = [
        original_entry['start_str'],
        original_entry['end_str'],
        original_entry['start_num'],
        original_entry['end_num'],
        other_entry['country_code'],
        other_entry['country_name'],
    ]
    return ','.join('"%s"' % (field, ) for field in fields)
|
|
|
-
|
|
|
def apply_manual_changes(assignments, manual_assignments):
    """Apply manual overrides, keyed by start number, to the assignments.

    For each assignment whose start number also appears in the manual
    assignments:
      * if start string, end string and end number do not all match, a
        warning is printed and the line is kept;
      * else if the manual country code is not two characters long, the
        line is removed from the output;
      * else if the country codes differ, the line is replaced by one
        carrying the manual country code and name;
      * else a warning about an ineffective change is printed and the
        line is kept.
    Assignments without a manual entry are kept; a warning is printed for
    those whose country code is still 'A1'.  Manual entries that were
    neither applied as a replacement nor as a removal are listed in a
    final warning.

    Args:
        assignments: list of assignment lines.
        manual_assignments: list of manual assignment lines, or None/empty.

    Returns:
        The resulting list of lines.  If manual_assignments is empty or
        None, the assignments list itself is returned unchanged.
    """
    if not manual_assignments:
        return assignments
    # Parenthesised single-argument print works on Python 2 and 3 alike.
    print('\nApplying manual changes...')

    # Index manual lines by start number; a later duplicate wins.
    manual_dict = {}
    for line in manual_assignments:
        start_num = parse_line(line)['start_num']
        if start_num in manual_dict:
            print('Warning: duplicate start number in manual '
                  'assignments:\n %s\n %s\nDiscarding first entry.' %
                  (manual_dict[start_num], line, ))
        manual_dict[start_num] = line

    result = []
    for line in assignments:
        entry = parse_line(line)
        start_num = entry['start_num']

        if start_num not in manual_dict:
            if entry.get('country_code') == 'A1':
                print('Warning: no manual replacement for A1 entry:\n %s'
                      % (line, ))
            result.append(line)
            continue

        manual_line = manual_dict[start_num]
        manual_entry = parse_line(manual_line)
        exact_match = (
            entry['start_str'] == manual_entry['start_str'] and
            entry['end_str'] == manual_entry['end_str'] and
            entry['end_num'] == manual_entry['end_num'])

        if not exact_match:
            # Entry stays in manual_dict so it shows up in the final
            # "could not apply" warning.
            print('Warning: not applying manual change that is only '
                  'a partial match:\n %s\n %s' %
                  (line, manual_line, ))
            result.append(line)
        elif len(manual_entry['country_code']) != 2:
            print('-%s' % (line, ))  # only remove, don't replace
            del manual_dict[start_num]
        elif entry['country_code'] != manual_entry['country_code']:
            new_line = format_line_with_other_country(entry, manual_entry)
            print('-%s\n+%s' % (line, new_line, ))
            result.append(new_line)
            del manual_dict[start_num]
        else:
            # Entry stays in manual_dict here as well.
            print('Warning: not applying ineffective manual '
                  'change:\n %s\n %s' % (line, manual_line, ))
            result.append(line)

    if manual_dict:
        print('Warning: could not apply all manual assignments:')
        for line in manual_dict.values():
            print(' %s' % (line, ))
    return result
|
|
|
-
|
|
|
def write_file(path, assignments, long_format=True):
    """Write assignment lines to path, joined by newlines.

    Args:
        path: output file; it is opened with mode 'w'.
        assignments: list of assignment lines.
        long_format: if true, the lines are written as given.  If false,
            each line is parsed and reduced to
            'start_num,end_num,country_code'.

    No newline is written after the last line.
    """
    if long_format:
        output_lines = assignments
    else:
        # Build every short line before opening the file, so a parse
        # failure does not leave a truncated output file behind.
        output_lines = []
        for long_line in assignments:
            entry = parse_line(long_line)
            output_lines.append('%s,%s,%s' % (
                entry['start_num'], entry['end_num'],
                entry['country_code'], ))
    # The context manager closes the file even if the write fails.
    with open(path, 'w') as out_file:
        out_file.write('\n'.join(output_lines))
|
|
|
-
|
|
|
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|
-
|