CodeCommitsIssuesPull requestsActionsInsightsSecurity
a162023fbb96dc92c10b88ad7cd0f05191e7cc6e

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

a1s-api

134lines · modepreview

#!/usr/bin/env python

#
# (c) 2016, Area 1 Security
#
# Sample code demonstrating the Area 1 Security remote API.
# Version 1.4
#
#

from __future__ import print_function
from traceback  import print_exc
from operator   import or_
from urlparse   import urljoin
from copy       import deepcopy as clone
from json       import dumps

import os, sys, time, argparse
import datetime as dt
import ast
import csv
import urllib2
import base64

AREA1_ENDPOINT   = 'https://papillon.area1security.com/'
AREA1_ENDPOINT   = 'http://p3-qa-papillon:8080/papillon/'
BLOCKABLE_QUERY  = 'blockable-indicators?since={since}&end={end}'
MALICIOUS_QUERY  = 'malicious-indicators?since={since}'
TAG_RESPONSE_KEY = 'tags'
TAG_HISTORY_KEY  = 'tag_histories'
TIMESTAMP_KEYS   = ('first_detected', )

def fetch(url, creds):
    request = urllib2.Request(url)
    request.add_header('Authorization', 'Basic {}'.format(creds))

    try:
        resp = urllib2.urlopen(request)
        return ast.literal_eval(resp.read())
    except urllib2.HTTPError as e:
        print("Status Code: '{}'.".format(e.code), file=sys.stderr, end='')
        if e.code == 401:
            print(' Failed to Authenticate.', file=sys.stderr, end='')
        print(file=sys.stderr)
        sys.exit(1)
    except Exception:
        print_exc()
        print("Unknown exception.", file = sys.stderr)

def indicator_to_dictionary(indicator, convert_timestamp = False):
    tags   = { tag['category'] : tag['value'] for tag in indicator.get(TAG_RESPONSE_KEY, []) }
    result = clone(indicator)
    result.update(tags)

    for key in (TAG_HISTORY_KEY, TAG_RESPONSE_KEY):
        if key in result:
            del result[key]

    if convert_timestamp:
        for key in TIMESTAMP_KEYS:
            if key in result:
                result[key] = dt.datetime.strftime(dt.datetime.utcfromtimestamp(result[key]), '%Y-%m-%d %H:%M:%S')

    return result

def dictionaries_to_json(dictionaries):
    for dictionary in dictionaries:
        yield dumps(dictionary)

def dictionaries_to_csv(dictionaries):
    key_space = list(reduce(or_, [set(dictionary.keys()) for dictionary in dictionaries]))
    header    = ','.join(key_space)
    yield header

    for dictionary in dictionaries:
        yield ','.join(map(str, [dictionary.get(key, '') for key in key_space]))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=
             'Return Blockable and Malicious Indicators from Area 1 Security. '
             'UUID and Password must be set in environment variables.')
    parser.add_argument('-s', '--since-date',  help='Since date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-e', '--end-date',    help='End date; yyyy-mm-dd. Default- today.',   required=False)
    parser.add_argument('-b', '--blockable',   help='Blockable Indicators only.',              action='store_true')
    parser.add_argument('-m', '--malicious',   help='Malicious Indicators only.',              action='store_true')
    parser.add_argument('-t', '--convertts',   help='Convert Timestamps to Human Readable.',   action='store_true')
    parser.add_argument('-f', '--format',      help='Output format (json or csv)',             required=False)
    args = parser.parse_args()

    try:
        credentials = base64.b64encode('{}:{}'.format(os.environ['A1S_USER_UUID'],os.environ['A1S_USER_PASSWORD']))
    except Exception:
        print('A1S_USER_UUID and A1S_USER_PASSWORD environment variables must be setup properly.')
        sys.exit(1)

    if (not args.blockable and not args.malicious) or \
       (args.blockable and args.malicious):
        print('Please select -b or -m.')
        sys.exit(1)

    api_query = BLOCKABLE_QUERY if args.blockable else MALICIOUS_QUERY

    today = dt.date.today()

    if args.since_date:
        s = args.since_date
    else:
        s = str(today)

    if args.end_date:
        e = args.end_date
    else:
        e = str(today)

    ts = time.mktime(dt.datetime.strptime(s, '%Y-%m-%d').timetuple())
    since_date = int(ts)
    ts = time.mktime(dt.datetime.strptime(e, '%Y-%m-%d').timetuple())
    end_date = int(ts)

    api_endpoint = '{}{}'.format(AREA1_ENDPOINT, api_query)
    indicators   = fetch(api_endpoint, credentials)
    dictionaries = [ indicator_to_dictionary(indicator, convert_timestamp = args.convertts) \
                     for indicator in indicators ]

    formatter = {
        'csv'  : dictionaries_to_csv,
        'json' : dictionaries_to_json
    }.get(args.format, dictionaries_to_json)

    for result in formatter(dictionaries):
        try:
            print(result)
        except Exception:
            exit(0)