CodeCommitsIssuesPull requestsActionsInsightsSecurity
master

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

hangouts_chat.py

593lines · modecode

1import json
2import httplib2
3import logging
4from typing import Iterable, Optional
5from errbot.backends.base import Message, Card
6from errbot.backends.base import Person
7from errbot.backends.base import Room, RoomError
8from errbot.errBot import ErrBot
9from google.cloud import pubsub
10from oauth2client.service_account import ServiceAccountCredentials
11from cachetools import LRUCache
12
13
14from markdownconverter import hangoutschat_markdown_converter
15from prometheus import PrometheusMetrics
16
17log = logging.getLogger('errbot.backends.hangoutschat')
18
19
20def removeprefix(source: str, prefix: str):
21 if source.startswith(prefix):
22 return source[len(prefix):]
23 return source
24
25
26class RoomsNotSupportedError(RoomError):
27 def __init__(self, message=None):
28 if message is None:
29 message = (
30 "Most Room operations are not supported in Google Hangouts Chat."
31 "While Rooms are a _concept_, the API is minimal and does not "
32 "expose this functionality to bots"
33 )
34 super().__init__(message)
35
36
37class MalformedCardError(Exception):
38 def __init__(self, card: Card, message: str):
39 super().__init__((
40 "Malformed or unexpected card data provided:\n"
41 f" {message}"
42 ))
43
44
45class GoogleHangoutsChatAPI:
46 """
47 Represents the Google Hangouts REST API
48 See: https://developers.google.com/hangouts/chat/reference/rest/
49 """
50 base_url = 'https://chat.googleapis.com/v1'
51 # Number of results to fetch at a time. Default is 100, Max is 1000
52 page_size = 500
53
54 # Maximum length of any single message sent to google chat
55 max_message_length = 4096
56
57 def __init__(self, creds_file: str, scope: str = 'https://www.googleapis.com/auth/chat.bot'):
58 self.creds_file = creds_file
59 self.scope = scope
60
61 @property
62 def credentials(self):
63 return ServiceAccountCredentials.from_json_keyfile_name(self.creds_file,
64 scopes=[self.scope])
65
66 @property
67 def client(self):
68 return self.credentials.authorize(httplib2.Http())
69
70 def _request(self, uri: str, query_string: str = None, **kwargs) -> Optional[dict]:
71 request_args = {
72 'method': 'GET',
73 'headers': {'Content-Type': 'application/json; charset=UTF-8', }}
74 request_args.update(kwargs)
75 url = '{}/{}'.format(self.base_url, uri)
76 if query_string:
77 url += '?{}'.format(query_string)
78 result, content = self.client.request(
79 uri=url,
80 **request_args
81 )
82 if result['status'] == '200':
83 content_json = json.loads(content.decode('utf-8'))
84 return content_json
85 else:
86 log.error('status: {}, content: {}'.format(result['status'], content))
87 return None
88
89 def _download(self, uri: str) -> Optional[bytes]:
90 request_args = {
91 'method': 'GET',
92 'headers': {'Content-Type': 'application/octet-stream', }}
93 result, content = self.client.request(
94 uri=uri,
95 **request_args
96 )
97 if result['status'] == '200':
98 return content
99 else:
100 log.error('status: {}, content: {}'.format(result['status'], content))
101 return None
102
103 def _list(self, resource: str, return_attr: str, next_page_token: str = '') -> Iterable[dict]:
104 """
105 Gets a list of resources.
106
107 Args:
108 resource: name of resource to list
109 return_attr: name of attribute in the root of the response to get
110 resources from
111 next_page_token: the nextPageToken returned by the previous call
112
113 Yields:
114 dict: the next found resource
115 """
116
117 query_string = 'pageSize={}'.format(self.page_size)
118 if next_page_token:
119 query_string += '&pageToken={}'.format(next_page_token)
120 data = self._request(resource, query_string=query_string)
121 if data:
122 for itm in data[return_attr]:
123 yield itm
124 next_page_token = data.get('nextPageToken')
125 if next_page_token != '':
126 yield from self._list(resource, return_attr, next_page_token)
127
128 def get_spaces(self) -> Iterable[dict]:
129 return self._list('spaces', 'spaces')
130
131 def get_space(self, name: str) -> Optional[dict]:
132 return self._request('spaces/{}'.format(removeprefix(name, 'spaces/')))
133
134 def get_members(self, space_name: str) -> Iterable[dict]:
135 return self._list('spaces/{}/members'.format(removeprefix(space_name, 'spaces/')), 'memberships')
136
137 def get_member(self, space_name: str, name: str) -> Optional[dict]:
138 return self._request('spaces/{}/members/{}'.format(removeprefix(space_name, 'spaces/'), name))
139
140 def create_message(self, space_name: str, body: dict, thread_state: str = None, thread_key: str = None) -> Optional[dict]:
141 url = 'spaces/{}/messages'.format(removeprefix(space_name, 'spaces/'))
142
143 # If using a thread key, set messageReplyOption and thread key querystring
144 if thread_key is not None:
145 return self._request(url, body=json.dumps(body), method='POST',
146 query_string='threadKey={}&messageReplyOption={}'.format(thread_key, 'REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD'))
147
148 # if space is using new threading and there is a thread_id set, set messageReplyOption to keep messages in that thread
149 if thread_state == 'THREADED_MESSAGES' and body.get('thread', {}).get('name', '') != '':
150 return self._request(url, body=json.dumps(body), method='POST',
151 query_string='messageReplyOption={}'.format('REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD'))
152
153 # otherwise, post the message
154 return self._request(url, body=json.dumps(body), method='POST')
155
156class HangoutsChatRoom(Room):
157 """
158 Represents a 'Space' in Google-Hangouts-Chat terminology
159 """
160
161 def __init__(self, space_id, chat_api):
162 super().__init__()
163 self.space_id = space_id
164 self.chat_api = chat_api
165 self._load()
166
167 def _load(self):
168 space = self.chat_api.get_space(self.space_id)
169 self.does_exist = bool(space)
170 self.display_name = space.get('displayName','') if self.does_exist else ''
171
172 def join(self, username=None, password=None):
173 raise RoomsNotSupportedError()
174
175 def create(self):
176 raise RoomsNotSupportedError()
177
178 def leave(self, reason=None):
179 raise RoomsNotSupportedError()
180
181 def destroy(self):
182 raise RoomsNotSupportedError()
183
184 @property
185 def joined(self):
186 raise RoomsNotSupportedError()
187
188 @property
189 def exists(self):
190 raise RoomsNotSupportedError()
191
192 @property
193 def topic(self):
194 raise RoomsNotSupportedError()
195
196 @property
197 def occupants(self):
198 memberships = self.chat_api.get_members(self.space_id)
199 occupants = []
200 for membership in memberships:
201 name = '{} ({} / {})'.format(membership['member']['displayName'],
202 membership['member']['name'],
203 membership['state'])
204 if membership['member']['type'] == 'BOT':
205 name += ' **BOT**'
206 occupants.append(HangoutsChatUser(name,
207 membership['member']['displayName'],
208 None,
209 membership['member']['type']))
210
211 return occupants
212
213 def invite(self, *args):
214 raise RoomsNotSupportedError()
215
216
217class HangoutsChatUser(Person):
218 def __init__(self, name, display_name, email, user_type):
219 super().__init__()
220 self.name = name
221 self.display_name = display_name
222 self.email = email
223 self.user_type = user_type
224
225 @property
226 def person(self):
227 return self.name
228
229 @property
230 def fullname(self):
231 return self.display_name
232
233 @property
234 def client(self):
235 return 'Hangouts Chat'
236
237 @property
238 def nick(self):
239 return self.display_name
240
241 @property
242 def aclattr(self):
243 return self.email
244
245
246class GoogleHangoutsChatBackend(ErrBot):
247 def __init__(self, config):
248 super().__init__(config)
249 identity = config.BOT_IDENTITY
250 self.at_name = config.BOT_PREFIX
251 self.creds_file = identity['GOOGLE_CREDS_FILE']
252 self.gce_project = identity['GOOGLE_CLOUD_ENGINE_PROJECT']
253 self.gce_topic = identity['GOOGLE_CLOUD_ENGINE_PUBSUB_TOPIC']
254 self.gce_subscription = identity['GOOGLE_CLOUD_ENGINE_PUBSUB_SUBSCRIPTION']
255 self.chat_api = GoogleHangoutsChatAPI(self.creds_file)
256 self.bot_identifier = HangoutsChatUser(None, self.at_name, None, None)
257 self.event_cache = LRUCache(1024)
258 self.md = hangoutschat_markdown_converter()
259
260 # Initialize prometheus metrics if metrics port is configured
261 self.prometheus = None
262 if config.METRICS_PORT is not None:
263 self.prometheus = PrometheusMetrics(self.at_name, int(config.METRICS_PORT))
264
265 def _subscribe_to_pubsub_topic(self, project, topic_name, subscription_name, callback):
266 subscriber = pubsub.SubscriberClient()
267 subscription_name = 'projects/{}/subscriptions/{}'.format(project, subscription_name)
268 log.info("Subscribed to {}".format(subscription_name))
269 return subscriber.subscribe(subscription_name, callback=callback)
270
271 def _event_cache_format_key(self, event_data):
272 event_time = event_data.get('eventTime', 0)
273 if event_time == 0:
274 log.warning('Received 0 eventTime from message')
275
276 event_type = event_data.get('type', 'NO_EVENT_TYPE_PROVIDED_BY_GHC')
277
278 space_name = event_data.get('space', {}).get('name', '')
279
280 # eventTime does not change for CARD_CLICKED events when events are generated by multiple clicks on the same card
281 # seems like it should, but it does not. So message.lastUpdateTime must also be used in key to avoid missing event
282 message_last_update_time = event_data.get('message', {}).get('lastUpdateTime', 'NO_MESSAGE_LASTUPDATETIME_PROVIDED_BY_GHC')
283
284 return "{}{}{}{}".format(event_time, event_type, space_name, message_last_update_time)
285
286 def _handle_event(self, event):
287 try:
288 data = json.loads(event.data.decode('utf-8'))
289 except Exception:
290 log.warning('Received malformed event: {}'.format(event.data))
291 event.ack()
292 return
293
294 event.ack()
295 # event.ack() may fail silently, so we should ensure our messages are somewhat idempotent
296 event_key = self._event_cache_format_key(data)
297 log.info("event received: %s", event_key)
298 cached = self.event_cache.get(event_key)
299 if cached is not None:
300 return
301 self.event_cache[event_key] = True
302
303 # https://developers.google.com/chat/reference/message-formats/events
304 # https://developers.google.com/chat/how-tos/cards-onclick#receiving_user_click_information
305 event_type = data.get('type')
306 if event_type == 'MESSAGE':
307 return self.handle_event_MESSAGE(data)
308 elif event_type == 'CARD_CLICKED':
309 return self.handle_event_CARD_CLICKED(data)
310 elif data.get('message', {}).get('text') is not None:
311 # CARD_CLICKED events do not contain data.message.text field
312 log.warn(f"Event type '{event_type}' received, handling as 'MESSAGE' type to support previous backend implementation."
313 "If your code relies on handling this event type as a 'MESSAGE' type, please update your code as this will eventually be deprecated."
314 "You will also need to add an event handler for the specific event type to this backend codebase.")
315 return self.handle_event_MESSAGE(data)
316 else:
317 log.info(f"Unsupported event type '{event_type}' received")
318 return
319
320 # https://developers.google.com/chat/how-tos/cards-onclick
321 def handle_event_CARD_CLICKED(self, data):
322 action_method_name = data.get('action', {}).get('actionMethodName')
323 log.info(f"'CARD_CLICKED' event with actionMethodName '{action_method_name}' received")
324 action_params = {p['key']: p['value'] for p in data.get('action', {}).get('parameters', [])}
325
326 # this can be extended to handle any arbitrary action_method_names
327 if action_method_name == 'bot_command':
328 command = action_params.get('command')
329 command_args = action_params.get('command_args')
330 if command is None:
331 log.error("Required 'command' parameter missing for 'bot_command' actionMethodName")
332 return
333 MESSAGE_data = dict(data)
334 MESSAGE_data['message']['text'] = f"{command} {command_args}"
335 self.handle_event_MESSAGE(MESSAGE_data)
336 else:
337 log.info(f"Unsupported CARD_CLICKED event action method name '{action_method_name}' received")
338
339 def handle_event_MESSAGE(self, data):
340 # https://developers.google.com/chat/api/guides/message-formats/events#message
341 sender_blob = data.get('message', {}).get('sender', {})
342 sender = HangoutsChatUser(sender_blob.get('name', ''),
343 sender_blob.get('displayName', ''),
344 sender_blob.get('email', ''),
345 sender_blob.get('type', ''))
346 message_body = data['message'].get('text','')
347 context = {
348 'space_id': data['space']['name'],
349 'message_id': data['message']['name'],
350 'thread_id': data['message']['thread']['name'],
351 'thread_state': data['space']['spaceThreadingState']
352 }
353
354 if 'attachment' in data['message']:
355 context['attachment'] = data['message']['attachment']
356 # pass httplib2.Http() authenticated handler to errbot. uselful to download attachments
357 context['downloader'] = self.chat_api._download
358
359 msg = Message(body=message_body.strip(), frm=sender, extras=context)
360
361 is_dm = data['message']['space']['type'] == 'DM'
362 if is_dm:
363 msg.to = self.bot_identifier
364
365 self.callback_message(msg)
366
367 def _split_message(self, text, maximum_message_length=GoogleHangoutsChatAPI.max_message_length):
368 '''
369 Splits a given string up into multiple strings all of length less than some maximum size
370
371 Edge Case: We don't handle the case where one line is big enough for a whole message
372 '''
373 lines = text.split('\n')
374 messages = []
375 current_message = ''
376 for line in lines:
377 if len(current_message) + len(line) + 1 > maximum_message_length:
378 messages.append(current_message)
379 current_message = line + '\n'
380 else:
381 current_message += line + '\n'
382
383 messages.append(current_message)
384 return messages
385
386 def prep_message_context(self, message):
387 space_id = message.extras.get('space_id', None)
388 thread_id = message.extras.get('thread_id', None)
389 thread_key = message.extras.get('thread_key', None)
390 thread_state = message.extras.get('thread_state', None)
391 return space_id, thread_id, thread_key, thread_state
392
393 def send_message(self, message):
394 super(GoogleHangoutsChatBackend, self).send_message(message)
395 log.info("Sending {}".format(message.body))
396 convert_markdown = message.extras.get('markdown', True)
397 space_id, thread_id, thread_key, thread_state = self.prep_message_context(message)
398 if not space_id:
399 log.info(message.body)
400 return
401 mentions = message.extras.get('mentions', None)
402 text = message.body
403 if convert_markdown:
404 text = self.md.convert(message.body)
405 sub_messages = self._split_message(text)
406 log.info("Split message into {} parts".format(len(sub_messages)))
407 for message in sub_messages:
408 message_payload = {
409 'text': message
410 }
411 if mentions:
412 message_payload['annotations'] = []
413 for mention in mentions:
414 message_payload['annotations'].append(
415 {
416 "type": "USER_MENTION",
417 "startIndex": mention['start'],
418 "length": mention['length'],
419 "userMention": {
420 "user": {
421 "name": mention['user_id'],
422 "displayName": mention['display_name'],
423 "type": "HUMAN"
424 },
425 "type": "ADD"
426 }
427 }
428 )
429 if thread_id:
430 message_payload['thread'] = {'name': thread_id}
431
432 gc = self.chat_api.create_message(space_id, message_payload, thread_state, thread_key)
433
434 # record sent message success or failure
435 if self.prometheus is not None and self.prometheus.metric('message_sent'):
436 self.prometheus.metric('message_sent').labels(status=('failure' if gc == None else 'success')).inc()
437
438 # errbot expects no return https://errbot.readthedocs.io/en/latest/errbot.core.html#errbot.core.ErrBot.send_message
439 # but we need this in order to get the thread_id from a thread_key generated message
440
441 return None if gc == None else {
442 'space_id': gc.get('space', {}).get('name', ''),
443 'message_id': gc.get('name', ''),
444 'thread_id': gc.get('thread', {}).get('name', ''),
445 'thread_key': thread_key
446 }
447
448 # Legacy send_card signature. This is being deprecated in favor of errbot upstream signature that matches other built-in plugins.
449 def send_card_deprecated(self, cards, space_id, thread_id=None):
450 log.info("Sending card")
451 message_payload = {
452 'cards': cards
453 }
454 if thread_id:
455 message_payload['thread'] = {'name': thread_id}
456
457 self.chat_api.create_message(space_id, message_payload)
458
459 # Creates a message body following the card format described in google dev docs
460 # https://developers.google.com/chat/reference/message-formats/cards
461 def send_card(self, errbot_card: Card, space_id=None, thread_id=None):
462 if not isinstance(errbot_card, Card):
463 log.warning("deprecated signature of 'send_card' method called, recommend changing to current version that matches upstream signature.")
464 return self.send_card_deprecated(errbot_card, space_id, thread_id)
465
466 log.info(f"Sending card {errbot_card.title}...")
467
468 if not errbot_card.title:
469 raise MalformedCardError(errbot_card, "'title' field required")
470 ghc_card = {
471 "header": {
472 "title": errbot_card.title,
473 },
474 "sections": []
475 }
476
477 if errbot_card.summary:
478 ghc_card['header']['subtitle'] = errbot_card.summary
479 if errbot_card.thumbnail:
480 ghc_card['header']['imageUrl'] = errbot_card.thumbnail
481 if errbot_card.link:
482 raise MalformedCardError(errbot_card, "'link' field not supported, please use body field.")
483 if errbot_card.fields:
484 raise MalformedCardError(errbot_card, "'fields' field not supported, please use body field.")
485 if errbot_card.image:
486 raise MalformedCardError(errbot_card, "'image' field not supported, please use body field.")
487 if errbot_card.color:
488 log.debug("card 'color' field not supported.")
489
490 if not errbot_card.body:
491 raise MalformedCardError(errbot_card, "'body' field required")
492
493 ghc_card['sections'] = json.loads(errbot_card.body)
494 # Example of 'sections' body string:
495 # https://developers.google.com/chat/reference/message-formats/cards
496 #
497 # [
498 # {
499 # "widgets": [
500 # {
501 # "keyValue": {
502 # "topLabel": "Order No.",
503 # "content": "12345"
504 # }
505 # },
506 # ]
507 # },
508 # {
509 # "header": "Location",
510 # "widgets": [
511 # {
512 # "image": {
513 # "imageUrl": "https://maps.googleapis.com/..."
514 # }
515 # }
516 # ]
517 # },
518 # {
519 # "widgets": [
520 # {
521 # "buttons": [
522 # {
523 # "textButton": {
524 # "text": "OPEN ORDER",
525 # "onClick": {
526 # "openLink": {
527 # "url": "https://example.com/orders/..."
528 # }
529 # }
530 # }
531 # }
532 # ]
533 # }
534 # ]
535 # }
536 # ]
537
538 message_payload = {
539 'cards': [ghc_card]
540 }
541
542 space_id, thread_id, thread_key, thread_state = self.prep_message_context(errbot_card.parent)
543 if not space_id:
544 log.info(f"No space_id for card titled '{errbot_card.title}', not sending.")
545 return
546 if thread_id:
547 message_payload['thread'] = {'name': thread_id}
548
549 self.chat_api.create_message(space_id, message_payload, thread_state, thread_key)
550
551 def serve_forever(self):
552 subscription = self._subscribe_to_pubsub_topic(self.gce_project,
553 self.gce_topic,
554 self.gce_subscription,
555 self._handle_event)
556 self.connect_callback()
557
558 if self.prometheus is not None:
559 self.prometheus.start_server()
560
561 try:
562 import time
563 while True:
564 time.sleep(10)
565 except KeyboardInterrupt:
566 log.info("Exiting")
567 finally:
568 self.disconnect_callback()
569 self.shutdown()
570
571 def build_identifier(self, strrep):
572 return HangoutsChatUser(None, strrep, None, None)
573
574 def build_reply(self, msg, text=None, private=False, threaded=False):
575 response = Message(body=text, frm=msg.to, to=msg.frm, extras=msg.extras)
576 return response
577
578 def change_presence(self, status='online', message=''):
579 return None
580
581 @property
582 def mode(self):
583 return 'Google_Hangouts_Chat'
584
585 def query_room(self, room):
586 return HangoutsChatRoom(room, self.chat_api)
587
588 def rooms(self):
589 spaces = self.chat_api.get_spaces()
590 rooms = ['{} ({})'.format(space['displayName'], space['name'])
591 for space in list(spaces) if space['type'] == 'ROOM']
592
593 return rooms