CodeCommitsIssuesPull requestsActionsInsightsSecurity
master

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS
SSH

Download ZIP

a1s-api

151lines · modepreview

#!/usr/bin/env python

#
# (c) 2016-2017, Area 1 Security
#
# Sample code demonstrating the Area 1 Security remote API.
# Version 2.0
#
#

from __future__ import print_function
from traceback  import print_exc
from operator   import or_
from urlparse   import urljoin
from copy       import deepcopy as clone
from json       import dumps, loads

import os, sys, time, argparse
import datetime as dt
import csv
import urllib2
import base64

# Base URL of the Area 1 Security API; one of the query paths below is appended to it.
AREA1_ENDPOINT   = 'https://papillon.area1security.com/'
# Indicator query path; {since} and {end} are filled in with integer epoch seconds.
INDICATOR_QUERY  = 'indicators?since={since}&end={end}'
# Actor query path (it has no placeholders to fill in).
ACTOR_QUERY      = 'actors'
# Key under which an indicator record carries its list of tags.
TAG_RESPONSE_KEY = 'tags'
# Record keys that are left out of the CSV output.
NON_CSV_KEYS     = ('categories', 'tags', 'threat_categories', 'tag_histories', 'aliases')
# Sort rank of the known CSV columns (lower rank sorts first); keys that are
# not listed here all share one rank after the listed ones.
DESIRED_ORDER    = {'indicator': 0, 'name': 1, 'type': 2, 'item_name': 3, 'item_type': 4,
                    'threat_name': 5, 'description': 6, 'overall_confidence': 7, 'first_seen': 8, 'last_seen': 9}
# Record keys rewritten as 'YYYY-MM-DD HH:MM:SS' strings when timestamp
# conversion is requested; the stored values are divided by 1000 first.
TIMESTAMP_KEYS   = ('first_detected', 'first_seen', 'last_seen')


def get_order(key):
    """Return the sort rank of a CSV column name.

    Keys listed in DESIRED_ORDER get their configured rank; every other key
    gets the same rank, one past the listed ones, so it sorts after them.
    """
    if key in DESIRED_ORDER:
        return DESIRED_ORDER[key]
    return len(DESIRED_ORDER)


def fetch(url, creds):
    """Request *url* with HTTP Basic authentication and decode the JSON body.

    Args:
        url: Absolute URL to request.
        creds: Value placed after ``Basic `` in the ``Authorization`` header
            (the script entry point passes base64 of ``uuid:password``).

    Returns:
        The decoded JSON document, or ``None`` when the request or decoding
        failed with anything other than an HTTP error status; in that case
        the traceback and a short message have been written to stderr.

    Raises:
        SystemExit: with status 1 when the server answers with an HTTP error
            status (the status code is written to stderr first).
    """
    from contextlib import closing

    request = urllib2.Request(url)
    request.add_header('Authorization', 'Basic {}'.format(creds))

    try:
        # closing() guarantees the connection is released even if reading or
        # JSON decoding fails; urllib2 responses are not context managers.
        with closing(urllib2.urlopen(request)) as resp:
            return loads(resp.read())
    except urllib2.HTTPError as e:
        print("Status Code: '{}'.".format(e.code), file=sys.stderr, end='')
        if e.code == 401:
            print(' Failed to Authenticate.', file=sys.stderr, end='')
        print(file=sys.stderr)
        sys.exit(1)
    except Exception:
        print_exc()
        print("Unknown exception.", file = sys.stderr)
        # Explicit so callers can see that failure is signalled by None.
        return None

def indicator_extract(data):
    """Return the list stored under 'data' in an indicators response.

    An empty list is returned when the response has no 'data' key.
    """
    no_records = []
    records = data.get('data', no_records)
    return records

