AWS – Python – Levantamento de VPCs/CIDRs

Objetivo

Fazer o levantamento de todas as VPCs e CIDRs em diversas contas AWS utilizando apenas um script.

Método

Este script faz o levantamento de todos os CIDRs das VPC de todas as contas AWS.
Para que o script execute corretamente o arquivo credencials só pode ter uma entrada de perfil para cada conta AWS.
Se houver mais de uma access key/secret key no arquivo credecials a saída do script ficará duplicada.
Se não houver uma credencial associada a alguma conta AWS, a conta em questão não será analisada.
Assim, se queremos analisar 20 contas AWS precisaremos de 20 credenciais cadastradas.
O formato de cada credencial no arquivo credencials deve ser semelhante a esta:

[aplicacoes]
aws_access_key_id = AKIADF4CHAVEFAKEOPYNDW2VF4
aws_secret_access_key = i7U!!9887chavefakehKUuyh9ajQVnC6BE4bgqW

Em máquinas Windows, por padrão, o arquivo credentials fica localizado em %USERPROFILE%\.aws\

O script também considera que as 3 primeiras linhas do arquivo credencials são destinadas para o perfil default.

O script está todo comentado com a finalidade de ser o mais didático possível.

Código
				
					import boto3
import os.path
import csv


'''
Este script faz o levantamento de todos os CIDRs das VPC de todas as contas AWS.
Para que o script execute corretamente o arquivo credencials só pode ter uma entrada de perfil para cada conta AWS.
Se houver mais de uma access key/secret key no arquivo credecials a saída do script ficará duplicada.
Se não houver uma credencial associada a alguma conta AWS, a conta em questão não será analisada.
Assim, se queremos analisar 20 contas AWS precisaremos de 20 credenciais cadastradas.
O formato de cada credencial no arquivo credencials deve ser semelhante a esta:

[aplicacoes]
aws_access_key_id = AKIADF4CHAVEFAKEOPYNDW2VF4
aws_secret_access_key = i7U!!9887chavefakehKUuyh9ajQVnC6BE4bgqW

Em máquinas Windows, por padrão, o arquivo credentials fica localizado em %USERPROFILE%\.aws\ 

O script também considera que as 3 primeiras linhas do arquivo credencials são destinadas para o perfil default.
 '''

# Função que recupera contas AWS cadastradas no arquivo credencials.
def lista_aws_accounts():
    path = "~/.aws/credentials"    # path do arquivo credencials.
    full_path = os.path.expanduser(path)    # full_path armazena o path completo do arquivo credencials.

    contasAWS = []

    with open(full_path, 'r') as f:    # abre o arquivo de credenciais.
        next(f)    # ignora a credencial "default" (3 primeiras linhas).
        next(f)
        next(f)
        lines = f.readlines()    # lê o arquivo e joga as linhas do arquivo como itens de lista lines.
        f.close()

    for line in lines:    # faz um loop em lines para verificar se é um nome de profile ou access_key ou secret_access_key.
        if line.startswith('['):    # se a linha começar com colchetes então é o nome do perfil AWS.
            perfil = line[1:-2]
            contasAWS.append(perfil)

    return contasAWS

def cria_user_session(perfil):
    session = boto3.Session(profile_name=perfil)  # abre uma sessão
    return session

def cria_sts_client_object(session):
    client_sts = session.client('sts')  # abre um cliente do tipo sts.
    return client_sts

def cria_ec2_client_object(session):
    client_ec2 = session.client('ec2', region_name='sa-east-1')  # abre um cliente do tipo ec2.
    return client_ec2

def captura_regioes_usuario(session):
    regions_array = []  # array que irá conter todas as regiões

    client_region = cria_ec2_client_object(session)  # abre um cliente do tipo ec2 que irá ser utilizado para \
    # verificar as regiões habilitadas para o usuário atrelado as credenciais

    response_regions = client_region.describe_regions()  # response é um dicionário com KEY Regions que por sua\
    # vez tem VALUE com sendo um array de dicionários que contém a KEY RegionName

    regions = response_regions["Regions"]  # regions é do tipo array composto por elementos que são dicionários,\
    # que por sua vez têm a chave RegionName

    for region in regions:  # cria um array com as regiões para o usuário IAM que conectou através das credenciais\
    # do arquivo credentials
        regions_array.append(region["RegionName"])

    return regions_array

