-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathInventory_Modules.py
More file actions
5249 lines (4827 loc) · 239 KB
/
Inventory_Modules.py
File metadata and controls
5249 lines (4827 loc) · 239 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
__version__ = "2025.04.03"
"""
** Why are some functions "function" vs. "function2" vs. "function3"?**
As most things in computer science, my naming scheme makes sense to me, but not necessarily everyone :). Hence, here is my attempt at an explanation.
- When a function is named "<function>" without a number after it, it's because that was the original function name I chose and used.
Back in the beginning of my coding journey, I was using *profiles* for authentication/ authorization for everything, thinking that everyone else would use that too.
However, I was wrong. But the naming convention generally sticks, so functions without a numbered prefix, tend to expect the profile to be supplied.
- After a while, I realized that I could pass a dictionary of credentials, instead of the profile itself, which could include things like the region, and therefore,
make the script a little more flexible. So functions that end in "*2", typically take a dict (often called "ocredentials") as a parameter. I've tried to document
what fields are expected in that parameter within the docstring of the function.
- After another long while, I realized I should be creating a class object for the account/ credentials, so I created the "account_class.py" class file, and I've
used that for all functions that end in "*3". However, I have also realized that using the account_class object can sometimes lead to some latency, as some of the
lookups within that class take a while. Soooo... I've recently reverted to using the "*2" functions more than the "*3" functions - since the credentials can be
easily passed back and forth, and it's quicker. There might be some security exposure to doing this that I'm unaware of, so I'm keeping the "*3" functions around
just in case...
"""
def get_regions3(faws_acct, fregion_list=None):
"""
This is a library function to get the AWS region names that correspond to the
fragments that may have been provided via the command line.
For instance
- if the user provides 'us-east', this function will return ['us-east-1','us-east-2'].
- if the user provides 'west', this function will return ['us-west-1', 'us-west-2', 'eu-west-1', etc.]
- if the user provides 'all', this function will return all regions
The first parameter to this library must provide a valid account object that includes a boto3 session,
so that regions can be looked up.
Please note that there is no paging functionality for the "describe_regions" method within EC2, hence no paging below.
"""
import logging
# This handles the case where the user passes a single string, instead of a list or nothing.
if isinstance(fregion_list, str):
fregion_list = [fregion_list]
region_info = faws_acct.session.client('ec2')
if fregion_list is None or "all" in fregion_list or "ALL" in fregion_list or "All" in fregion_list:
regions = region_info.describe_regions(Filters=[{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])
RegionNames = [region_name['RegionName'] for region_name in regions['Regions']]
return RegionNames
# Special case where they want everything - globally
elif 'global' in fregion_list:
regions = region_info.describe_regions(AllRegions=True)
RegionNames = [region_name['RegionName'] for region_name in regions['Regions']]
return RegionNames
else:
regions = region_info.describe_regions(Filters=[{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])
RegionNames = [region_name['RegionName'] for region_name in regions['Regions']]
RegionNames2 = []
for x in fregion_list:
for y in RegionNames:
logging.info(f"Have {y} | Looking for {x}")
if y.find(x) >= 0:
logging.info(f"Found {y}")
RegionNames2.append(y)
return RegionNames2
def get_ec2_regions3(faws_acct, fkey=None):
"""
This is a library function to get the AWS region names that correspond to the
fragments that may have been provided via the command line.
The first parameter to this library must provide a valid session object, which is used to instantiate a boto3 session,
so that regions can be looked up.
Please note that there is no paging functionality for the "describe_regions" method within EC2, hence no paging below.
"""
import logging
from botocore.exceptions import EndpointConnectionError
RegionNames = []
if isinstance(fkey, str):
fkey = [fkey]
try:
region_info = faws_acct.session.client('ec2')
except AttributeError as my_Error:
logging.error(my_Error)
return RegionNames
except EndpointConnectionError as my_Error:
error_message = (f"Connection to AWS seems to not be working.\n"
f"Actual error: {my_Error}")
logging.error(error_message)
return RegionNames
except Exception as my_Error:
error_message = (f"Something stopped working.\n"
f"Actual error: {my_Error}")
logging.error(error_message)
return RegionNames
regions = region_info.describe_regions(Filters=[
{'Name': 'opt-in-status', 'Values': ['opt-in-not-required', 'opted-in']}])
for region in regions['Regions']:
RegionNames.append(region['RegionName'])
if fkey is None or "all" in fkey or "ALL" in fkey or 'All' in fkey:
return RegionNames
RegionNames2 = []
for x in fkey:
for y in RegionNames:
logging.info(f"Have {y} | Looking for {x}")
if y.find(x) >= 0:
logging.info(f"Found {y}")
RegionNames2.append(y)
return RegionNames2
def get_service_regions(service, fkey=None, fprofile=None, ocredentials=None, faws_acct=None):
"""
Parameters:
service = the AWS service we're trying to get regions for. This is useful since not all services are supported in all regions.
fkey = A *list* of string fragments of what region we're looking for.
If not supplied, then we send back all regions for that service.
If they send "us-" (for example), we would send back only those regions which matched that fragment.
This is good for focusing a search on only those regions you're searching within.
Either the profile, ocredentials, or aws_acct account object could be passed. We'll use whatever they pass, or nothing.
"""
import boto3
import logging
if fprofile is not None:
s = boto3.Session(profile_name=fprofile)
elif ocredentials is not None:
s = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
elif faws_acct is not None:
s = faws_acct.session
else:
s = boto3.Session()
regions = s.get_available_regions(service, partition_name='aws', allow_non_regional=False)
if fkey is None or ('all' in fkey or 'All' in fkey or 'ALL' in fkey):
return regions
RegionNames = []
for x in fkey:
for y in regions:
logging.info(f"Have {y} | Looking for {x}")
if y.find(x) >= 0:
logging.info(f"Found {y}")
RegionNames.append(y)
return RegionNames
def validate_region3(faws_acct, fRegion=None):
import logging
session_region = faws_acct.session
client_region = session_region.client('ec2')
if fRegion is None:
logging.info(f"No region supplied. Defaulting to 'us-east-1'")
fRegion = 'us-east-1'
region_info = client_region.describe_regions(Filters=[{'Name': 'region-name', 'Values': [fRegion]}])['Regions']
if len(region_info) == 0:
message = f"'{fRegion}' is not a valid region name for this account"
logging.error(message)
result = {'Success': False, 'Message': message}
return result
else:
message = f"'{fRegion}' is a valid region name for this account"
logging.info(message)
result = {'Success': True, 'Message': message}
return result
def get_profiles(fSkipProfiles=None, fprofiles=None):
"""
We assume that the user of this function wants all profiles.
If they provide a list of profile strings (in fprofiles),
then we compare those strings to the full list of profiles we have,
and return those profiles that contain the strings they sent.
"""
import boto3
import logging
profiles_to_remove = []
my_Session = boto3.Session()
my_profiles = my_Session._session.available_profiles
if fSkipProfiles is None:
fSkipProfiles = []
if fprofiles is None:
fprofiles = ['all']
elif isinstance(fprofiles, str) and fprofiles in my_profiles:
# Update the string to become a list
return [fprofiles]
elif isinstance(fprofiles, str):
error_message = f"Error: The profile passed in '{fprofiles}' doesn't exist."
logging.error(error_message)
return error_message
if len(fSkipProfiles) == 0:
pass
else:
for profile in my_profiles:
logging.info(f"Found profile {profile}")
if ("skipplus" in fSkipProfiles and profile.find("+") >= 0) or profile in fSkipProfiles:
logging.info(f"Removing profile: {profile} since it's in the fSkipProfiles parameter {fSkipProfiles}")
profiles_to_remove.append(profile)
my_profiles = list(set(my_profiles) - set(profiles_to_remove))
if "all" in fprofiles or "ALL" in fprofiles or "All" in fprofiles:
return my_profiles
ProfileList = []
for x in fprofiles:
for y in my_profiles:
logging.info(f"Have {y}| Looking for {x}")
if y.find(x) >= 0:
logging.info(f"Found profile {y}")
ProfileList.append(y)
return ProfileList
def find_in(list_to_search, list_to_find=None, fexact=False):
import logging
if list_to_find is None or None in list_to_find:
return list_to_search
elif 'all' in list_to_find or 'All' in list_to_find or 'ALL' in list_to_find:
return list_to_search
list_to_return = []
for x in list_to_search:
for y in list_to_find:
logging.info(f"Have {x} | Looking for {y}")
if fexact:
if x == y:
list_to_return.append(y)
else:
continue
elif x.find(y) >= 0:
logging.info(f"Found {y}")
list_to_return.append(y)
return list_to_return
def addLoggingLevel(levelName, levelNum, methodName=None):
import logging
"""
Comprehensively adds a new logging level to the `logging` module and the
currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value
`levelNum`. `methodName` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
used.
To avoid accidental clobberings of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present
Example
-------
>>> addLoggingLevel('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError(f'{levelName} already defined in logging module')
if hasattr(logging, methodName):
raise AttributeError(f'{methodName} already defined in logging module')
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError(f'{methodName} already defined in logger class')
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
# http://stackoverflow.com/a/13638084/2988730
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)
def find_if_alz(fProfile: str) -> dict:
import boto3
session_org = boto3.Session(profile_name=fProfile)
client_org = session_org.client('s3')
bucket_list = client_org.list_buckets()
response = dict()
response['BucketName'] = None
response['ALZ'] = False
for bucket in bucket_list['Buckets']:
if "aws-landing-zone-configuration" in bucket['Name']:
response['BucketName'] = bucket['Name']
response['ALZ'] = True
response['Region'] = find_bucket_location(fProfile, bucket['Name'])
return response
def find_bucket_location(fProfile, fBucketname):
import boto3
import logging
from botocore.exceptions import ClientError
session_org = boto3.Session(profile_name=fProfile)
client_org = session_org.client('s3')
try:
response = client_org.get_bucket_location(Bucket=fBucketname)
except ClientError as my_Error:
if str(my_Error).find("AccessDenied") > 0:
logging.error(f"Authorization Failure for profile {fProfile}")
return None
if response['LocationConstraint'] is None:
location = 'us-east-1'
else:
location = response['LocationConstraint']
return location
def find_acct_email(fOrgRootProfile, fAccountId):
import boto3
"""
This function *unfortunately* only works with organization accounts.
"""
session_org = boto3.Session(profile_name=fOrgRootProfile)
client_org = session_org.client('organizations')
email_addr = client_org.describe_account(AccountId=fAccountId)['Account']['Email']
# email_addr = response['Account']['Email']
return email_addr
def find_account_number(fProfile=None):
import boto3
import logging
from botocore.exceptions import ClientError, CredentialRetrievalError, InvalidConfigError
response = '123456789012' # This is the Failure response
try:
# logging.info("Looking for profile %s", fProfile)
if fProfile is None:
sts_session = boto3.Session()
else:
sts_session = boto3.Session(profile_name=fProfile)
client_sts = sts_session.client('sts')
response = client_sts.get_caller_identity()['Account']
except ClientError as my_Error:
if str(my_Error).find("UnrecognizedClientException") > 0:
logging.error("%s: Security Issue", fProfile)
pass
elif str(my_Error).find("InvalidClientTokenId") > 0:
logging.error("%s: Security Token is bad - probably a bad entry in config", fProfile)
pass
except CredentialRetrievalError as my_Error:
if str(my_Error).find("CredentialRetrievalError") > 0:
logging.error("%s: Some custom process isn't working", fProfile)
pass
except InvalidConfigError as my_Error:
if str(my_Error).find("InvalidConfigError") > 0:
logging.error(
f"{fProfile}: profile is invalid. Probably due to a config profile based on a credential that doesn't work")
pass
except Exception as my_Error:
logging.error(f"Other kind of failure for profile {fProfile}: {my_Error}")
pass
return response
def find_calling_identity(fProfile):
import boto3
import logging
from botocore.exceptions import ClientError
try:
session_sts = boto3.Session(profile_name=fProfile)
logging.info(f"Getting creds used within profile {fProfile}")
client_sts = session_sts.client('sts')
response = client_sts.get_caller_identity()
creds = {'Arn' : response['Arn'], 'AccountId': response['Account'],
'Short': response['Arn'][response['Arn'].rfind(':') + 1:]}
except ClientError as my_Error:
if str(my_Error).find("UnrecognizedClientException") > 0:
print(f"{fProfile}: Security Issue")
elif str(my_Error).find("InvalidClientTokenId") > 0:
print(f"{fProfile}: Security Token is bad - probably a bad entry in config")
else:
print(f"Other kind of failure for profile {fProfile}")
print(my_Error)
creds = "Failure"
return creds
def RemoveCoreAccounts(MainList, AccountsToRemove=None):
import logging
"""
MainList is expected to come through looking like this:
[{'AccountEmail': 'User+LZ@example.com', 'AccountId': '0123xxxx8912'},
{'AccountEmail': 'User+LZ_Log@example.com', 'AccountId': '1234xxxx9012'},
< ... >
{'AccountEmail': 'User+LZ_SS@example.com', 'AccountId': '9876xxxx1000'},
{'AccountEmail': 'User+Demo@example.com', 'AccountId': '9638xxxx012'}]
AccountsToRemove is simply a list of accounts you don't want to screw with. It might look like this:
['9876xxxx1000', '9638xxxx1012']
"""
if AccountsToRemove is None:
AccountsToRemove = []
NewCA = []
for i in range(len(MainList)):
if MainList[i]['AccountId'] in AccountsToRemove:
logging.info(f"Found {str(MainList[i]['AccountId'])} in AccountsToRemove, removing from list")
continue
else:
logging.info(f"Account {str(MainList[i]['AccountId'])} was allowed")
NewCA.append(MainList[i])
return NewCA
def print_timings(fTiming: bool = False, fverbose: int = 50, fbegin_time=None, fmessage: str = None):
"""
Description: Prints how long it's taken in the script to get to this point...
@param fTiming: Boolean whether we print anything
@param fverbose: Verbosity to determine whether we print when user didn't specify any verbosity. This allows us to only print when they want more info.
@param fbegin_time: The beginning time to compare to
@param fmessage: The message to print out, when we print the timings.
@return: None
"""
from colorama import Fore, init
from time import time
init()
if fTiming and fverbose < 50 and fbegin_time is not None:
print(f"{Fore.GREEN}{fmessage}\n"
f"This script has taken {time() - fbegin_time:.6f} seconds so far{Fore.RESET}")
def make_creds(faws_acct):
return ({'AccessKeyId' : faws_acct.creds.access_key,
'SecretAccessKey': faws_acct.creds.secret_key,
'SessionToken' : faws_acct.creds.token,
'Profile' : faws_acct.credentials['Profile'],
'Region' : faws_acct.Region,
'AccountId' : faws_acct.acct_number,
'AccountNumber' : faws_acct.acct_number,
'MgmtAccount' : faws_acct.MgmtAccount})
def get_child_access(fRootProfile, fChildAccount, fRegion='us-east-1', fRoleList=None):
"""
@param: fRootProfile is a string
@param: fChildAccount expects an AWS account number (ostensibly of a Child Account)
@param: rRegion expects a string representing one of the AWS regions ('us-east-1', 'eu-west-1', etc.)
@param: fRoleList expects a list of roles to try, but defaults to a list of typical roles, in case you don't provide
The first response object is a dict with account_credentials to pass onto other functions
The min response object is the rolename that worked to gain access to the target account
The format of the account credentials dict is here:
account_credentials = { 'Profile' : fRootProfile,
'AccessKeyId' : '',
'SecretAccessKey' : None,
'SessionToken' : None,
'AccountNumber' : None}
"""
import boto3
import logging
from botocore.exceptions import ClientError
if not isinstance(fChildAccount, str): # Make sure the passed in account number is a string
fChildAccount = str(fChildAccount)
ParentAccountId = find_account_number(fRootProfile)
sts_session = boto3.Session(profile_name=fRootProfile)
sts_client = sts_session.client('sts', region_name=fRegion)
if fChildAccount == ParentAccountId:
explain_string = ("We're trying to get access to either the Root Account (which we already have access "
"to via the profile) or we're trying to gain access to a Standalone account. "
"In either of these cases, we should just use the profile passed in, "
"instead of trying to do anything fancy.")
logging.info(explain_string)
# TODO: Wrap this in a try/except loop
account_credentials = sts_client.get_session_token()['Credentials']
account_credentials['AccountNumber'] = fChildAccount
account_credentials['Profile'] = fRootProfile
return account_credentials, 'Check Profile'
if fRoleList is None:
fRoleList = ['AWSCloudFormationStackSetExecutionRole', 'AWSControlTowerExecution',
'OrganizationAccountAccessRole', 'AdministratorAccess', 'Owner']
# Initializing the "Negative Use Case" string, returning the whole list instead of only the last role it tried.
# This way the operator knows that NONE of the roles supplied worked.
return_string = f"{str(fRoleList)} failed. Try Again"
account_credentials = {'Profile' : fRootProfile,
'AccessKeyId' : None,
'SecretAccessKey': None,
'SessionToken' : None,
'AccountNumber' : None}
for role in fRoleList:
try:
logging.info("Trying to access account %s using %s profile assuming role: %s", fChildAccount, fRootProfile,
role)
role_arn = f"arn:aws:iam::{fChildAccount}:role/{role}"
account_credentials = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="Find-ChildAccount-Things")[
'Credentials']
# If we were successful up to this point, then we'll short-cut everything and just return the credentials that worked
account_credentials['Profile'] = fRootProfile
account_credentials['AccountNumber'] = fChildAccount
return account_credentials, role
except ClientError as my_Error:
if my_Error.response['Error']['Code'] == 'ClientError':
logging.info(my_Error)
continue
# Returns a dict object since that's what's expected
# It will only get to the part below if the child isn't accessed properly using the roles already defined
return account_credentials, return_string
def get_child_access3(faws_acct, fChildAccount: str, fRegion: str = None, fRoleList: list = None):
"""
- faws_acct is a custom class (account_class.aws_acct_access)
- fChildAccount expects an AWS account number (ostensibly of a Child Account)
- rRegion expects a string representing one of the AWS regions ('us-east-1', 'eu-west-1', etc.)
- fRoleList expects a list of roles to try, but defaults to a list of typical roles, in case you don't provide
The format of the returned account credentials dict is here:
account_credentials = {'ParentAcctId' : ParentAccountId,
'MgmtAccount' : faws_acct.MgmtAccount,
'OrgType' : org_type,
'AccessKeyId' : faws_acct.creds.access_key,
'SecretAccessKey': faws_acct.creds.secret_key,
'SessionToken' : faws_acct.creds.token,
'AccountNumber' : fChildAccount,
'AccountId' : fChildAccount,
'Region' : fRegion,
'AccountStatus' : faws_acct.AccountStatus,
'RolesTried' : fRoleList,
'Role' : 'Use Profile',
'Profile' : If possible, the profile used to access the account,
'AccessError' : False,
'Success' : True,
'ErrorMessage' : None}
"""
import logging
from botocore.exceptions import ClientError
if not isinstance(fChildAccount, str): # Make sure the passed in account number is a string
fChildAccount = str(fChildAccount)
org_type = faws_acct.AccountType
ParentAccountId = faws_acct.acct_number
if fRegion is None:
fRegion = faws_acct.Region
if fRoleList is None or fRoleList == []:
fRoleList = ['AWSCloudFormationStackSetExecutionRole', 'AWSControlTowerExecution',
'OrganizationAccountAccessRole', 'AdministratorAccess', 'Owner']
elif isinstance(fRoleList, str):
fRoleList = [fRoleList]
sts_client = faws_acct.session.client('sts', region_name=fRegion)
if fChildAccount == ParentAccountId:
explain_string = (f"We're trying to get access to either the Root Account (which we already have access "
f"to via the profile) or we're trying to gain access to a Standalone account. "
f"In either of these cases, we should just use the profile passed in, "
f"instead of trying to do anything fancy.")
logging.info(explain_string)
# TODO: Wrap this in a try/except loop on the off-chance that the class doesn't work properly
account_credentials = {'ParentAcctId' : ParentAccountId,
'MgmtAccount' : faws_acct.MgmtAccount,
'OrgType' : org_type,
'AccessKeyId' : faws_acct.creds.access_key,
'SecretAccessKey': faws_acct.creds.secret_key,
'SessionToken' : faws_acct.creds.token,
'AccountNumber' : fChildAccount,
'AccountId' : fChildAccount,
'Region' : fRegion,
'AccountStatus' : faws_acct.AccountStatus,
'RolesTried' : fRoleList,
'Role' : 'Use Profile',
'Profile' : faws_acct.session.profile_name if faws_acct.session.profile_name else None,
'AccessError' : False,
'Success' : True,
'ErrorMessage' : None}
return account_credentials
# Initializing the "Negative Use Case" string, returning the whole list instead of only the last role it tried.
# This way the operator knows that NONE of the roles supplied worked.
error_message = f"{str(fRoleList)} failed. Try Again"
account_credentials = {'ParentAcctId' : ParentAccountId,
'MgmtAccount' : ParentAccountId,
'OrgType' : 'Child',
'AccessKeyId' : None,
'SecretAccessKey': None,
'SessionToken' : None,
'AccountNumber' : None,
'AccountId' : None,
'Region' : fRegion,
'AccountStatus' : faws_acct.AccountStatus,
'RolesTried' : fRoleList,
'Role' : None,
'Profile' : None,
'AccessError' : False,
'Success' : False,
'ErrorMessage' : error_message}
for role in fRoleList:
try:
if faws_acct.session.profile_name:
logging.info(
f"Trying to access account {fChildAccount} using parent profile: {faws_acct.session.profile_name} assuming role: {role}")
else:
logging.info(
f"Trying to access account {fChildAccount} using account number {faws_acct.acct_number} assuming role: {role}")
role_arn = f"arn:aws:iam::{fChildAccount}:role/{role}"
account_credentials = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="Test-ChildAccount-Access")['Credentials']
# If we were successful up to this point, then we'll short-cut everything and just return the credentials that worked
logging.info(f"The credentials for account {fChildAccount} using parent account "
f"{faws_acct.acct_number} and role name {role} worked")
account_credentials['ParentAcctId'] = ParentAccountId
account_credentials['MgmtAccount'] = ParentAccountId
account_credentials['OrgType'] = 'Child'
account_credentials['AccountNumber'] = fChildAccount
account_credentials['AccountId'] = fChildAccount
account_credentials['Region'] = fRegion
account_credentials['AccountStatus'] = faws_acct.AccountStatus
account_credentials['RolesTried'] = fRoleList
account_credentials['Role'] = role
account_credentials['Profile'] = None
account_credentials['AccessError'] = False
account_credentials['ErrorMessage'] = None
account_credentials['Success'] = True
return account_credentials
except ClientError as my_Error:
error_message = f"In Region {fRegion}, we got error message: {my_Error}"
logging.info(error_message)
account_credentials = {'AccessError': True, 'Success': False, 'ErrorMessage': error_message, 'RolesTried': fRoleList}
continue
except Exception as my_Error:
logging.info(my_Error)
continue
# Returns a dict object since that's what's expected
# It will only get to the part below if the child isn't accessed properly using the roles already defined
logging.debug(f"Failure:\n"
f"Role list: {fRoleList}\n"
f"account credentials: {account_credentials}")
account_credentials = {'AccessError': True, 'Success': False, 'ErrorMessage': "Access Failed", 'RolesTried': fRoleList}
return account_credentials
def enable_drift_on_stacks2(ocredentials: dict, fRegion: str, fStackName: str):
import boto3
import logging
session_cfn = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'], aws_secret_access_key=ocredentials[
'SecretAccessKey'], aws_session_token=ocredentials['SessionToken'], region_name=fRegion)
client_cfn = session_cfn.client('cloudformation')
logging.info(f"Enabling drift detection on Stack {fStackName} in "
f"Account {ocredentials['AccountNumber']} in region {fRegion}")
response = client_cfn.detect_stack_drift(StackName=fStackName)
return response # Since this is an async process, there is no response to send back
def enable_drift_on_stackset2(ocredentials: dict, fStackSetName: str):
"""
@param: ocredentials - dict object containing account information
@param: fStackSetName - string containing the stackset name we're going to look up
"""
import boto3
import logging
session_cfn = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
client_cfn = session_cfn.client('cloudformation')
logging.info(f"Enabling drift detection on Stack {fStackSetName} in "
f"Account {ocredentials['AccountNumber']} in region {ocredentials['Region']}")
response = client_cfn.detect_stack_set_drift(StackSetName=fStackSetName,
OperationPreferences={'RegionConcurrencyType' : 'PARALLEL',
'FailureTolerancePercentage': 10,
'MaxConcurrentPercentage' : 100},
CallAs='SELF')
return response # Since this is an async process, there is no response to send back
def enable_drift_on_stackset3(faws_acct, fStackSetName: str):
"""
@param: ocredentials - dict object containing account information
@param: fStackSetName - string containing the stackset name we're going to look up
"""
import logging
from botocore.exceptions import ClientError
client_cfn = faws_acct.session.client('cloudformation')
logging.info(f"Enabling drift detection on Stack {fStackSetName} in "
f"Account {faws_acct.acct_number} in region {faws_acct.Region}")
response = {'Success': False, 'ErrorMessage': ''}
try:
response = client_cfn.detect_stack_set_drift(StackSetName=fStackSetName,
OperationPreferences={'RegionConcurrencyType' : 'PARALLEL',
'FailureTolerancePercentage': 10,
'MaxConcurrentPercentage' : 100},
CallAs='SELF')
response.update({'Success': True, 'ErrorMessage': ''})
except ClientError as my_Error:
# Only worry about a specific service error code
if my_Error.response['Error']['Code'] == 'OperationInProgressException':
error_message = f"Error: {my_Error}"
logging.error(error_message)
# raise
response.update({'ErrorMessage': error_message})
except Exception as my_Error:
error_message = f"Error: {my_Error}"
logging.error(error_message)
response.update({'ErrorMessage': error_message})
return response # Since this is an async process, there is no response to send back when things go well
"""
Above - Generic functions
Below - Specific functions to specific features
"""
def find_sns_topics2(ocredentials, fTopicFrag: str = None, fExact: bool = False):
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['Region'] holds the region
- ['AccountNumber'] holds the account number
Returns:
List of Topic ARNs found that match the fragment sent
"""
import boto3
import logging
if fTopicFrag is None:
fTopicFrag = ['all']
session_sns = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
client_sns = session_sns.client('sns')
response = {'NextToken': ''}
TopicList = []
while 'NextToken' in response:
response = client_sns.list_topics(NextToken=response['NextToken'])
for item in response['Topics']:
topic_name = item['TopicArn'][item['TopicArn'].rfind(':') + 1:]
TopicList.append(topic_name)
if 'all' in fTopicFrag:
logging.info(f"Looking for all SNS Topics in account {ocredentials['AccountNumber']} from Region {ocredentials['Region']}\n"
f"Topics Returned: {TopicList}\n"
f"We found {len(TopicList)} SNS Topics")
return TopicList
else:
logging.info(f"Looking for specific SNS Topics in account {ocredentials['AccountNumber']} from Region {ocredentials['Region']}")
topic_list2 = []
for item in fTopicFrag:
for topic in TopicList:
logging.info(f"Have {topic} | Looking for {item}")
if fExact:
logging.info("Looking for EXACT matches")
if topic == item:
logging.error(f"Found {topic}")
topic_list2.append(topic)
elif topic.find(item) >= 0:
logging.error(f"Found {topic}")
topic_list2.append(topic)
logging.info(f"We found {len(topic_list2)} SNS Topics", )
return topic_list2
def find_role_names2(ocredentials, fRegion, fRoleNameFrag=None):
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['AccountNumber'] holds the account number
Returns:
List of Role Names found that match the fragment list sent
"""
import boto3
import logging
if fRoleNameFrag is None:
fRoleNameFrag = ['all']
session_iam = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=fRegion)
client_iam = session_iam.client('iam')
# TODO: Enable pagination
response = client_iam.list_roles()['Roles']
RoleNameList = []
for item in response:
RoleNameList.append(item['RoleName'])
if 'all' in fRoleNameFrag:
logging.info(f"Looking for all RoleNames in account {ocredentials['AccountNumber']} from Region {fRegion}\n"
f"RoleName Arns Returned: {RoleNameList}\n"
f"We found {len(RoleNameList)} RoleNames")
return RoleNameList
else:
logging.info(
f"Looking for specific RoleNames in account {ocredentials['AccountNumber']} from Region {fRegion}")
RoleNameList2 = []
for item in fRoleNameFrag:
for RoleName in RoleNameList:
logging.info(f'Have {RoleName} | Looking for {item}')
if RoleName.find(item) >= 0:
logging.info(f'Found {RoleName}')
RoleNameList2.append(RoleName)
logging.info(f"We found {len(RoleNameList2)} Roles")
return RoleNameList2
def find_cw_log_group_names2(ocredentials, fRegion, fCWLogGroupFrag=None):
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['AccountNumber'] holds the account number
Returns:
List of CloudWatch Log Group Names found that match the fragment list
"""
import boto3
import logging
if fCWLogGroupFrag is None:
fCWLogGroupFrag = ['all']
session_cw = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=fRegion)
client_cw = session_cw.client('logs')
CWLogGroupList = []
FirstTime = True
response = {'nextToken': None}
while 'nextToken' in response.keys() or FirstTime:
if FirstTime:
response = client_cw.describe_log_groups()
FirstTime = False
else:
response = client_cw.describe_log_groups(nextToken=response['nextToken'])
for item in response['logGroups']:
CWLogGroupList.append(item['logGroupName'])
if 'all' in fCWLogGroupFrag:
logging.info(f"Looking for all Log Group names in account {ocredentials['AccountNumber']} from Region {fRegion}\n"
f"Log Group Names Returned: {CWLogGroupList}\n"
f"We found {len(CWLogGroupList)} Log Group names")
return CWLogGroupList
else:
logging.info(f"Looking for specific Log Group names in account {ocredentials['AccountNumber']} from Region {fRegion}")
CWLogGroupList2 = []
for item in fCWLogGroupFrag:
for logGroupName in CWLogGroupList:
logging.info(f"Have {logGroupName} | Looking for {item}")
if logGroupName.find(item) >= 0:
logging.info(f"Found {logGroupName}")
CWLogGroupList2.append(logGroupName)
logging.info(f"We found {len(CWLogGroupList2)} Log Groups")
return CWLogGroupList2
def find_org_services2(ocredentials, serviceNameList=None):
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['AccountNumber'] holds the account number
- ['Region'] holds the Region
serviceName allows the user to provide the specific service we're looking for
Returns:
List of services that match the items found in the list provided
"""
import boto3
import logging
if serviceNameList is None:
serviceNameList = ['all']
session_org = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
client_org = session_org.client('organizations')
EnabledOrgServicesList = []
FirstTime = True
response = {'NextToken': None}
while 'NextToken' in response.keys() or FirstTime:
if FirstTime:
response = client_org.list_aws_service_access_for_organization()
FirstTime = False
else:
response = client_org.list_aws_service_access_for_organization(NextToken=response['NextToken'])
EnabledOrgServicesList.extend(response['EnabledServicePrincipals'])
if 'all' in serviceNameList or 'All' in serviceNameList or 'ALL' in serviceNameList:
logging.info(f"Looking for all Org-Enabled services in account {ocredentials['AccountNumber']} from Region {ocredentials['Region']}\n"
f"Enabled Services Returned: {EnabledOrgServicesList}\n"
f"We found {len(EnabledOrgServicesList)} enabled Org Services")
return EnabledOrgServicesList
else:
logging.info(f"Looking for specific enabled Org services in account {ocredentials['AccountNumber']} from Region {ocredentials['Region']}")
EnabledOrgServicesList2 = []
for item in serviceNameList:
for serviceName in EnabledOrgServicesList:
logging.info(f"Have {serviceName} | Looking for {item}")
if serviceName['ServicePrincipal'].find(item) >= 0:
logging.info(f"Found {serviceName}")
EnabledOrgServicesList2.append(serviceName)
logging.info(f"We found {len(EnabledOrgServicesList2)} enabled Org services")
return EnabledOrgServicesList2
def disable_org_service2(ocredentials, serviceName=None):
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['AccountNumber'] holds the account number
- ['Region'] holds the Region
serviceName allows the user to provide the specific service we're looking for
Returns:
List of CloudWatch Log Group Names found that match the fragment list
"""
import boto3
# import logging
returnResponse = {}
if serviceName is None:
returnResponse = {'Success': False, 'ErrorMessage': 'No service name specified'}
return returnResponse
session_org = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
client_org = session_org.client('organizations')
try:
delResponse = client_org.disable_aws_service_access_for_organization(ServicePrincipal=serviceName)
checkResponse = find_org_services2(ocredentials, serviceName)
if len(checkResponse) == 0:
returnResponse = {'Success': True, 'ErrorMessage': None}
else:
returnResponse = {'Success': False, 'ErrorMessage': 'Service didn\'t get deleted properly'}
except (client_org.exceptions.AccessDeniedException, client_org.exceptions.AWSOrganizationsNotInUseException, client_org.exceptions.ConcurrentModificationException,
client_org.exceptions.ConstraintViolationException, client_org.exceptions.InvalidInputException, client_org.exceptions.ServiceException,
client_org.exceptions.TooManyRequestsException, client_org.exceptions.UnsupportedAPIEndpointException) as my_Error:
error_message = f"Error disabling {serviceName} in account {ocredentials['AccountId']}\n" \
f"Full Error: {my_Error}"
returnResponse.update({'Success': False, 'ErrorMessage': error_message})
return returnResponse
def find_security_groups2(ocredentials, f_fragments: list = None, f_exact: bool = False, defaultOnly: bool = False) -> list:
"""
ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['AccountNumber'] holds the account number
- ['Region'] holds the region
@param: f_fragments - a list of fragments to search for
@param: f_exact - a boolean to indicate whether to use exact matching or not
@param: defaultOnly - a boolean to indicate whether to return only default security groups or not
@return: list
"""
import boto3
import logging
SecurityGroups = []
AllSecurityGroups = []
session_vpc = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=ocredentials['Region'])
client_vpc = session_vpc.client('ec2')
logging.info(f"Looking for default Security Groups in account {ocredentials['AccountNumber']} from Region {ocredentials['Region']}")
response = client_vpc.describe_security_groups()
SecurityGroups.extend(response['SecurityGroups'])
while 'NextToken' in response.keys():
response = client_vpc.describe_security_groups(NextToken=response['NextToken'])
SecurityGroups.extend(response['SecurityGroups'])
for security_group in SecurityGroups:
if security_group['GroupName'] == 'default':
security_group['Default'] = True
AllSecurityGroups.append(security_group)
else:
security_group['Default'] = False
AllSecurityGroups.append(security_group) if defaultOnly is False else None
logging.info(f"We found {len(AllSecurityGroups)} {'default' if defaultOnly else ''} Security Groups in account {ocredentials['AccountNumber']} in Region {ocredentials['Region']}")
return AllSecurityGroups