Source code for keyup.cauth

"""
Key Report Generation Module:
    - Centralized authentication module for producing Key Report
    - Uses threading for concurrent processing

Module Functions:
    - convert:
        Converts time units to hours
    - discover_account_affiliations:
        maps awscli profile users to corresponding iam user ids
    - expired_keys:
        determines if an access keyset is aged beyond max age value in
        keyup's configuration file
    - display_table:
        renders vpt table to cli stdout
"""

import os
import sys
import datetime
import inspect
import pytz
import unicodedata
from botocore.exceptions import ClientError

# 3rd party
from veryprettytable import VeryPrettyTable
from pyaws.session import boto3_session
from libtools.js import export_iterobject
from libtools import stdout_message
from libtools import Colors
from keyup.iam_operations import local_profilenames
from keyup.map import map_identity
from keyup.vault import KEYAGE_MAX
from keyup.colormap import ColorMap
from keyup.statics import local_config
from keyup import keyconfig, logger, container


try:
    from keyup.oscodes_unix import exit_codes
    os_type = 'Linux'
    splitchar = '/'                             # character for splitting paths (linux)
    text = Colors.BRIGHT_CYAN
except Exception:
    from keyup.oscodes_win import exit_codes    # non-specific os-safe codes
    os_type = 'Windows'
    splitchar = '\\'                            # character for splitting paths (windows)
    text = Colors.CYAN


cm = ColorMap()


# universal colors
yl = Colors.YELLOW + Colors.BOLD
fs = Colors.GOLD3
bd = Colors.BOLD
gn = Colors.BRIGHT_GREEN
btext = text + Colors.BOLD
bdwt = cm.bdwt
dgray = cm.dg1
frame = text
ub = Colors.UNBOLD
cmark = unicodedata.lookup('heavy check mark')
xmark = unicodedata.lookup('LIGHT SALTIRE')
rst = Colors.RESET


tablespec = {
    'border': True,
    'header': True,
    'padding': 2,
    'field_max_width': 70
}

column_widths = {
    'ProfileName': 16,
    'iam_user': 14,
    'account': 16,
    'CreateDate': 22,
    'time_remaining': 14
}


