CodeCommitsIssuesPull requestsActionsInsightsSecurity
a162023fbb96dc92c10b88ad7cd0f05191e7cc6e

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS
SSH

Download ZIP

a1s-api

134lines · modecode

#!/usr/bin/env python

#
# (c) 2016, Area 1 Security
#
# Sample code demonstrating the Area 1 Security remote API.
# Version 1.4
#
#
10
from __future__ import print_function

import argparse
import ast
import base64
import csv
import datetime as dt
import os
import sys
import time
import urllib2
from copy import deepcopy as clone
from json import dumps, loads
from operator import or_
from traceback import print_exc
from urlparse import urljoin
24
# Base URL of the API; always ends with '/', because the query strings below
# are appended to it directly. Defaults to the production HTTPS endpoint. Set
# the A1S_ENDPOINT environment variable to target another deployment (for
# example a QA host) instead of editing this file, so credentials are never
# sent to a non-production or plain-HTTP host by accident.
AREA1_ENDPOINT = os.environ.get(
    'A1S_ENDPOINT', 'https://papillon.area1security.com/').rstrip('/') + '/'

# Query templates appended to AREA1_ENDPOINT; the {since} and {end}
# placeholders are filled in with str.format() before the request is made.
BLOCKABLE_QUERY = 'blockable-indicators?since={since}&end={end}'
MALICIOUS_QUERY = 'malicious-indicators?since={since}'

# Keys of an indicator record that are dropped when it is flattened.
TAG_RESPONSE_KEY = 'tags'
TAG_HISTORY_KEY = 'tag_histories'

# Keys whose values are rendered as human-readable dates on request.
TIMESTAMP_KEYS = ('first_detected', )
32
def fetch(url, creds):
    """Request `url` with HTTP Basic authentication and decode the JSON reply.

    Args:
        url: Full URL to request.
        creds: Base64-encoded ``user:password`` string placed in the
            ``Authorization: Basic`` header.

    Returns:
        The decoded JSON document, or None when the request or the decoding
        fails for any reason other than an HTTP error status.

    Exits the process with status 1 when the server answers with an HTTP
    error status.
    """
    request = urllib2.Request(url)
    request.add_header('Authorization', 'Basic {}'.format(creds))

    try:
        resp = urllib2.urlopen(request)
        try:
            # The body is JSON: a Python-literal parser would reject
            # true / false / null, so use the JSON decoder.
            return loads(resp.read())
        finally:
            resp.close()
    except urllib2.HTTPError as e:
        print("Status Code: '{}'.".format(e.code), file=sys.stderr, end='')
        if e.code == 401:
            print(' Failed to Authenticate.', file=sys.stderr, end='')
        print(file=sys.stderr)
        sys.exit(1)
    except Exception:
        print_exc()
        print("Unknown exception.", file=sys.stderr)
        # Callers must be prepared for a missing result.
        return None
49
def indicator_to_dictionary(indicator, convert_timestamp = False):
    """Flatten one indicator record into a single-level dictionary.

    Each entry of the record's tag list contributes one ``category: value``
    pair, overriding a top-level key of the same name. The tag list and the
    tag-history list themselves are left out of the result.

    Args:
        indicator: Mapping describing one indicator.
        convert_timestamp: When true, values stored under TIMESTAMP_KEYS are
            rendered as 'YYYY-MM-DD HH:MM:SS' in UTC.

    Returns:
        A new dictionary; `indicator` is not modified.
    """
    dropped = (TAG_HISTORY_KEY, TAG_RESPONSE_KEY)

    # Deep-copy only what is kept; the tag lists are discarded anyway.
    result = clone(dict((key, value) for key, value in indicator.items()
                        if key not in dropped))

    # `or []` also covers a tag list that is present but null.
    for tag in indicator.get(TAG_RESPONSE_KEY) or []:
        if tag['category'] not in dropped:
            result[tag['category']] = tag['value']

    if convert_timestamp:
        for key in TIMESTAMP_KEYS:
            if key not in result:
                continue
            try:
                moment = dt.datetime.utcfromtimestamp(result[key])
            except (TypeError, ValueError, OverflowError, OSError):
                # Null, non-numeric or out-of-range value: keep it as
                # received rather than aborting the whole export.
                continue
            result[key] = moment.strftime('%Y-%m-%d %H:%M:%S')

    return result
65
def dictionaries_to_json(dictionaries):
    """Lazily yield one JSON-encoded string per dictionary, in input order."""
    for dictionary in dictionaries:
        yield dumps(dictionary)