def actor_extract(data):
    """Flatten an actors response into one list of records.

    Args:
        data: Mapping whose values are themselves mappings; each value may
            carry a list of records under its 'data' key.

    Returns:
        A new list holding the 'data' records of every value, in the
        mapping's iteration order. Values without a 'data' key contribute
        nothing.
    """
    # Only the values are needed; values() works on both Python 2 and
    # Python 3, whereas iteritems() exists on Python 2 only.
    return [record
            for group in data.values()
            for record in group.get('data', [])]

def indicator_to_dictionary(indicator, convert_timestamp = False):
    """Return a flattened deep copy of one indicator record.

    Each entry of the record's tag list is promoted to a top-level
    ``category: value`` pair on the copy, replacing any key of the same
    name. The record passed in is not modified.

    Args:
        indicator: Mapping describing one indicator.
        convert_timestamp: When true, every TIMESTAMP_KEYS entry present in
            the copy is divided by 1000, read as a UTC POSIX timestamp and
            replaced with a 'YYYY-MM-DD HH:MM:SS' string.

    Returns:
        The flattened copy.
    """
    promoted = dict((tag['category'], tag['value'])
                    for tag in indicator.get(TAG_RESPONSE_KEY, []))
    flattened = clone(indicator)
    flattened.update(promoted)

    if not convert_timestamp:
        return flattened

    for field in TIMESTAMP_KEYS:
        if field not in flattened:
            continue
        moment = dt.datetime.utcfromtimestamp(flattened[field] / 1000)
        flattened[field] = moment.strftime('%Y-%m-%d %H:%M:%S')

    return flattened

def dictionaries_to_json(dictionaries):
    """Yield, line by line, the 4-space-indented JSON rendering of *dictionaries*.

    The whole document is serialised once, on the first request for a line.
    """
    rendered = dumps(dictionaries, indent=4)
    for row in rendered.split('\n'):
        yield row

def _csv_text(value):
    """Return *value* as a native ``str`` that ``csv.writer`` can emit."""
    try:
        return str(value)
    except UnicodeError:
        # Python 2 only: str() of a non-ASCII unicode object raises
        # UnicodeEncodeError. Degrade to ASCII with '?' placeholders instead
        # of losing the whole row.
        return value.encode('ascii', 'replace')


def _csv_line(cells):
    """Format *cells* as one CSV record, without the trailing line ending."""
    import io

    eol = '\r\n'
    # The csv module writes native strings: bytes on Python 2, text on Python 3.
    buffer = io.BytesIO() if str is bytes else io.StringIO()
    csv.writer(buffer, lineterminator=eol).writerow(
        [_csv_text(cell) for cell in cells])
    return buffer.getvalue()[:-len(eol)]


def dictionaries_to_csv(dictionaries):
    """Yield the CSV rendering of *dictionaries*, one record per item.

    The first record is the header. Its columns are the union of all keys
    found in the dictionaries, minus NON_CSV_KEYS, ordered by get_order()
    and then alphabetically so the layout is the same on every run. Fields
    containing commas, quotes or line breaks are quoted by the csv module.

    Args:
        dictionaries: Sequence of flat mappings; a key missing from a
            mapping produces an empty field.

    Yields:
        str: one CSV record without a line ending. Nothing is yielded when
        *dictionaries* is empty.
    """
    if not dictionaries:
        return

    columns = set()
    for dictionary in dictionaries:
        columns.update(key for key in dictionary if key not in NON_CSV_KEYS)
    columns = sorted(columns, key=lambda key: (get_order(key), key))

    yield _csv_line(columns)

    for dictionary in dictionaries:
        try:
            line = _csv_line([dictionary.get(key, '') for key in columns])
        except Exception as error:
            # Stay best-effort: skip the row, but say so instead of dropping
            # it silently.
            print('Skipping a row that could not be rendered as CSV: {!r}'.format(error),
                  file=sys.stderr)
            continue
        # Yield outside the try block so generator shutdown is never
        # swallowed by the handler above.
        yield line