[docs]class authentication(): """ class def for generation and retention of a single set of credentials for generating key-report via a single iam user priviledges """ def __init__(self, profile): self.profile_name = profile self.iam_user = None self.access_key = None self.secret_key = None if (self.access_key or self.secret_key) is None: self.generate_token(self.iam_user)
[docs] def generate_token(self, user): pass
[docs]def convert(dt): """Convert days to hours""" expiration_date = dt + KEYAGE_MAX now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) if now < expiration_date and (expiration_date - now).days == 0: return (expiration_date - now).seconds / 3600
[docs]def discover_account_affiliations(): """ Associates each profile name in local awscli configuration to an iam username and an AWS Account Number Returns: affiliation info, TYPE: dict .. code: json { profile_name: { "account": 104713565656, "iam_user": admin-sandbox2 } } """ affiliations = {} for profile in local_profilenames(): try: iam_user, aws_account = map_identity(profile) affiliations[profile] = {'iam_user': iam_user, 'account': aws_account} except ClientError: fname = inspect.stack()[0][3] logger.info( '{}: Unable to locate aws account for profile {}'.format(fname, profile)) continue return affiliations
[docs]def display_skipped(iam_users): """ Display iam users exceptions skipped in the key report """ tab1 = '\t'.expandtabs(34) tab2 = '\t'.expandtabs(46) stdout_message( message=f'{cm.gray}Users failing authentication omitted from report:', indent=30 ) for user in iam_users: print('{}- {}'.format(tab2, user))
[docs]def display_table(table, exceptions, tabspaces=4): """ Print Table Object offset from left by tabspaces """ indent = ('\t').expandtabs(tabspaces) table_str = table.get_string() for e in table_str.split('\n'): print(indent + frame + e) sys.stdout.write(Colors.RESET + '\n') display_skipped(exceptions) if exceptions else print('') sys.stdout.write(Colors.RESET + '\n\n') return True
[docs]def expired_keys(dt): """ Convert datetime objects into human readable """ now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) delta_td = now - dt if delta_td < KEYAGE_MAX: return False return True
[docs]def format_remaining(days: int): """ Formats days remaining value Returns: days (int) with appropriate color, spacing format applied """ def spacing(): s = '' for digit in str(days): s = s + ' ' return s if (0 <= days < KEYAGE_WARNING): return cm.yl + spacing() + str(round(days)) + ' days' + rst elif days < 0: return cm.brd + spacing() + str(round(days / -1)) + 'd overdue' + rst else: return str(days) + ' days'
[docs]def spacing(days): s = '' for digit in str(days): s = s + ' ' return s
[docs]def time_remaining(dt): """Calculate the days until expiration""" expiration_date = dt + KEYAGE_MAX now = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) if now < expiration_date: return (expiration_date - now).days return -1 * (now - expiration_date).days
def _postprocessing(): return True
[docs]def setup_table(user_data, exception_list): """ Renders Table containing data elements via cli stdout """ # setup table x = VeryPrettyTable( border=tablespec['border'], header=tablespec['header'], padding_width=tablespec['padding'] ) x.field_names = [ bdwt + 'ProfileName' + frame, bdwt + 'IAM User' + frame, bdwt + 'AWS AccountId' + frame, bdwt + 'CreateDate' + frame, bdwt + 'Time Remaining' + frame, bdwt + 'Status' + frame, ] # cell max width x.max_width[bdwt + 'ProfileName' + frame] = column_widths['ProfileName'] x.max_width[bdwt + 'IAM User' + frame] = column_widths['iam_user'] x.max_width[bdwt + 'AWS AccountId' + frame] = column_widths['account'] x.max_width[bdwt + 'CreateDate' + frame] = column_widths['CreateDate'] x.max_width[bdwt + 'Time Remaining' + frame] = column_widths['time_remaining'] # cell min = max width x.min_width[bdwt + 'ProfileName' + frame] = x.max_width[bdwt + 'ProfileName' + frame] x.min_width[bdwt + 'IAM User' + frame] = x.max_width[bdwt + 'IAM User' + frame] x.min_width[bdwt + 'AWS AccountId' + frame] = x.max_width[bdwt + 'AWS AccountId' + frame] x.min_width[bdwt + 'CreateDate' + frame] = x.max_width[bdwt + 'CreateDate' + frame] x.min_width[bdwt + 'Time Remaining' + frame] = column_widths['time_remaining'] # cell alignment x.align[bdwt + 'ProfileName' + frame] = 'l' x.align[bdwt + 'IAM User' + frame] = 'l' x.align[bdwt + 'AWS AccountId' + frame] = 'l' x.align[bdwt + 'CreateDate' + frame] = 'c' x.align[bdwt + 'Time Remaining' + frame] = 'c' x.align[bdwt + 'Status' + frame] = 'c' # populate table for k, v in user_data.items(): dt = v['CreateDate'] expired = expired_keys(v['CreateDate']) _days = time_remaining(dt) k = truncate_fields(k) v = truncate_fields(v) if not expired and (0 <= _days < KEYAGE_WARNING): # close to expiration, warning profilename = yl + k + rst user = yl + v['iam_user'] + rst accountId = yl + v['account'] + rst createdate = yl + dt.strftime('%b %d, %Y %H:%M UTC') + rst remaining = yl + str(_days) + ' days' + rst if _days > 1 else yl + str(_days) + ' day' + rst status = yl + cmark + rst # < 1 day remains; calculate hours remaining if _days == 0: remaining = yl + str(int(round(convert(dt)))) + ' hrs' + spacing(int(round(convert(dt)))) + rst else: # key credentials are either expired (age > KEYAGE_MAX) or valid profilename = cm.brd + k + rst if expired else k user = cm.brd + v['iam_user'] + rst if expired else v['iam_user'] accountId = cm.brd + v['account'] + rst if expired else v['account'] createdate = cm.brd + dt.strftime('%b %d, %Y %H:%M UTC') + rst if expired else dt.strftime('%b %d, %Y %H:%M UTC') remaining = format_remaining(_days) status = (cm.brd + xmark + rst if expired else gn + bd + cmark + rst) if _days == 1: remaining = str(_days) + ' day' x.add_row( [ rst + profilename + frame, rst + user + frame, rst + accountId + frame, rst + createdate + frame, rst + remaining + frame, rst + status + frame ] ) # Table vtab_int = 9 vtab = '\t'.expandtabs(vtab_int) msg = '{}AWS Identity Access Key Expiration Report{}{}|{}'.format(btext, rst + frame, vtab, rst) print_header(title=msg, indent=10, spacing=vtab_int) display_table(x, exception_list, tabspaces=4) return _postprocessing()
[docs]def source_globals(): """ global environment variable definitions """ global KEYAGE_WARNING KEYAGE_WARNING = local_config['KEY_METADATA']['KEYAGE_WARNING']
[docs]def truncate_fields(element): """ Truncates table field data to align with max column width Returns: truncated element, TYPE: dict or str """ if isinstance(element, dict): for k, v in element.items(): for name, width in column_widths.items(): if k == name and k != 'CreateDate': element[k] = v[:width] return element return element[:column_widths['ProfileName']]
[docs]def prepare_reportdata(debug=False): """ Prints out key expiration info for all profilenames associated with the primary profilename given to access the account information """ debug = False try: source_globals() except KeyError: # remove offending configuration file, then recreate if os.path.exists(local_config['PROJECT']['CONFIG_PATH']): os.remove(local_config['PROJECT']['CONFIG_PATH']) return keyconfig.option_configure(False, local_config['PROJECT']['CONFIG_PATH']) data, aliases = {}, {} exceptions = [] affiliations = discover_account_affiliations() if debug: export_iterobject(affiliations) for k, v in affiliations.items(): account = v['account'] try: r = None client = boto3_session(service='iam', profile=k) r = client.list_access_keys() key_metadata = r['AccessKeyMetadata'] if debug: stdout_message( message='Key information received for profile {}'.format(bd + k + rst), prefix='OK' ) except ClientError as e: fname = inspect.stack()[0][3] logger.info('{}: Unable to list key info for profile {}. Error {}'.format(fname, k, e)) exceptions.append(k) continue try: if aliases.get(account): alias = aliases[account] else: # human readable name of the account alias = client.list_account_aliases()['AccountAliases'][0] # store identified aliases aliases[account] = alias except ClientError: alias = '' except IndexError: alias = account accountId = alias or account iam_user = key_metadata[0]['UserName'] status = key_metadata[0]['Status'] metadata = key_metadata[0]['CreateDate'] data[k] = { 'account': accountId, 'iam_user': iam_user, 'status': status, 'CreateDate': metadata } logger.info('IAM User {} key info found for AWS account {}'.format(iam_user, accountId)) # Queue Operations container.put((data, exceptions)) return True
#return data, exceptions