69
class _LastWrite(object):
    """File-like sink for csv.writer that remembers only the latest write."""

    def __init__(self):
        self.text = ''

    def write(self, text):
        self.text = text


def _csv_text(value):
    """Return `value` as text that csv.writer accepts on Python 2 and 3."""
    try:
        return str(value)
    except UnicodeEncodeError:
        # Python 2 only: non-ASCII unicode text cannot go through str().
        return value.encode('utf-8')


def dictionaries_to_csv(dictionaries):
    """Yield CSV lines (header first) for an iterable of dictionaries.

    The columns are the sorted union of all keys, so the layout is stable
    from run to run. A key missing from a dictionary gives an empty field.
    Fields containing commas, quotes or line breaks are quoted by the csv
    module. Lines carry no trailing newline.

    Args:
        dictionaries: Iterable of mappings; it is consumed once.

    Yields:
        One string per line. Nothing is yielded for empty input.
    """
    # Materialize first: the rows are walked twice (columns, then values).
    rows = list(dictionaries)
    if not rows:
        return

    columns = set()
    for row in rows:
        columns.update(row)
    columns = sorted(columns)

    sink = _LastWrite()
    writer = csv.writer(sink, lineterminator='')

    writer.writerow([_csv_text(column) for column in columns])
    yield sink.text

    for row in rows:
        writer.writerow([_csv_text(row.get(column, '')) for column in columns])
        yield sink.text
77
if __name__ == "__main__":
    # Command-line entry point: parse the options, query the API once and
    # print the indicators as JSON (default) or CSV, one record per line.
    parser = argparse.ArgumentParser(description=
        'Return Blockable and Malicious Indicators from Area 1 Security. '
        'UUID and Password must be set in environment variables.')
    parser.add_argument('-s', '--since-date', help='Since date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-e', '--end-date', help='End date; yyyy-mm-dd. Default- today.', required=False)
    parser.add_argument('-b', '--blockable', help='Blockable Indicators only.', action='store_true')
    parser.add_argument('-m', '--malicious', help='Malicious Indicators only.', action='store_true')
    parser.add_argument('-t', '--convertts', help='Convert Timestamps to Human Readable.', action='store_true')
    parser.add_argument('-f', '--format', help='Output format (json or csv)', required=False)
    args = parser.parse_args()

    try:
        credentials = base64.b64encode('{}:{}'.format(os.environ['A1S_USER_UUID'],os.environ['A1S_USER_PASSWORD']))
    except Exception:
        # Diagnostics go to stderr so they never mix with the data on stdout.
        print('A1S_USER_UUID and A1S_USER_PASSWORD environment variables must be setup properly.', file=sys.stderr)
        sys.exit(1)

    # Exactly one of -b / -m must be given.
    if args.blockable == args.malicious:
        print('Please select -b or -m.')
        sys.exit(1)

    api_query = BLOCKABLE_QUERY if args.blockable else MALICIOUS_QUERY

    today = str(dt.date.today())
    s = args.since_date or today
    e = args.end_date or today

    # NOTE(review): dates are taken as local midnight, and both default to
    # today, which makes the default window empty -- confirm this is intended.
    try:
        since_date = int(time.mktime(dt.datetime.strptime(s, '%Y-%m-%d').timetuple()))
        end_date = int(time.mktime(dt.datetime.strptime(e, '%Y-%m-%d').timetuple()))
    except ValueError:
        parser.error('Dates must be formatted as yyyy-mm-dd.')

    # Fill the {since}/{end} placeholders; the malicious query simply ignores
    # the unused `end` argument.
    api_endpoint = '{}{}'.format(AREA1_ENDPOINT, api_query.format(since=since_date, end=end_date))
    indicators = fetch(api_endpoint, credentials)
    if indicators is None:
        # fetch() has already reported the failure on stderr.
        sys.exit(1)

    dictionaries = [ indicator_to_dictionary(indicator, convert_timestamp = args.convertts)
                     for indicator in indicators ]

    # Any unrecognized or missing --format value falls back to JSON.
    formatter = {
        'csv' : dictionaries_to_csv,
        'json' : dictionaries_to_json
    }.get(args.format, dictionaries_to_json)

    for result in formatter(dictionaries):
        try:
            print(result)
        except Exception:
            # Presumably meant to stop quietly when stdout goes away (for
            # example when piped into `head`) -- TODO confirm.
            sys.exit(0)