Code · Commits · Issues · Pull requests · Actions · Insights · Security
master

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS
SSH

Download ZIP

a1s-api

151lines · modecode

1#!/usr/bin/env python
2
3#
4# (c) 2016-2017, Area 1 Security
5#
6# Sample code demonstrating the Area 1 Security remote API.
7# Version 2.0
8#
9#
10
11from __future__ import print_function
12from traceback import print_exc
13from operator import or_
14from urlparse import urljoin
15from copy import deepcopy as clone
16from json import dumps, loads
17
18import os, sys, time, argparse
19import datetime as dt
20import csv
21import urllib2
22import base64
23
# Base URL of the remote API; the query paths below are appended to it.
AREA1_ENDPOINT = 'https://papillon.area1security.com/'

# Indicator query path. {since} and {end} are filled in with epoch seconds.
INDICATOR_QUERY = 'indicators?since={since}&end={end}'

# Actor query path (contains no placeholders).
ACTOR_QUERY = 'actors'

# Key under which an indicator carries its list of tags.
TAG_RESPONSE_KEY = 'tags'

# Keys that are left out of the CSV columns.
NON_CSV_KEYS = (
    'categories',
    'tags',
    'threat_categories',
    'tag_histories',
    'aliases',
)

# Preferred left-to-right rank of the well-known CSV columns; the rank of a
# column is simply its position in this tuple.
DESIRED_ORDER = {
    column: rank
    for rank, column in enumerate((
        'indicator',
        'name',
        'type',
        'item_name',
        'item_type',
        'threat_name',
        'description',
        'overall_confidence',
        'first_seen',
        'last_seen',
    ))
}

# Keys whose values are divided by 1000 and rendered as UTC date strings when
# timestamp conversion is requested.
TIMESTAMP_KEYS = ('first_detected', 'first_seen', 'last_seen')
32
33
def get_order(key):
    """Return the sort rank of a CSV column name.

    Keys listed in DESIRED_ORDER get their configured rank. Every other key
    shares one common rank, len(DESIRED_ORDER), so it sorts after all the
    configured columns.
    """
    fallback_rank = len(DESIRED_ORDER)
    return DESIRED_ORDER.get(key, fallback_rank)
36
37
def fetch(url, creds):
    """Request *url* with HTTP Basic authentication and return the parsed JSON.

    Args:
        url: Full URL to request.
        creds: Text placed after 'Basic ' in the Authorization header (the
            script passes the base64 encoding of 'uuid:password').

    Returns:
        The response body decoded with json.loads.

    Raises:
        SystemExit: with status 1, after reporting on stderr, when the server
            answers with an HTTP error or when any other error occurs. The
            process stops here so callers never receive a missing result.
    """
    from contextlib import closing

    request = urllib2.Request(url)
    request.add_header('Authorization', 'Basic {}'.format(creds))

    try:
        # urllib2 responses are not context managers in Python 2, so
        # closing() is what guarantees the connection gets released.
        with closing(urllib2.urlopen(request)) as resp:
            return loads(resp.read())
    except urllib2.HTTPError as e:
        message = "Status Code: '{}'.".format(e.code)
        if e.code == 401:
            message += ' Failed to Authenticate.'
        print(message, file=sys.stderr)
        sys.exit(1)
    except Exception:
        print_exc()
        print("Unknown exception.", file=sys.stderr)
        # Exit like the HTTP error branch instead of falling through and
        # handing None to the caller.
        sys.exit(1)
54
def indicator_extract(data):
    """Return whatever an indicators response stores under its 'data' key.

    A new empty list is returned when the response has no 'data' key.
    """
    no_indicators = []
    return data.get('data', no_indicators)
57
def actor_extract(data):
    """Flatten the 'data' lists found in the values of an actors response.

    Args:
        data: Mapping whose values are themselves mappings that may hold a
            'data' list. The keys of *data* are not used.

    Returns:
        One list holding the items of every value's 'data' list, in the
        mapping's iteration order. Values without a 'data' key contribute
        nothing.
    """
    flattened = []
    # Only the values matter, so iterate them directly; values() also works
    # on both Python 2 and Python 3, unlike iteritems().
    for group in data.values():
        flattened.extend(group.get('data', []))
    return flattened
60
def indicator_to_dictionary(indicator, convert_timestamp = False):
    """Return a deep copy of *indicator* with its tags promoted to keys.

    Every tag found under TAG_RESPONSE_KEY adds a `category -> value` entry
    to the copy; a tag whose category equals an existing key replaces that
    key's value. The input indicator is not modified.

    When *convert_timestamp* is true, each key listed in TIMESTAMP_KEYS that
    is present in the result is divided by 1000, interpreted as a UTC epoch
    timestamp, and replaced by a 'YYYY-MM-DD HH:MM:SS' string.
    """
    flattened = clone(indicator)
    for tag in indicator.get(TAG_RESPONSE_KEY, []):
        flattened[tag['category']] = tag['value']

    if not convert_timestamp:
        return flattened

    for key in TIMESTAMP_KEYS:
        if key not in flattened:
            continue
        moment = dt.datetime.utcfromtimestamp(flattened[key] / 1000)
        flattened[key] = moment.strftime('%Y-%m-%d %H:%M:%S')

    return flattened
