#!/usr/bin/env python
#
# (c) 2016-2017, Area 1 Security
#
# Sample code demonstrating the Area 1 Security remote API.
# Version 2.0
#
#
from __future__ import print_function
from traceback import print_exc
from operator import or_
from urlparse import urljoin
from copy import deepcopy as clone
from json import dumps, loads
import os, sys, time, argparse
import datetime as dt
import csv
import urllib2
import base64
# Base URL of the Area 1 Security API; a query path below is appended to it.
AREA1_ENDPOINT = 'https://papillon.area1security.com/'

# Query path templates. {since} and {end} are filled with integer epoch
# seconds when the indicator request URL is built.
INDICATOR_QUERY = 'indicators?since={since}&end={end}'
ACTOR_QUERY = 'actors'

# Key under which an indicator record carries its list of tags.
TAG_RESPONSE_KEY = 'tags'

# Record keys that are left out of CSV output.
NON_CSV_KEYS = (
    'categories',
    'tags',
    'threat_categories',
    'tag_histories',
    'aliases',
)

# CSV column rank: the listed columns come first, in this order.
DESIRED_ORDER = {column: rank for rank, column in enumerate((
    'indicator',
    'name',
    'type',
    'item_name',
    'item_type',
    'threat_name',
    'description',
    'overall_confidence',
    'first_seen',
    'last_seen',
))}

# Record keys whose values are rewritten as 'YYYY-MM-DD HH:MM:SS' strings when
# timestamp conversion is requested (the values are divided by 1000 first).
TIMESTAMP_KEYS = ('first_detected', 'first_seen', 'last_seen')
def get_order(key):
    """Return the CSV column rank of ``key``.

    Keys listed in DESIRED_ORDER get their configured rank; every other key
    shares one rank equal to the number of configured keys, which places it
    after all of them when used as a sort key.
    """
    try:
        return DESIRED_ORDER[key]
    except KeyError:
        return len(DESIRED_ORDER)
def fetch(url, creds):
    """GET ``url`` with HTTP Basic authentication and return the decoded JSON.

    Args:
        url: Full request URL.
        creds: Base64-encoded ``user:password`` string placed after
            ``Basic `` in the Authorization header.

    Returns:
        The JSON-decoded response body, or None when an unexpected
        (non-HTTP) error occurred; that error is reported on stderr.

    Exits the process with status 1 on any HTTP error status.
    """
    request = urllib2.Request(url)
    request.add_header('Authorization', 'Basic {}'.format(creds))
    try:
        resp = urllib2.urlopen(request)
        try:
            return loads(resp.read())
        finally:
            # Release the connection even when reading or decoding fails.
            resp.close()
    except urllib2.HTTPError as e:
        print("Status Code: '{}'.".format(e.code), file=sys.stderr, end='')
        if e.code == 401:
            print(' Failed to Authenticate.', file=sys.stderr, end='')
        print(file=sys.stderr)
        sys.exit(1)
    except Exception:
        # Best effort: report the problem and let the caller decide what to
        # do with the missing payload.
        print_exc()
        print("Unknown exception.", file=sys.stderr)
        return None
def indicator_extract(data):
    """Return the list stored under 'data' in an indicators response.

    Args:
        data: Decoded response mapping, or None when the fetch failed.

    Returns:
        The value under 'data', or an empty list when the response is
        missing/empty or the 'data' entry is absent or null.
    """
    if not data:
        # fetch() yields None after an unexpected error; treat it as "no
        # indicators" instead of raising AttributeError on None.get.
        return []
    return data.get('data') or []
def actor_extract(data):
    """Flatten an actors response into a single list of records.

    The response is read as a mapping whose values are themselves mappings
    with an optional 'data' list; the lists are concatenated in the
    mapping's iteration order. The outer keys are not used.

    Args:
        data: Decoded response mapping, or None when the fetch failed.

    Returns:
        A list with every item of every inner 'data' list; empty when the
        response is missing/empty.
    """
    if not data:
        # fetch() yields None after an unexpected error.
        return []
    # values() replaces iteritems(): the key was discarded anyway, and
    # values() exists on both Python 2 and Python 3 dictionaries.
    return [record
            for group in data.values()
            for record in (group.get('data') or [])]
def indicator_to_dictionary(indicator, convert_timestamp = False):
    """Return a flattened deep copy of one indicator record.

    Each entry of the record's tag list contributes a top-level field named
    by its 'category' and holding its 'value'; these overwrite any field of
    the same name already in the copy. The input record is not modified.

    Args:
        indicator: Mapping describing one indicator.
        convert_timestamp: When true, every TIMESTAMP_KEYS field present in
            the result is divided by 1000, interpreted as a UTC epoch time
            and replaced by a 'YYYY-MM-DD HH:MM:SS' string.

    Returns:
        The new dictionary.
    """
    tag_fields = {}
    for tag in indicator.get(TAG_RESPONSE_KEY, []):
        tag_fields[tag['category']] = tag['value']

    flattened = clone(indicator)
    flattened.update(tag_fields)

    if not convert_timestamp:
        return flattened

    for key in TIMESTAMP_KEYS:
        if key not in flattened:
            continue
        moment = dt.datetime.utcfromtimestamp(flattened[key] / 1000)
        flattened[key] = moment.strftime('%Y-%m-%d %H:%M:%S')
    return flattened
