I’ll make it very clear: racism isn’t something I stand for. Intolerance is quite possibly one of the greatest threats to modern society and one of the worst attributes a person can have. However, in the security world there is a widespread preconceived notion that China is to blame for the bulk of the world’s internet-borne threats.

Is this true? Following an incident I was recently tasked with, some of the less technically inclined people involved wanted a bit of insight into exactly where our threats are coming from.

basic workings

Over the past couple of days I’ve been spending time here and there piecing together a script that:
  • Pulls down a selection of public ‘blocklists’ (lists of IP addresses that are known to be malicious, or are straight-up undesirable to connect to, e.g. Tor exit nodes). The main script can also merge these into a single blocklist that can be loaded into firewalls.
  • Ingests raw firewall logs or firewall summary logs (something all of our custom firewall OSes produce daily), and pulls all non-local addresses out of them.
  • Cross-references the address list against the blocklists, finding connections that have been made to/from malicious IPs, whether they were blocked and what their originating country is.

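The scripts require a set directory structure, although I do plan to make this a little more flexible: a ‘blocklists’ file alongside the scripts, plus ‘lists’ (downloaded blocklists), ‘logs’ (firewall logs to analyse), ‘out’ (generated reports) and ‘dat’ (the GeoIP database) directories.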

MaxMind’s GeoIP Lite database is used for the country lookups; the main script expects it at dat/GeoIP.dat.

The ‘blocklists’ file reads as follows:

arbor http://atlas-public.ec2.arbor.net/public/ssh_attackers
autoshun http://www.autoshun.org/files/shunlist.csv
badguys http://www.t-arend.de/linux/badguys.txt
blacklisted http://www.infiltrated.net/blacklisted
danger http://danger.rulez.sk/projects/bruteforceblocker/blist.php
denyhost http://stats.denyhosts.net/stats.html
dshield http://www.dshield.org/ipsascii.html?limit=5000
emergingthreats http://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt
evilssh http://vmx.yourcmc.ru/BAD_HOSTS.IP4
geopsy http://www.geopsy.org/blacklist.html
haleys http://charles.the-haleys.org/ssh_dico_attack_hdeny_format.php/hostsdeny.txt
kolatzek http://robert.kolatzek.org/possible_botnet_ips.txt
maldom http://mirror1.malwaredomains.com/files/domains.txt
mdl http://www.malwaredomainlist.com/mdl.php?colsearch=All&quantity=All&search=
skygeo http://sky.geocities.jp/ro_hp_add/ro_hp_add_hosts.txt
sshbl http://www.sshbl.org/list.txt
torlist https://www.dan.me.uk/torlist/

For each entry, the first string is the name of the blocklist (used in the reports and to name the file in the lists directory) and the second is the URL it will be retrieved from.
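To make the file format concrete, here is a minimal sketch of reading such a file into name/URL pairs. It is an illustration only, written for Python 3 using just the standard library; the function name parse_blocklist_index is hypothetical and does not appear in the scripts below.

```python
def parse_blocklist_index(text):
    """Return a list of (name, url) pairs from the contents of a 'blocklists' file."""
    entries = []
    for line in text.splitlines():
        # Split on the first run of whitespace only, so the URL is kept whole.
        parts = line.strip().split(None, 1)
        if len(parts) != 2:
            continue  # tolerate blank or incomplete lines
        entries.append((parts[0], parts[1]))
    return entries


sample = """arbor http://atlas-public.ec2.arbor.net/public/ssh_attackers

sshbl http://www.sshbl.org/list.txt
"""
for name, url in parse_blocklist_index(sample):
    print(name, '->', url)
```

Skipping malformed lines rather than raising is a design choice for the sketch; a stricter version might report them instead.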

log formats

As noted above, two log types are supported:
  • Log Summaries: These are a log format unique to my organisation. All firewall logs are shipped back to a log server once a day, and the log collection software produces a basic daily report for each firewall (high drop counts and so on).

  • Raw Logs: Standard firewall logs.
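As a rough illustration of what the main script looks for in a raw log line, the sketch below pulls out the SRC, DST, DPT and PROTO fields, taking the firewall name and message type from the fourth and sixth whitespace-separated tokens, as the script further down does. The sample line is invented to match that layout and is not taken from real logs; parse_raw_line is a hypothetical helper, written for Python 3.

```python
import re

# Hypothetical sample line, laid out so that token 3 is the firewall name and
# token 5 is the message type, which is what the main script assumes.
SAMPLE = ('Jan  5 12:00:01 fw01 kernel: FW-NEW IN=eth0 OUT=eth1 '
          'SRC=192.0.2.10 DST=198.51.100.7 LEN=60 PROTO=TCP SPT=51515 DPT=22 ')

FIELD_RE = re.compile(r'\b(SRC|DST|DPT|PROTO)=(\S+)')


def parse_raw_line(line):
    """Return a dict of the fields of interest, or None if any are missing."""
    tokens = line.split()
    if len(tokens) < 10:
        return None  # too short to be a firewall event
    fields = dict(FIELD_RE.findall(line))
    if not all(key in fields for key in ('SRC', 'DST', 'DPT', 'PROTO')):
        return None
    fields['FW'] = tokens[3]
    fields['TYPE'] = tokens[5]
    return fields


print(parse_raw_line(SAMPLE))
```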

scripts

The first script is used to update the blocklist files. It’s worth noting that this shouldn’t be run more than once an hour, as some of the sources only permit requests every 30-60 minutes.

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Fetch public blocklists to use as analysis source of firewall logs."""

import datetime
import os
import re
import urllib2

# Route HTTP requests through the local proxy (address redacted).
PROXY = urllib2.ProxyHandler({'http': 'http://xxx.xxx.xxx:8080'})
OPENER = urllib2.build_opener(PROXY)
urllib2.install_opener(OPENER)

LIST_PATH = os.path.abspath('lists')

def write_dshield(response):
    """Fix up messy dshield file."""
    print str(datetime.datetime.now()) + ' | Fixing up DShield addressing...'
    out_file_path = os.path.join(LIST_PATH, 'dshield')
    with open(out_file_path, 'w') as file_out:
        for line in response.splitlines(True):
            # Strip zero-padding from each octet but keep a lone zero, so that
            # 010.000.000.001 becomes 10.0.0.1 rather than 10...1.
            line = re.sub(r'(^|\.)0+(?=\d)', r'\1', line)
            file_out.write(line)

def wget(blocklist, url):
    """Pull blocklist file down via HTTP GET."""
    print str(datetime.datetime.now()) + ' | Fetching: ' + url
    response = urllib2.urlopen(url).read()
    if blocklist == 'dshield':
        write_dshield(response)
    else:
        bl_file_path = os.path.join(LIST_PATH, blocklist)
        with open(bl_file_path, 'w') as block_file:
            block_file.write(response)

print str(datetime.datetime.now()) + ' | Beginning update...'

with open('blocklists', 'r') as blocklist_list:
    for target in blocklist_list:
        if not target.strip():
            continue  # skip blank lines
        bl_name, bl_url = target.split()
        wget(bl_name, bl_url)

print str(datetime.datetime.now()) + ' | Update complete.'
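The once-an-hour limit mentioned above is not enforced by the script itself. One way it could be, sketched here under the assumption that a list file's modification time is a good enough record of the last fetch, is to skip any list that was written recently. The names is_stale and MIN_AGE_SECONDS are hypothetical, and the sketch is written for Python 3.

```python
import os
import time

MIN_AGE_SECONDS = 60 * 60  # assumed: at most one fetch per list per hour


def is_stale(path, min_age=MIN_AGE_SECONDS, now=None):
    """Return True if path is missing or was last written at least min_age seconds ago."""
    if not os.path.exists(path):
        return True  # never fetched, so fetch it
    if now is None:
        now = time.time()
    return (now - os.path.getmtime(path)) >= min_age
```

In the update loop this might gate each download, for example only calling wget(bl_name, bl_url) when is_stale(os.path.join(LIST_PATH, bl_name)) returns True.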

The main script parses the logs and analyses them:

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Import firewall logs and firewall summaries, cross-referencing them against public
blocklists to determine malicious traffic patterns."""

import datetime
import mmap
import os
import pygeoip
import re

print 'Mode? (default = full)'
print 'full - Perform full analysis of ingress and egress traffic.'
print 'egress - Perform detailed analysis of egress traffic.'
print 'ingress - Perform detailed analysis of ingress traffic.'
print 'list - Produce blocklist.'
print '?>'
# Fall back to 'full' on empty input, as the prompt promises.
MODE = raw_input().strip() or 'full'
print ''

REPORT_FILE = 'bad_list.csv'
IP_FILE = 'ip_list.csv'
TMP_FILE = 'intel.tmp'
BLOCK_FILE = 'block_list.txt'

def build_blocklist():
    """Derive a single, firewall-digestible list of known bad IPs."""
    ip_list = []

    print str(datetime.datetime.now()) + ' | Ingesting blocklists...'

    for root, dirs, files in os.walk(os.path.abspath('lists')):
        for blocklist in files:
            file_name = os.path.join(root, blocklist)
            print str(datetime.datetime.now()) + ' | ' + file_name
            with open(file_name) as list_file:
                for line in list_file:
                    # Only lines consisting solely of a dotted-quad address are kept;
                    # stripping first also copes with CRLF line endings.
                    search = re.match(r'^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}'
                                      r'([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$', line.strip())
                    if search:
                        ip_list.append(search.group())

    out_path = os.path.abspath('out')
    file_out = os.path.join(out_path, BLOCK_FILE)
    with open(file_out, 'wt') as out_file:
        # Remove duplicates that appear in more than one source list.
        ip_list = list(set(ip_list))
        for item in ip_list:
            out_file.write('%s\n' % item)

    print str(datetime.datetime.now()) + ' | Blocklist written to: ' + file_out

def import_logs():
    """Import all logs in the logs directory for analysis."""
    out_path = os.path.abspath('out')
    tmp_file_path = os.path.join(out_path, TMP_FILE)

    # Strict dotted-quad matcher shared by the two validation helpers below.
    ip_re = re.compile(r'^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}'
                       r'([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')

    with open(tmp_file_path, 'wt') as out_file:

        def add_ip(ip_addr, status, fw_name):
            """Validate and add a single IP event to the list."""
            if ip_re.match(ip_addr):
                # Local addresses are excluded (prefixes redacted).
                if not ip_addr.startswith('XXX.XX') and not ip_addr.startswith('XXX.XX'):
                    entry = ip_addr + ',' + status + ',' + fw_name
                    out_file.write('%s\n' % entry)
            else:
                print str(datetime.datetime.now()) + ' | Invalid IP: ' + ip_addr

        def add_conn(src, dst, dpt, proto, fw_name):
            """Validate and add an egress or ingress connection event to the list."""
            if ip_re.match(src):
                if ip_re.match(dst):
                    entry = src + ',' + dst + ',' + dpt + ',' + proto + ',' + fw_name
                    is_local_dst = dst.startswith('XXX.XX') or dst.startswith('XXX.XX')
                    # Egress keeps traffic bound for non-local addresses,
                    # ingress keeps traffic bound for local ones.
                    if (MODE == 'egress') and not is_local_dst:
                        out_file.write('%s\n' % entry)
                    if (MODE == 'ingress') and is_local_dst:
                        out_file.write('%s\n' % entry)
                else:
                    print str(datetime.datetime.now()) + ' | Invalid IP: ' + dst
            else:
                print str(datetime.datetime.now()) + ' | Invalid IP: ' + src

        for root, dirs, files in os.walk(os.path.abspath('logs')):
            for log in files:
                log_file_path = os.path.join(root, log)
                print str(datetime.datetime.now()) + ' | Parsing logfile: ' + log_file_path

                if log.endswith('.msgsummary'):
                    if (MODE != 'egress') and (MODE != 'ingress'):
                        fw_name = log.split('_', 1)[0].replace('.', '')
                        with open(log_file_path) as log_file:
                            for line in log_file:
                                # Summary lines for dropped traffic start with one of these tags.
                                if line.strip().startswith(('ICT', 'IDD', 'XCT', 'XDD')):
                                    result = re.search(r'\b(?:[0-9\s]{1,3}\.){3}[0-9]{1,3}\b', line)
                                    if result:
                                        ip_addr = result.group().replace(' ', '')
                                        ip_addr = ip_addr.replace('..', '.0.')
                                        add_ip(ip_addr, 'BLOCKED', fw_name)
                    else:
                        print str(datetime.datetime.now()) + ' | Import mode incompatible with summary logs.'
                else:
                    # Read-only access is enough for a read-only memory map.
                    with open(log_file_path, 'rb') as log_file:
                        log_map = mmap.mmap(log_file.fileno(), 0, access=mmap.ACCESS_READ)
                        while True:
                            line = log_map.readline()
                            if line == '':
                                break
                            entry = line.split()
                            if len(entry) >= 10:
                                src_result = re.search(r'SRC=(.*?)\s', line)
                                dst_result = re.search(r'DST=(.*?)\s', line)
                                dpt_result = re.search(r'DPT=(.*?)\s', line)
                                proto_result = re.search(r'PROTO=(.*?)\s', line)

                                if src_result and dst_result and dpt_result and proto_result:
                                    proto = proto_result.group(1).strip()

                                    if proto != 'ICMP':
                                        fw_name = entry[3]
                                        msg_type = entry[5]
                                        src_status = ''
                                        dst_status = ''
                                        if msg_type == 'FW-NEW':
                                            src_status = 'OK-SRC'
                                            dst_status = 'OK-DST'
                                        if msg_type in ('FW-ICT', 'FW-IDD', 'FW-XCT', 'FW-XDD'):
                                            src_status = 'BLOCKED'
                                            dst_status = 'BLOCKED'
                                        if (src_status != '') and (dst_status != ''):
                                            src = src_result.group(1).strip()
                                            dst = dst_result.group(1).strip()
                                            dpt = dpt_result.group(1).strip()
                                            if MODE in ('egress', 'ingress'):
                                                # Only connections that were allowed through matter here.
                                                if (src_status != 'BLOCKED') and (dst_status != 'BLOCKED'):
                                                    add_conn(src, dst, dpt, proto, fw_name)
                                            else:
                                                add_ip(src, src_status, fw_name)
                                                add_ip(dst, dst_status, fw_name)
                        log_map.close()

                print str(datetime.datetime.now()) + ' | Completed parsing file.'

    file_out = os.path.join(out_path, IP_FILE)

    # Copy the temporary file across, dropping duplicate lines.
    seen = set()
    with open(tmp_file_path, 'r') as in_file:
        with open(file_out, 'w') as out_file:
            for line in in_file:
                if line not in seen:
                    out_file.write(line)
                    seen.add(line)

    print str(datetime.datetime.now()) + ' | IP list written to: ' + file_out

def analyse_logs():
    """Cross reference all firewall traffic against the downloaded blocklists."""
    dat_path = os.path.abspath('dat')
    dat_file = os.path.join(dat_path, 'GeoIP.dat')
    geo_data = pygeoip.GeoIP(dat_file)

    print str(datetime.datetime.now()) + ' | Beginning analysis...'

    out_path = os.path.abspath('out')
    tmp_file_path = os.path.join(out_path, TMP_FILE)
    ip_csv = os.path.join(out_path, IP_FILE)

    with open(tmp_file_path, 'wt') as out_file:
        for root, dirs, files in os.walk(os.path.abspath('lists')):
            for blocklist in files:
                block_file = os.path.join(root, blocklist)
                print str(datetime.datetime.now()) + ' | Cross referencing against: ' + block_file
                with open(block_file, 'r') as block_in:
                    # Collect whole addresses so that 1.2.3.4 cannot match inside
                    # 11.2.3.45, as a plain substring search would allow.
                    block_ips = set(re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', block_in.read()))
                with open(ip_csv, 'r') as ip_list:
                    for entry in ip_list:
                        if (MODE == 'egress') or (MODE == 'ingress'):
                            src, dst, dpt, proto, fw_name = [field.strip() for field in entry.split(',')]
                            # Egress checks where traffic is going; ingress where it came from.
                            suspect = dst if MODE == 'egress' else src
                            if suspect in block_ips:
                                country = (geo_data.country_name_by_addr(suspect)).strip()
                                entry = src + ',' + dst + ',' + dpt + ',' + proto + ',' + fw_name + ',' + country + ',' + blocklist
                                print str(datetime.datetime.now()) + ' | Found: ' + src + ' going to: ' + dst + ':' + dpt + ' ' + proto + ' (' + country + ') in file: ' + blocklist
                                out_file.write('%s\n' % entry)
                        if MODE == 'full':
                            ip_addr, status, fw_name = [field.strip() for field in entry.split(',')]
                            if ip_addr in block_ips:
                                country = (geo_data.country_name_by_addr(ip_addr)).strip()
                                entry = ip_addr + ',' + fw_name + ',' + status + ',' + country + ',' + blocklist
                                print str(datetime.datetime.now()) + ' | Found: ' + ip_addr + ' (' + country + ') in file: ' + blocklist + ' (' + status + ')'
                                out_file.write('%s\n' % entry)

    file_out = os.path.join(out_path, REPORT_FILE)

    # Copy the temporary file across, dropping duplicate lines.
    seen = set()
    with open(tmp_file_path, 'r') as in_file:
        with open(file_out, 'w') as out_file:
            for line in in_file:
                if line not in seen:
                    out_file.write(line)
                    seen.add(line)
    print str(datetime.datetime.now()) + ' | Report written to: ' + file_out

if MODE == 'list':
    build_blocklist()
elif MODE in ('egress', 'ingress', 'full'):
    import_logs()
    analyse_logs()
else:
    print 'Invalid option.'

Inside the import function are two checks that exclude local addresses (shown redacted as 'XXX.XX' above), so you’ll need to adjust these to suit your local addressing (in our case we have two class Bs).
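If string prefixes feel fragile (a prefix that does not end in a dot can also match unrelated ranges, e.g. '10.1' matches 10.10.x.x), the local-address check could instead be expressed as network membership. The sketch below uses the ipaddress module from the Python 3 standard library; the original scripts target Python 2, where a backport would be needed. The two /16 networks are placeholders, not our real addressing, and is_local is a hypothetical helper.

```python
import ipaddress

# Placeholder ranges only: substitute your own networks. A /16 is the CIDR
# equivalent of a "class B".
LOCAL_NETWORKS = [
    ipaddress.ip_network('172.16.0.0/16'),
    ipaddress.ip_network('172.17.0.0/16'),
]


def is_local(ip_addr, networks=LOCAL_NETWORKS):
    """Return True if ip_addr falls inside any of the configured local networks."""
    try:
        addr = ipaddress.ip_address(ip_addr)
    except ValueError:
        return False  # not a valid IP address at all
    return any(addr in net for net in networks)


print(is_local('172.16.40.2'))
print(is_local('8.8.8.8'))
```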

sample run

Database is updated:

A summarised blocklist is formed, later loaded into firewalls to ensure all addresses are blocked:

A full run is made, producing a report that details blocked connections (ICT, IDD, XCT and XDD), and incoming and outgoing successful ones (FW-NEW):

Using the resulting CSV a PowerView graph is made in Excel:


the firewall names are used as a filter (right-hand list)

… confirming that China is behind a lot of malicious traffic, but most of it is blocked. This also assumes that a good deal of the US traffic is VPN traffic. Using the results, I found that the connections that did make it through the firewall were to VoIP servers, which by design require all ports to be open to the internet; the VoIP server itself rejects the connections (somewhat undesirably replying with an ICMP type 3).
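For anyone without PowerView, the per-country counts behind a graph like this could be approximated straight from the full-mode report. The sketch below assumes the column order the main script writes in full mode (address, firewall, status, country, blocklist) and is written for Python 3; tally_by_country is a hypothetical helper and the sample rows are invented placeholders, not real results.

```python
import collections
import csv
import io


def tally_by_country(report_lines):
    """Count report rows per (country, status) for a full-mode report."""
    counts = collections.Counter()
    for row in csv.reader(report_lines):
        if len(row) < 5:
            continue  # skip blank or malformed rows
        status = row[2].strip()
        # The report is written without quoting, so a country name containing
        # a comma spills into extra columns; rejoin everything between the
        # status column and the final column (the blocklist name).
        country = ','.join(row[3:-1]).strip()
        counts[(country, status)] += 1
    return counts


# Invented rows in the full-mode column order, for illustration only.
SAMPLE = io.StringIO(
    '192.0.2.10,fw01,BLOCKED,Country A,sshbl\n'
    '192.0.2.11,fw01,BLOCKED,Country A,dshield\n'
    '198.51.100.7,fw02,OK-SRC,Country B,torlist\n'
)
for (country, status), count in tally_by_country(SAMPLE).most_common():
    print(country, status, count)
```

Note that an address listed on several blocklists appears once per list in the report, so these are counts of matches rather than of unique addresses.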

Successful outgoing connections can be a real point of concern, as they may indicate that a compromised machine inside the network is calling out to a controlling server. For this, there’s the egress analysis mode:

the future?

This is only a small, temporary part of an ongoing project using Hadoop, Flume, Hive and Pig to provide deep insight and intelligent analysis of real-time big data from both network devices and servers. I hope to be able to share more about that in the near future.


3mil lines loaded in 3 minutes on a cheap, shared SSD