72
def dictionaries_to_json(dictionaries):
    """Yield, line by line, the 4-space-indented JSON text of *dictionaries*."""
    rendered = dumps(dictionaries, indent=4)
    for row in rendered.split('\n'):
        yield row
76
def _csv_cell(value):
    """Return *value* as a native string that csv.writer can render."""
    try:
        return str(value)
    except UnicodeError:
        # Python 2 only: str() of unicode text with non-ASCII characters
        # fails. Degrade those characters to '?' rather than lose the row.
        return value.encode('ascii', 'replace')


class _LineSink(object):
    """Write-only file stand-in that collects what csv.writer emits."""

    def __init__(self):
        self.parts = []

    def write(self, text):
        self.parts.append(text)

    def take(self):
        """Return everything written since the previous call, then reset."""
        line = ''.join(self.parts)
        del self.parts[:]
        return line


def dictionaries_to_csv(dictionaries):
    """Yield CSV lines, header first, for a list of flat dictionaries.

    The columns are the union of all keys except those in NON_CSV_KEYS,
    ordered by get_order() and then by name, so the layout does not depend
    on set iteration order. A key missing from a dictionary produces an
    empty cell.

    Lines are rendered by the csv module: a value containing a comma, a
    double quote or a newline is quoted instead of corrupting its row, and a
    row consisting of a single empty cell is written as "". Values that
    cannot be converted with str() on Python 2 have their non-ASCII
    characters replaced by '?'.

    Nothing is yielded for an empty input.
    """
    if not dictionaries:
        return

    columns = set()
    for dictionary in dictionaries:
        columns.update(k for k in dictionary if k not in NON_CSV_KEYS)
    key_space = sorted(columns, key=lambda k: (get_order(k), _csv_cell(k)))

    sink = _LineSink()
    # Keep a real line terminator so embedded newlines are always quoted;
    # the single trailing '\n' is stripped from each yielded line.
    writer = csv.writer(sink, lineterminator='\n')

    writer.writerow([_csv_cell(key) for key in key_space])
    yield sink.take()[:-1]

    for dictionary in dictionaries:
        writer.writerow([_csv_cell(dictionary.get(key, '')) for key in key_space])
        yield sink.take()[:-1]
93
if __name__ == "__main__":
    # Command-line entry point: read credentials from the environment, query
    # either the indicators or the actors endpoint for the requested date
    # window, and print the result to stdout as JSON (default) or CSV.
    parser = argparse.ArgumentParser(description=
        'Return all indicators or actor indicators from Area 1 Security. '
        'A1S_USER_UUID and A1S_USER_PASSWORD must be set in environment variables.')
    parser.add_argument('-s', '--since-date', help='Since date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-e', '--end-date', help='End date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-i', '--indicators', help='Indicators only.', action='store_true')
    parser.add_argument('-a', '--actors', help='Actor Indicators only.', action='store_true')
    parser.add_argument('-t', '--convertts', help='Convert Timestamps to Human Readable.', action='store_true')
    parser.add_argument('-f', '--format', help='Output format (json or csv)', required=False)
    args = parser.parse_args()

    try:
        credentials = base64.b64encode(
            '{}:{}'.format(os.environ['A1S_USER_UUID'], os.environ['A1S_USER_PASSWORD']))
    except KeyError:
        # A missing variable is the only failure this message describes.
        print('A1S_USER_UUID and A1S_USER_PASSWORD environment variables must be setup properly.')
        sys.exit(1)

    # Exactly one of -i / -a must be given: both flags are booleans, so they
    # are equal precisely when neither or both were selected.
    if args.indicators == args.actors:
        print('Please select -i or -a.')
        sys.exit(1)

    api_query = INDICATOR_QUERY if args.indicators else ACTOR_QUERY
    extract = indicator_extract if args.indicators else actor_extract

    today = str(dt.date.today())
    since_text = args.since_date or today
    end_text = args.end_date or today

    try:
        # Dates are interpreted as local midnight and sent as epoch seconds.
        since_date = int(time.mktime(dt.datetime.strptime(since_text, '%Y-%m-%d').timetuple()))
        end_date = int(time.mktime(dt.datetime.strptime(end_text, '%Y-%m-%d').timetuple()))
    except ValueError:
        print('Dates must be given as yyyy-mm-dd.', file=sys.stderr)
        sys.exit(1)

    # ACTOR_QUERY has no placeholders, so the extra keywords are ignored there.
    api_endpoint = AREA1_ENDPOINT + api_query.format(since=since_date, end=end_date)
    indicators = extract(fetch(api_endpoint, credentials))
    dictionaries = [indicator_to_dictionary(indicator, convert_timestamp=args.convertts)
                    for indicator in indicators]

    # Any format other than 'csv' (including none at all) falls back to JSON.
    formatter = {
        'csv': dictionaries_to_csv,
        'json': dictionaries_to_json,
    }.get(args.format, dictionaries_to_json)

    for result in formatter(dictionaries):
        try:
            print(result)
        except Exception:
            # Best effort: stop quietly if a line cannot be written
            # (presumably stdout was closed by the reader -- confirm).
            # sys.exit is used because the bare exit() helper is only
            # installed by the site module.
            sys.exit(0)