def _date_to_epoch(day):
    """Convert a ``yyyy-mm-dd`` string to integer epoch seconds.

    The date is interpreted as midnight in the local time zone.

    Raises:
        ValueError: if *day* does not match ``%Y-%m-%d``.
    """
    return int(time.mktime(dt.datetime.strptime(day, '%Y-%m-%d').timetuple()))


def main():
    """Command-line entry point: query the API and print the result.

    Reads the credentials from the A1S_USER_UUID and A1S_USER_PASSWORD
    environment variables, requests either indicators (-i) or actor
    indicators (-a), and prints them as JSON (the default) or CSV.

    Exits with status 1 when the credentials are missing, when neither or
    both of -i / -a are given, or when no response data could be fetched;
    exits with status 2 (argparse usage error) on a malformed date.
    """
    parser = argparse.ArgumentParser(description=
             'Return all indicators or actor indicators from Area 1 Security. '
             'A1S_USER_UUID and A1S_USER_PASSWORD must be set in environment variables.')
    parser.add_argument('-s', '--since-date',  help='Since date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-e', '--end-date',    help='End date; yyyy-mm-dd. Default- today.',   required=False)
    parser.add_argument('-i', '--indicators',  help='Indicators only.',                        action='store_true')
    parser.add_argument('-a', '--actors',      help='Actor Indicators only.',                  action='store_true')
    parser.add_argument('-t', '--convertts',   help='Convert Timestamps to Human Readable.',   action='store_true')
    parser.add_argument('-f', '--format',      help='Output format (json or csv)',             required=False)
    args = parser.parse_args()

    # Diagnostics go to stderr so that stdout carries nothing but the data.
    try:
        user_uuid = os.environ['A1S_USER_UUID']
        password  = os.environ['A1S_USER_PASSWORD']
    except KeyError:
        print('A1S_USER_UUID and A1S_USER_PASSWORD environment variables must be setup properly.',
              file=sys.stderr)
        sys.exit(1)
    credentials = base64.b64encode('{}:{}'.format(user_uuid, password))

    # Exactly one of -i / -a must be selected: equal flags mean both or neither.
    if args.indicators == args.actors:
        print('Please select -i or -a.', file=sys.stderr)
        sys.exit(1)

    api_query = INDICATOR_QUERY   if args.indicators else ACTOR_QUERY
    extract   = indicator_extract if args.indicators else actor_extract

    # NOTE(review): with both defaults the window runs from today's midnight
    # to today's midnight -- confirm the API treats `end` as covering the day.
    today = str(dt.date.today())
    try:
        since_date = _date_to_epoch(args.since_date or today)
        end_date   = _date_to_epoch(args.end_date or today)
    except ValueError:
        parser.error('Dates must be given as yyyy-mm-dd.')

    # Only the query path carries placeholders, so only it is formatted.
    api_endpoint = AREA1_ENDPOINT + api_query.format(since = since_date, end = end_date)

    response = fetch(api_endpoint, credentials)
    if response is None:
        # fetch() returns None after reporting an unexpected failure; stop
        # here instead of crashing inside the extractor.
        print('No response data received.', file=sys.stderr)
        sys.exit(1)

    indicators   = extract(response)
    dictionaries = [ indicator_to_dictionary(indicator, convert_timestamp = args.convertts)
                     for indicator in indicators ]

    # Any unrecognised or missing --format value falls back to JSON.
    formatter = {
        'csv'  : dictionaries_to_csv,
        'json' : dictionaries_to_json
    }.get(args.format, dictionaries_to_json)

    for result in formatter(dictionaries):
        try:
            print(result)
        except Exception:
            # Best effort, as before: stop quietly when a line cannot be
            # written (presumably stdout was closed early, e.g. by `head`).
            # sys.exit is used because the bare `exit` helper is provided by
            # the site module and is not meant for programs.
            sys.exit(0)


if __name__ == "__main__":
    main()