def cria_arquivo_saida():
    path_output = "~/downloads/aws_vpcs.csv"  # path do arquivo de saída.
    full_path_output = os.path.expanduser(path_output)  # armazena o path completo do arquivo de saída.

    f = open(full_path_output, 'w', newline='')  # abre o arquivo csv em modo de escrita.
    writer = csv.writer(f, delimiter=';')  # cria uma classe writer.
    writer.writerow(['Account_id'] + ['VPC_Name'] + ['VPC_ID'] + ['VPC_CIDR'])  # escreve cabeçalho no arquivo csv

    return writer, f

def main():
    print('Iniciando levantamento de VPCs em todas as contas AWS...')
    contasAWS = lista_aws_accounts()

    writer, f = cria_arquivo_saida()

    for perfil in contasAWS:
        session = cria_user_session(perfil)

        client_sts = cria_sts_client_object(session)  # abre um cliente do tipo sts para recuperar o account_id.
        account_id = client_sts.get_caller_identity().get('Account')  # atribui o ID da conta AWS.

        regions_array = captura_regioes_usuario(session)

        for x in range(0, len(regions_array)):  # faz um loop por todas as regiões contidas no array regions_array e \
            # mais adiante irá procurar pelas VPCs

            client_ec2 = session.client('ec2', # abre um cliente do tipo ec2
                                   region_name=regions_array[x]
                                   )
            nome_regiao = regions_array[x]  # armazena a região sendo analisada.

            response_vpcs = client_ec2.describe_vpcs()  # recebe resposta sobre as VPC existentes

            vpc_name = ''  # variável que irá receber o nome das VPCs a cada iteração
            vpc_id = ''  # variável que irá receber o ID das VPCs a cada iteração
            vpc_cidr = ''  # variável que irá receber o CIDR das VPCs a cada iteração

            for vpc in response_vpcs['Vpcs']:  # cria uma lista com os IDs das VPCs da primeira página da resposta
                if vpc['IsDefault'] is False:  # só considera VPCs que não são "Default"
                    vpc_id = vpc['VpcId']

                    vpc_name = ''
                    if 'Tags' in vpc:  # testa para verificar se há tags para a VPC.
                        for name in vpc['Tags']:  # itera sobre o array de tags e verifica se há uma tag "Name".
                            if name['Key'] == 'Name':
                                vpc_name = name['Value']

                    vpc_cidr = ''
                    for cidr in vpc['CidrBlockAssociationSet']: # itera sobre o array de CIDR para recupera-lo
                        vpc_cidr = cidr['CidrBlock']

                    if vpc_id != '':
                        print('AccountID:{};Name:{};ID:{};CIDR:{}'.format(account_id, vpc_name, vpc_id, vpc_cidr))
                        writer.writerow([account_id]+[vpc_name]+[vpc_id]+[vpc_cidr]) # escreve no arquivo csv
            #########
            # o trecho abaixo basicamente repete o código anterior mas leva em consiração eventual paginação.
            # este trecho é necessário pois o trecho anterior só verifica a primeira página.
            while 'Marker' in response_vpcs:  # Adiciona as VPCs a uma lista, caso ocorra paginação, procura pela key\
                # "Marker" no dicionário.
                response_ec2 = client_ec2.client_ec2.describe_vpcs(Marker=response_vpcs['Marker'])  # Faz busca na nova\
                # página usando o parâmetro "Marker" na chamada do método list_users.
                for vpc in response_vpcs['Vpcs']:  # cria uma lista com os IDs das VPCs da primeira página da resposta
                    if vpc['IsDefault'] is False:  # só considera VPCs que não são "Default"
                        vpc_id = vpc['VpcId']

                        vpc_name = ''
                        if 'Tags' in vpc:  # testa para verificar se há tags para a VPC.
                            for name in vpc['Tags']:  # itera sobre o array de tags e verifica se há uma tag "Name".
                                if name['Key'] == 'Name':
                                    vpc_name = name['Value']

                        vpc_cidr = ''
                        for cidr in vpc['CidrBlockAssociationSet']:  # itera sobre o array de CIDR para recupera-lo
                            vpc_cidr = cidr['CidrBlock']

                        if vpc_id != '':
                            print('AccountID:{};Name:{};ID:{};CIDR:{}'.format(account_id, vpc_name, vpc_id, vpc_cidr))
                            writer.writerow([account_id] + [vpc_name] + [vpc_id] + [vpc_cidr])  # escreve no arquivo csv

    f.close()  # fecha o arquivo csv.

if __name__ == "__main__":
    main()
				
			

It’ done!

Bruno Veiga

Bruno Veiga

Arquiteto Cloud e Arquiteto de Soluções. Me dedicando em compartilhar conhecimento e ajudar empresas a encontrar as melhores soluções tecnológicas para os problemas do negócio com agilidade, segurança, equipes alinhadas e dentro do orçamento.