def dictionaries_to_json(dictionaries):
    """Yield the JSON encoding of ``dictionaries`` one line at a time.

    The document is serialized with a four-space indent and split on
    newline characters; the yielded lines carry no trailing newline.
    """
    document = dumps(dictionaries, indent=4)
    rows = document.split('\n')
    for row in rows:
        yield row
def dictionaries_to_csv(dictionaries):
    """Yield CSV lines, header first, for a list of flattened records.

    The columns are the union of all record keys except NON_CSV_KEYS. Keys
    ranked by get_order come first in rank order; the remaining keys follow
    alphabetically so the layout is the same on every run. A record that
    lacks a column gets an empty cell.

    Cells are written through the csv module, so values containing commas,
    quotes or line breaks are quoted instead of corrupting the row, and
    records with non-ASCII text are emitted rather than silently dropped.

    Args:
        dictionaries: Sequence of mappings, one per output row.

    Yields:
        One CSV line per step, without a trailing line terminator. Nothing
        is yielded for an empty sequence.
    """
    import io  # local import: only this formatter needs an in-memory buffer

    if not dictionaries:
        return

    is_py2 = sys.version_info[0] == 2

    def as_text(value):
        # Python 2's csv module cannot write non-ASCII unicode objects, so
        # hand it UTF-8 bytes; every other value keeps its str() rendering.
        if is_py2 and isinstance(value, unicode):
            return value.encode('utf-8')
        return str(value)

    key_space = set()
    for dictionary in dictionaries:
        key_space.update(k for k in dictionary if k not in NON_CSV_KEYS)
    # The key itself breaks ties between columns that share a rank.
    key_space = sorted(key_space, key=lambda k: (get_order(k), k))

    buf = io.BytesIO() if is_py2 else io.StringIO()
    # Keep the default '\r\n' terminator so that embedded CR/LF characters
    # trigger quoting; the terminator is stripped again before yielding.
    writer = csv.writer(buf)
    terminator_length = len('\r\n')

    def render(cells):
        writer.writerow([as_text(cell) for cell in cells])
        line = buf.getvalue()[:-terminator_length]
        buf.seek(0)
        buf.truncate()
        return line

    yield render(key_space)
    for dictionary in dictionaries:
        yield render(dictionary.get(key, '') for key in key_space)
def _date_to_epoch(text):
    """Return midnight local time of a ``yyyy-mm-dd`` date as epoch seconds.

    Raises ValueError when ``text`` does not match that format.
    """
    parsed = dt.datetime.strptime(text, '%Y-%m-%d')
    return int(time.mktime(parsed.timetuple()))


# Command-line entry point: fetch indicators or actor indicators for a date
# range and print them as JSON (default) or CSV.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=
        'Return all indicators or actor indicators from Area 1 Security. '
        'A1S_USER_UUID and A1S_USER_PASSWORD must be set in environment variables.')
    parser.add_argument('-s', '--since-date', help='Since date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-e', '--end-date', help='End date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-i', '--indicators', help='Indicators only.', action='store_true')
    parser.add_argument('-a', '--actors', help='Actor Indicators only.', action='store_true')
    parser.add_argument('-t', '--convertts', help='Convert Timestamps to Human Readable.', action='store_true')
    parser.add_argument('-f', '--format', help='Output format (json or csv)', required=False)
    args = parser.parse_args()

    # Credentials come from the environment only.
    try:
        user = os.environ['A1S_USER_UUID']
        password = os.environ['A1S_USER_PASSWORD']
    except KeyError:
        print('A1S_USER_UUID and A1S_USER_PASSWORD environment variables must be setup properly.',
              file=sys.stderr)
        sys.exit(1)
    credentials = base64.b64encode('{}:{}'.format(user, password))

    # Exactly one of -i / -a must be given.
    if args.indicators == args.actors:
        print('Please select -i or -a.', file=sys.stderr)
        sys.exit(1)

    api_query = INDICATOR_QUERY if args.indicators else ACTOR_QUERY
    extract = indicator_extract if args.indicators else actor_extract

    # Both ends of the range default to today.
    today = str(dt.date.today())
    try:
        since_date = _date_to_epoch(args.since_date or today)
        end_date = _date_to_epoch(args.end_date or today)
    except ValueError:
        print('Dates must be given as yyyy-mm-dd.', file=sys.stderr)
        sys.exit(1)

    # Only the query template has placeholders; the actors query has none
    # and passes through format() unchanged.
    api_endpoint = AREA1_ENDPOINT + api_query.format(since = since_date, end = end_date)

    payload = fetch(api_endpoint, credentials)
    if payload is None:
        # fetch() already reported the failure on stderr.
        sys.exit(1)

    indicators = extract(payload)
    dictionaries = [ indicator_to_dictionary(indicator, convert_timestamp = args.convertts)
                     for indicator in indicators ]

    formatters = {
        'csv' : dictionaries_to_csv,
        'json' : dictionaries_to_json
    }
    if args.format is not None and args.format not in formatters:
        print("Unknown format '{}'; falling back to json.".format(args.format), file=sys.stderr)
    formatter = formatters.get(args.format, dictionaries_to_json)

    for result in formatter(dictionaries):
        try:
            print(result)
        except Exception:
            # Deliberate best effort kept from the original: stop quietly if
            # a line cannot be written (presumably a closed output pipe).
            sys.exit(0)
# Public; mirrored from https://github.com/cloudflare/Area1API
# a1s-api
# 151 lines · mode preview