"""
Summary:
keyup (python3) | Scripted rotation of access keys for an IAM User.
- Display of key report showing key metadata
- Creation of new access keys
- Keyset installation in awscli local config
- Deletion of deprecated keyset
Author:
Blake Huber
Copyright Blake Huber, All Rights Reserved.
License:
GNU General Public License v3.0 (GPL-3)
Additional terms may be found in the complete license agreement:
https://bitbucket.org/blakeca00/keyup/src/master/LICENSE.txt
OS Support:
- RedHat Linux, Amazon Linux, Ubuntu & variants
- Windows 7+
Dependencies:
- Requires python3, tested under py3.5 and py3.6
"""
import os
import sys
import pdb
import platform
import datetime
from time import sleep
from configparser import ConfigParser
import argparse
import inspect
import subprocess
import time
import threading
import boto3
from prompt_toolkit.shortcuts import ProgressBar
from botocore.exceptions import ClientError
from libtools import Colors
from libtools import stdout_message
from libtools.js import export_iterobject
from pyaws.session import authenticated, boto3_session
from keyup.statics import PACKAGE, local_config
from keyup.help_menu import menu_body
from keyup.map import map_identity, map_iam_username
from keyup.list_ops import list_keys
from keyup.iam_operations import local_profilenames
from keyup.cauth import prepare_reportdata, setup_table
from keyup.progress_main import ProgressBarMain
from keyup.thread_progress import custom_formatters, style
from keyup import about, container, logger, keyconfig, __version__
# global objects
config = ConfigParser()
c = Colors()
try:
from keyup.oscodes_unix import exit_codes
os_type = 'Linux'
splitchar = '/' # character for splitting paths (linux)
text = c.BRIGHT_CYAN
except Exception:
from keyup.oscodes_win import exit_codes # non-specific os-safe codes
os_type = 'Windows'
splitchar = '\\' # character for splitting paths (windows)
text = c.CYAN
[docs]def get_current_key(profile_name, surrogate=''):
"""
Extracts the STS AccessKeyId currently utilised in user's
profile in the local awscli configuration
Args:
profile_name: a username in local awscli profile
Returns:
key_id (str): Amazon STS AccessKeyId
Raises:
Exception if profile_name not found in config
"""
if surrogate:
profile_name = surrogate
#
awscli = 'aws'
cmd = 'type ' + awscli + ' 2>/dev/null'
if subprocess.getoutput(cmd):
cmd = awscli + ' configure get ' + profile_name + '.aws_access_key_id'
try:
key_id = subprocess.getoutput(cmd)
except Exception as e:
logger.exception(
'%s: Failed to identify AccessKeyId used in %s profile. Error: %s' %
(inspect.stack()[0][3], profile_name, str(e)))
return ''
return key_id
[docs]def remove_temporary_credentials(config_object, prefix='', profiles=False):
"""
Filers temporary credentials from the list of profilenames
present in the local awscli configuration
Args:
:config_object (configParser object): object containing profilenames
from the local awscli credentials file
:prefix (str): any prefix prepended to profilenames to indicate temporary
role credentials in the local awscli credentials file
:profiles (bool): flag indicating return of a profilename list instead of
Returns:
configParser object | profilename list (list)
"""
if prefix and profiles:
clean_profiles = list(filter(lambda x: prefix not in x, config_object.sections()))
return clean_profiles
else:
for profile in config_object.sections():
if 'aws_security_token' in config_object[profile].keys():
config_object.pop(profile)
return config_object
[docs]def clean_config(quiet):
"""
Test local awscli config for Active temporary credentails
Args:
:credentials_file (str): location of local awscli credentials file
:config (configParser obj): GLOBAL object representing parsed awscli
credentials file
Returns:
TYPE: bool, Success | Failure
.. note:: Conditions when ``clean_config`` returns False (Failure):
If parsed_config contains **active credentials**, key rotation
prohibited and keyup will exit.
If parsed_config contains **inactive credentials**, ``clean_config`` returns
``bool True`` and key rotation proceeds
"""
parsed_config, credentials_file = parse_awscli()
logger.info('Parsing local awscli credentails file: %s' % credentials_file)
counter = 0
for profile in parsed_config.sections():
if 'aws_security_token' in parsed_config[profile].keys():
logger.info('Temporary credentials found in profile %s' % profile)
# test authentication
if authenticated(profile):
if quiet is False:
msg = ("""Active temporary credentials found in profile %s.
Key refresh prohibited. Exit (Code: %d)
""" % (profile, exit_codes['EX_CONFIG']['Code']))
stdout_message(msg, 'WARN')
logger.info('Status of temporary credentials is: ACTIVE.')
logger.info('Exit (Code: %d)' % exit_codes['EX_CONFIG']['Code'])
sys.exit(exit_codes['EX_CONFIG']['Code'])
else:
counter += 1 # increment for denied auth
if counter >= 2:
break
logger.info('Config determined clean')
return True
[docs]def parse_awscli():
"""
Parse, update local awscli config credentials
Args:
:user (str): USERNAME, only required when run on windows os
Returns:
TYPE: configparser object, parsed config file
"""
OS = platform.system()
if OS == 'Linux':
HOME = os.environ['HOME']
default_credentials_file = HOME + '/.aws/credentials'
alt_credentials_file = shared_credentials_location()
awscli_file = alt_credentials_file or default_credentials_file
elif OS == 'Windows':
win_username = os.getenv('username')
default_credentials_file = 'C:\\Users\\' + win_username + '\\.aws\\credentials'
alt_credentials_file = shared_credentials_location()
awscli_file = alt_credentials_file or default_credentials_file
else:
logger.warning('Unsupported OS. Exit')
logger.warning(exit_codes['E_ENVIRONMENT']['Reason'])
sys.exit(exit_codes['E_ENVIRONMENT']['Code'])
try:
if os.path.isfile(awscli_file):
# parse config
config.read(awscli_file)
else:
logger.info(
'awscli credentials file [%s] not found. Abort' % awscli_file
)
raise OSError
except Exception as e:
logger.exception(
'%s: problem parsing local awscli config file %s' %
(inspect.stack()[0][3], awscli_file))
return config, awscli_file
[docs]def set_logging(cfg_obj):
"""
Enable or disable logging per config object parameter
"""
log_status = cfg_obj['LOGGING']['ENABLE_LOGGING']
if log_status:
logger.disabled = False
elif not log_status:
logger.info(
'%s: Logging disabled per local configuration file (%s) parameters.' %
(inspect.stack()[0][3], cfg_obj['PROJECT']['CONFIG_PATH'])
)
logger.disabled = True
return log_status
[docs]def precheck():
"""
Verify project runtime dependencies
"""
cfg_path = local_config['PROJECT']['CONFIG_PATH']
# enable or disable logging based on config/ defaults
logging = set_logging(local_config)
if os.path.exists(cfg_path):
logger.info('%s: config_path parameter: %s' % (inspect.stack()[0][3], cfg_path))
logger.info(
'%s: Existing configuration file found. precheck pass.' %
(inspect.stack()[0][3]))
return True
elif not os.path.exists(cfg_path) and logging is False:
logger.info(
'%s: No pre-existing configuration file found at %s. Using defaults. Logging disabled.' %
(inspect.stack()[0][3], cfg_path)
)
return True
if logging:
logger.info(
'%s: Logging enabled per config file (%s).' %
(inspect.stack()[0][3], cfg_path)
)
return True
return False
[docs]def delete_keyset(access_key, profile, surrogate=''):
"""
Deletes oldest access key credentials associated with a user
Args:
- **access_key (str)**: AccessKeyId of the keyset to delete
- **profile (str)**: iam user alias in the local awscli config
- **surrogate (str)**: iam username on which access key operations
are conducted by another iam user denoted in profile
Returns:
TYPE: bool, Success | Failure
"""
try:
if os.environ.get('AWS_ACCESS_KEY_ID'):
session = boto3.Session(
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
profile_name=profile)
client = session.client('iam')
else:
# use in-memory keys set in environment
client = boto3_session(service='iam', profile=profile)
# delete keyset, given AccessKeyId
if surrogate:
response = client.delete_access_key(
AccessKeyId=access_key, UserName=surrogate
)
else:
response = client.delete_access_key(AccessKeyId=access_key)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
return True
else:
logger.warning(
'%s: Response code %d, deprecated access key %s may not have been deleted properly' %
(inspect.stack()[0][3], response['ResponseMetadata']['HTTPStatusCode'], access_key))
return False
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchEntity':
logger.exception(
"%s: AccessKeyId %s Not Found. Key not deleted" %
(inspect.stack()[0][3], access_key))
return False
else:
logger.exception(
"%s: Problem deleting AccessKeyId %s (Code: %s Message: %s)" %
(inspect.stack()[0][3], access_key, e.response['Error']['Code'],
e.response['Error']['Message']))
return False
except Exception as e:
logger.exception(
"%s: Unknown problem deleting AccessKeyId %s (Error: %s)" %
(inspect.stack()[0][3], access_key, str(e)))
return False
[docs]def create_keyset(iam_user, profile, surrogate=''):
"""
Creates new access key, secret key pair for iam user
Args:
- **access_key (str)**: AccessKeyId of the keyset to delete
- **profile (str)**: iam user alias in the local awscli config
- **surrogate (str)**: iam username on which access key operations
are conducted by another iam user denoted in profile
Returns:
Success | Failure, TYPE: bool, aws access keys (dict)
"""
try:
logger.info(
'Request to create new keyset for %s user %s' %
('surrogate' if surrogate else 'iam',
surrogate if surrogate else iam_user)
)
# create keyset for primary iam user unless for a surrogate
client = boto3_session(service='iam', profile=profile)
if surrogate:
keys = client.create_access_key(UserName=surrogate)
else:
keys = client.create_access_key(UserName=iam_user)
except ClientError as e:
if e.response['Error']['Code'] == 'AccessDenied':
logger.warning(
'IAM profile user does not have permissions to create new access keys'
)
logger.warning(exit_codes['EX_NOPERM']['Reason'])
sys.exit(exit_codes['EX_NOPERM']['Code'])
else:
logger.exception(
"%s: Problem creating iam access key (Code: %s Message: %s)" %
(inspect.stack()[0][3], e.response['Error']['Code'],
e.response['Error']['Message']))
return False, {}
return True, keys
[docs]def set_keyset(access_key, secret_key, clear=False):
"""
Sets new access keys in memory to execute requests to Amazon APIs
during rewrite of local awscli credentials filename
Args:
:clear (bool): reset keys set as env variables, if present
"""
try:
if clear:
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
else:
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
except Exception:
return False
return True
[docs]class SetLogging():
"""
Initializes project level logging
Args:
- **mode (str)**: log_mode, either 'stream' or 'FILE'
- **disable (bool)**: when True, disables logging output
Returns:
TYPE: bool, Success | Failure
"""
def __init__(self, mode, disable=False):
self.set(mode, disable)
[docs] def set(self, mode, disable):
""" create logger object, enable or disable logging """
global logger
try:
if logger:
if disable:
logger.disabled = True
else:
if mode in ('STREAM', 'FILE'):
logger = logd.getLogger(mode, __version__)
except Exception as e:
logger.exception(
'%s: Problem incurred during logging setup' % inspect.stack()[0][3]
)
return False
return True
[docs]def write_keyset(configparser_obj, filename, debug=False):
"""
Write out new awscli credentials to local config
Args:
- **configparser_obj (configparser)**: parsed awscli config containing
new keyset
- **filename (str)**: path to file to which keyset written
- **debug (bool)**: debug flag
Returns:
TYPE: bool, Success | Failure
"""
alt_writefile = filename + '.orig'
if debug:
HOME = os.environ['HOME']
filename = HOME + '/Downloads/' + DBUG_FILE
logger.debug('output_file is: %s' % filename)
logger.debug('alt_writefile is: %s' % alt_writefile)
logger.debug('Writing credentials output file %s' % (filename))
try:
# write output file
logger.info('Writing credentials file %s' % (filename))
output_file = open(filename, 'w')
configparser_obj.write(output_file)
output_file.close()
logger.info('Successful write of credentials file %s' % (filename))
if os.path.isfile(filename + '.orig'):
logger.info(
'Found alt credentials file (%s). Attempting to update it' %
(alt_writefile))
# write gcreds credentials backup file
output_file = open(filename + '.orig', 'w')
clean_config = remove_temporary_credentials(configparser_obj)
clean_config.write(output_file)
output_file.close()
logger.info('Successful write of alt credentials file %s' % (alt_writefile))
except OSError as e:
logger.exception(
'%s: Problem writing new credentials file %s' %
(inspect.stack()[0][3], filename))
raise
return True
[docs]def write_keyset_backup(keys, user, quiet):
"""
Writes newly created keyset to disk provided configuration file flag set:
Args:
:keys: New keyset object
:quiet: When set, supresses all output to stdout
Returns:
TYPE: bool, Success | Failure
"""
def string_datetime(dt_obj):
""" Replaces datetime with string """
return dt_obj.strftime('%Y-%m-%dT%H:%M:%S')
def safe_keyset(keyset):
""" Replaces dt objects for writing to disk """
date_string = string_datetime(keyset['CreateDate'])
keyset['CreateDate'] = date_string
return keyset
date = datetime.datetime.today().strftime('%Y-%m-%d')
fs_location = local_config['KEY_BACKUP']['BACKUP_LOCATION']
filename = date + '-' + user + '-accessKeys.json'
output_file = fs_location + splitchar + filename
success_msg = 'Copy of new keyset written to: %s' % (c.BOLD + c.WHITE + output_file + c.RESET)
fail_msg = 'Problem writing new keyset to backup location: %s' % output_file
try:
# write keyset
if os.path.exists(fs_location):
if os.path.exists(output_file):
logger.info('%s pre-existing object - overwriting file' % output_file)
r = export_iterobject(safe_keyset(keys), output_file)
else:
logger.warning('Directory location to store new keysets (%s) does not exist' % fs_location)
return False
except OSError as e:
logger.exception(
'%s: Problem writing keyset to backup location: %s' %
(inspect.stack()[0][3], output_file))
return False
# stdout message handling
if r and not quiet:
stdout_message(success_msg, 'INFO')
elif not r and not quiet:
stdout_message(fail_msg, 'WARN')
# log message handling
if r:
logger.info(success_msg)
else:
logger.warning(fail_msg)
return r
[docs]def main(operation, profile, auto, debug, user_name=''):
"""
End-to-end renew of access keys for a specific profile in local awscli config
"""
if user_name:
logger.info('user_name parameter given (%s) as surrogate' % user_name)
user_name = map_iam_username(user_name, profile)
# find out to which iam user profile name maps
user, aws_account = map_identity(profile=profile)
if (user or aws_account) is None:
msg = 'Expired or invalid credentials to authenticate for profile user (%s)'.format(profile)
stdout_message(
('%s: %s. Exit. [Code: %d]' % (inspect.stack()[0][3], msg,
exit_codes['EX_NOPERM']['Code'])), prefix='AUTH', severity='WARNING'
)
logger.warning(msg, exit_codes['EX_NOPERM']['Reason'])
sys.exit(exit_codes['EX_NOPERM']['Code'])
if operation in ROTATE_OPERATIONS:
# check local awscli config for active temporary sts credentials
if clean_config(quiet=auto):
keylist, key_metadata = list_keys(
account=aws_account,
profile=profile,
iam_user=user,
surrogate=user_name,
stage='BEFORE ROTATION',
quiet=auto
)
elif operation == 'list':
# list operation; display current access key(s) and exit
keylist, key_metadata = list_keys(
account=aws_account, profile=profile, iam_user=user,
surrogate=user_name, quiet=auto
)
return True
elif not operation:
msg_accent = (c.BOLD + 'list' + c.RESET + ' | ' + c.BOLD + 'up' + c.RESET)
msg = """You must provide a valid OPERATION for --operation parameter:
--operation { """ + msg_accent + """ }
"""
stdout_message(msg)
logger.warning('%s: No valid operation provided. Exit' % (inspect.stack()[0][3]))
sys.exit(exit_codes['E_MISC']['Code'])
else:
msg = 'Unknown operation. Exit'
stdout_message(msg)
logger.warning('%s: %s' % (msg, inspect.stack()[0][3]))
sys.exit(exit_codes['E_MISC']['Code'])
try:
# -- Key Rotation: 1 keyset exists -------------------------------------
if len(keylist) == 1: # 1 keyset exists
deprecated_access_key = key_metadata[0]['AccessKeyId']
# delete keyset, must supply profile ------------------------------
r = delete_keyset(
access_key=deprecated_access_key,
profile=profile,
surrogate=user_name
)
if r:
result, new_keys = create_keyset(iam_user=user, profile=profile, surrogate=user_name)
else:
logger.warning(
'%s: Deprecated access key %s may not have \
been deleted properly. Check User Permissions' %
(inspect.stack()[0][3], deprecated_access_key)
)
sys.exit(exit_codes['EX_DELETE_FAIL']['Code'])
if result:
# assumble key
parsed, output_file, access_key, secret_key = configure_keyset(new_keys, profile, surrogate=user_name)
# log success
logger.info(
'Create request successful. AccessKeyId (%s) created for %s user %s' %
(access_key, 'surrogate' if user_name != '' else 'iam',
user_name if user_name != '' else user)
)
else:
logger.exception(
'%s: New keys did not generate correctly. Keys received: %s\n\nAbort.' %
(inspect.stack()[0][3], str(new_keys)))
# switch to new keys in mem before rewriting the credentials file
if write_keyset(parsed, output_file, debug):
if set_keyset(access_key, secret_key):
# delete keyset, no profile given, use in memory keys
d = 0.05 # duration / cycle (secs)
print('\n')
title = 'Rotating access keys... Please wait '
iterations = int(KEY_ENABLE_DELAY / d)
exit_event = threading.Event()
with ProgressBar(style=style, formatters=custom_formatters) as pb:
# Two parallel tasks.
def display_progress():
for i in pb(range(iterations), label=title):
time.sleep(d)
if exit_event.is_set():
break
sys.stdout.flush()
def delay():
sleep(KEY_ENABLE_DELAY)
# Start threads.
t1 = threading.Thread(target=display_progress)
t2 = threading.Thread(target=delay)
t1.daemon = True
t2.daemon = True
print('\n')
t1.start()
t2.start()
# Wait for the primary work thread to finish (t2)
while t2.is_alive():
t2.join(timeout=0.5) # wait only for report data thread
else:
exit_event.set()
keylist, key_metadata = list_keys(
account=aws_account,
profile=profile,
iam_user=user,
surrogate=user_name,
stage='AFTER ROTATION',
quiet=auto
)
# write copy of new keyset to backup location if config file flag set
if local_config['KEY_BACKUP']['BACKUP_ENABLE']:
write_keyset_backup(
keys=new_keys['AccessKey'],
user=user_name or profile,
quiet=auto
)
return True
else:
logger.warning(
'%s: Problem setting new keyset; failed authentication test (AccessKeyId: %s). Exit.' %
(inspect.stack()[0][3], access_key)
)
return False
else:
logger.warning(
'%s: Could not write new keyset to config (AccessKeyId: %s). Exit.' %
(inspect.stack()[0][3], access_key)
)
return False
# -- Key Rotation: 2 keysets exist -------------------------------------
elif len(keylist) == 2:
if local_config['KEY_METADATA']['KEY_DEPRECATION'] == 'AGE':
# find oldest ACTIVE key, will replace
dates = [x['CreateDate'] for x in key_metadata]
dates.sort()
deprecated_keydate = dates[0] # oldest date
for key in key_metadata:
if key['CreateDate'] == deprecated_keydate and key['Status'] == 'Active':
deprecated_access_key = key['AccessKeyId']
else:
deprecated_access_key = get_current_key(profile_name=profile, surrogate=user_name)
if deprecated_access_key:
logger.info('Deprecated access key identified as (%s)' % deprecated_access_key)
else:
logger.warning(
'%s: Failed to identify access key for replacement. Exit (Code: %s)' %
(inspect.stack()[0][3], sys.exit(exit_codes['EX_AWSCLI']['Code']))
)
sys.exit(exit_codes['EX_AWSCLI']['Code'])
if debug:
logger.debug(
'%s: key_metadata is: %s' %
(inspect.stack()[0][3], str(key_metadata))
)
logger.debug(
'%s: sorted key create dates: %s' %
(inspect.stack()[0][3], str(dates))
)
sys.exit(exit_codes['EX_OK']['Code'])
# delete keyset, must supply profile ------------------------------
r = delete_keyset(
access_key=deprecated_access_key,
profile=profile,
surrogate=user_name
)
if r:
result, new_keys = create_keyset(iam_user=user, profile=profile, surrogate=user_name)
else:
logger.warning(
'%s: Deprecated access key %s may not have \
been deleted properly. Check User Permissions' %
(inspect.stack()[0][3], deprecated_access_key)
)
sys.exit(exit_codes['EX_DELETE_FAIL']['Code'])
# --- parse and write new keys -----------------------------------
if result:
# assemble key
parsed, output_file, access_key, secret_key = configure_keyset(new_keys, profile, surrogate=user_name)
# log success
logger.info(
'Create request successful. AccessKeyId (%s) created for iam user %s' %
(access_key, user)
)
else:
logger.exception(
'%s: New keys did not generate correctly, keys have length of %d. Abort.' %
(inspect.stack()[0][3], len(new_keys)))
sys.exit(exit_codes['EX_CREATE_FAIL']['Code'])
# write new awscli config
if write_keyset(parsed, output_file, debug):
d = 0.05 # duration / cycle (secs)
title = 'Rotating access keys... '
iterations = int(KEY_ENABLE_DELAY / d)
exit_event = threading.Event()
with ProgressBar(style=style, formatters=custom_formatters) as pb:
# Two parallel tasks.
def display_progress():
for i in pb(range(iterations), label=title):
time.sleep(d)
if exit_event.is_set():
break
sys.stdout.flush()
def delay():
sleep(KEY_ENABLE_DELAY)
# Start threads.
t1 = threading.Thread(target=display_progress)
t2 = threading.Thread(target=delay)
t1.daemon = True
t2.daemon = True
print('\n')
t1.start()
t2.start()
# Wait for the primary work thread to finish (t2)
while t2.is_alive():
t2.join(timeout=0.5) # wait only for report data thread
else:
exit_event.set()
#print('\n')
keylist, key_metadata = list_keys(
account=aws_account,
profile=profile,
iam_user=user,
surrogate=user_name,
stage='AFTER ROTATION',
quiet=auto
)
# write copy of new keyset to backup location if config file flag set
if local_config['KEY_BACKUP']['BACKUP_ENABLE']:
write_keyset_backup(
keys=new_keys['AccessKey'],
user=user_name or profile,
quiet=auto
)
return True
else:
logger.warning(
'%s: Could not write new keyset to config (AccessKeyId: %s). Exit.' %
(inspect.stack()[0][3], access_key)
)
return False
except KeyError as e:
logger.critical(
'%s: Cannot find Key %s' %
(inspect.stack()[0][3], str(e)))
return False
except OSError as e:
logger.critical(
'%s: problem writing to file %s. Error %s' %
(inspect.stack()[0][3], output_file, str(e)))
return False
except Exception as e:
logger.critical(
'%s: Unknown error. Error %s' %
(inspect.stack()[0][3], str(e)))
raise e
[docs]def options(parser, help_menu=False):
"""
Parse cli parameter options
Returns:
TYPE: argparse object, parser argument set
"""
parser.add_argument("-p", "--profile", nargs='?', default="default",
required=False, help="type (default: %(default)s)")
parser.add_argument("-o", "--operation", nargs='?', default='list', type=str,
choices=VALID_OPERATIONS, required=False)
parser.add_argument("-c", "--configure", dest='configure', action='store_true', required=False)
parser.add_argument("-d", "--debug", dest='debug', action='store_true', required=False)
parser.add_argument("-h", "--help", dest='help', action='store_true', required=False)
parser.add_argument("-q", "--quiet", dest='quiet', action='store_true', required=False)
parser.add_argument("-R", "--key-report", dest='keyreport', action='store_true', required=False)
parser.add_argument("-u", "--user-name", dest='username', type=str, required=False)
parser.add_argument("-V", "--version", dest='version', action='store_true', required=False)
return parser.parse_args()
[docs]def package_version():
"""
Prints package version and requisite PACKAGE info
"""
print(about.about_object)
sys.exit(exit_codes['EX_OK']['Code'])
[docs]def shared_credentials_location():
"""
Discover alterate location for awscli shared credentials file
Returns:
TYPE: str, Full path of shared credentials file, if exists
"""
if 'AWS_SHARED_CREDENTIALS_FILE' in os.environ:
return os.environ['AWS_SHARED_CREDENTIALS_FILE']
return ''
[docs]def source_globals():
"""
Source all global variable definitions here
"""
global IAM_KEYS
IAM_KEYS = ['aws_access_key_id', 'aws_secret_access_key']
global VALID_OPERATIONS
VALID_OPERATIONS = ('list', 'up', 'keyup', 'rotate')
global ROTATE_OPERATIONS
ROTATE_OPERATIONS = ('up', 'keyup', 'rotate')
global DBUG_FILE
DBUG_FILE = 'test-credentials'
global KEY_ENABLE_DELAY
KEY_ENABLE_DELAY = local_config['KEY_METADATA']['KEY_ENABLE_DELAY']
global KEYAGE_WARNING
KEYAGE_WARNING = local_config['KEY_METADATA']['KEYAGE_WARNING']
[docs]def init():
"""Caller function; initializes all functionality"""
try:
source_globals()
except KeyError:
# remove offending configuration file, then recreate
if os.path.exists(local_config['PROJECT']['CONFIG_PATH']):
os.remove(local_config['PROJECT']['CONFIG_PATH'])
return keyconfig.option_configure(False, local_config['PROJECT']['CONFIG_PATH'])
# argument paring
parser = argparse.ArgumentParser(add_help=False)
try:
args = options(parser)
except Exception as e:
help_menu()
stdout_message(str(e), 'ERROR')
sys.exit(exit_codes['EX_OK']['Code'])
if len(sys.argv) == 1:
help_menu()
sys.exit(exit_codes['EX_OK']['Code'])
elif args.help:
help_menu()
sys.exit(exit_codes['EX_OK']['Code'])
elif args.version:
package_version()
elif args.configure:
r = keyconfig.option_configure(args.debug, local_config['PROJECT']['CONFIG_PATH'])
return r
elif args.keyreport:
# clear screen; create margin header space
msg = 'Generating Key Report. Please Wait...'
tab = '\t'.expandtabs(46)
os.system('cls' if os.name == 'nt' else 'clear')
print(('\n' * 5) + f'\n{tab}{msg}\n\n\n\n')
d = 0.05 # duration / cycle (secs)
title = 'Querying Amazon STS'
iterations = round(len(local_profilenames()) / d)
exit_event = threading.Event()
with ProgressBar(style=style, formatters=custom_formatters) as pb:
# Two parallel tasks.
def display_progress():
for i in pb(range(iterations), label=title):
time.sleep(d)
if exit_event.is_set():
break
sys.stdout.flush()
# Start threads.
t1 = threading.Thread(target=display_progress)
t2 = threading.Thread(target=prepare_reportdata, args=(container, ))
t1.daemon = True
t2.daemon = True
t1.start()
t2.start()
# Wait for the primary work thread to finish (t2)
while t2.is_alive():
t2.join(timeout=0.5) # wait only for report data thread
else:
exit_event.set()
# Exit ProgessBar context & unpack Queue
data, skipped = container.get()
setup_table(data, skipped)
else:
if precheck(): # if prereqs set, run
if authenticated(profile=args.profile):
# execute keyset operation
success = main(
operation=args.operation,
profile=args.profile,
user_name=args.username,
auto=args.quiet,
debug=args.debug
)
if success:
logger.info('IAM access keyset operation complete')
sys.exit(exit_codes['EX_OK']['Code'])
else:
stdout_message(
'Authenication Failed to AWS Account for user %s' % args.profile,
prefix='AUTH',
severity='WARNING'
)
sys.exit(exit_codes['E_AUTHFAIL']['Code'])
failure = """ : Check of runtime parameters failed for unknown reason.
Please ensure local awscli is configured. Then run keyconfig to
configure keyup runtime parameters. Exiting. Code: """
logger.warning(failure + 'Exit. Code: %s' % sys.exit(exit_codes['E_MISC']['Code']))
print(failure)
# end