Le blog technique

Toutes les astuces #tech des collaborateurs de PI Services.

#openblogPI

Retrouvez les articles Ă  la une

Python – Rundeck – Script pour Lister les jobs

Présentation du Script query_rundeck_pwd_auth.py

Ce script Python utilise la bibliothèque requests pour interagir avec l’API REST de Rundeck en suivant l’approche suivante :

  1. Authentification par Mot de Passe : Le script gère l’authentification en envoyant les identifiants (j_username et j_password) au point de terminaison /j_security_check de Rundeck. Cela Ă©tablit une session, et le mot de passe est saisi de manière sĂ©curisĂ©e (non affichĂ©) grâce au module getpass.
  2. Collecte des Données :
    • Il commence par lister tous les projets disponibles.
    • Pour chaque projet, il liste les jobs.
    • Pour chaque job, une requĂŞte API est faite pour rĂ©cupĂ©rer la dernière exĂ©cution en utilisant le paramètre max: 1. Le script parse le statut, les dates de dĂ©but et de fin de cette dernière exĂ©cution.
  3. Sortie Standard et Exportation CSV :
    • Les donnĂ©es sont affichĂ©es dans la console pour une vĂ©rification immĂ©diate.
    • Un fichier rundeck_jobs.csv est gĂ©nĂ©rĂ©, fournissant un enregistrement structurĂ© et facilement utilisable pour l’analyse ou l’audit. Le format de sortie est en semicolon-separated values (CSV) : EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION.

Utilisation

L’exĂ©cution du script se fait en fournissant l’URL du site Rundeck, l’URL de l’API et le nom d’utilisateur comme arguments :

python query_rundeck_pwd_auth.py site_url api_url user
python query_rundeck_pwd_auth.py ‘http://myrundeck.mydomain:4440’ ‘http://myrundeck.mydomain:4440/api/14’ myaccount

import requests
import json
from contextlib import redirect_stdout
import sys
import getpass
import datetime



if len(sys.argv) < 4:
    print('Usage: query_rundeck_pwd_auth.py <site_url> <api url> <user>')
    sys.exit()


objdate = datetime.datetime
# site url
rundeck_site_url = (sys.argv[1])
# api url
rundeck_api_url = (sys.argv[2])
# user
user = (sys.argv[3])
# password demandé de façon sécurisée (non affiché)
password = getpass.getpass(prompt='Mot de passe: ')


# Open session with user/password authentication
url = f'{rundeck_site_url}/j_security_check'
session = requests.session()
response = session.post(url, data={"j_username": user, "j_password": password})

# debug minimal : afficher erreur si auth KO
if response.status_code != 200:
    print(f"Auth error: {response.status_code}")
    try:
        print(response.text)
    except Exception:
        pass


# Fonction pour lister les projets
def lister_projects():
    url = f'{rundeck_api_url}/projects'
    headers = {
        'Accept': 'application/json'
            }
    response = session.get(url,headers = headers)

    allprojects=[]
    myproject={
          'project': None
          }

    if response.status_code == 200:
        projects = response.json()
        for project in projects:
            project_name=project['name']
            myproject={
                'project_name': project_name,
                 }
            allprojects.append(myproject)
        return allprojects    
    else:
            print(f"Erreur: {response.status_code}")
            try:
                print(response.text)
            except Exception:
                pass



# Fonction pour lister les jobs d'un projet
def lister_jobs(projet):
    url = f'{rundeck_api_url}/project/{projet}/jobs'
    headers = {
        'Accept': 'application/json'
        
    }
    response = session.get(url,headers = headers)

    alljobs=[]
    myjob={
          'export_date': None,
          'projet': None,
          'job_id': None,
          'job_name': None,
          'last_result': None
          }

    if response.status_code == 200:
        jobs = response.json()
        for job in jobs:
            job_id=job['id']
            job_name=job['name']
            last_result=dernier_etat_execution(job['id'])
            myjob={
                'export_date': objdate.now().strftime("%d/%m/%Y"),
                'projet': projet,
                'job_id': job_id,
                'job_name': job_name,
                'last_result': last_result
            }
            alljobs.append(myjob)
        return alljobs    
    else:
            print(f"Erreur: {response.status_code}")
            try:
                print(response.text)
            except Exception:
                pass


# Fonction pour obtenir le dernier état d'exécution d'un job
def dernier_etat_execution(job_id):
    url = f'{rundeck_api_url}/job/{job_id}/executions'
    headers = {
        'Accept': 'application/json'
        
    }
    params = {
        'max': 1,  # Limite à une exécution pour obtenir la plus récente
        'offset': 0
    }
    response = session.get(url, headers=headers, params=params)
    
    allexec=[]
    myexec={
          'id': None,
          'statut': None,
          'start': None,
          'end': None
          }



    if response.status_code == 200:
        executions = response.json()
        if executions['executions']:
            derniere_execution = executions['executions'][0]
            exec_id=derniere_execution['id']
            exec_statut=derniere_execution['status']
            exec_start=objdate.fromisoformat(derniere_execution['date-started']['date'].replace('Z','')).strftime("%d/%m/%Y, %H:%M:%S")
            # Si 'date-started' n'est pas trouvé dans la derniere execution on positionne exec_end='null'
            if 'date-ended' in derniere_execution: 
               exec_end=objdate.fromisoformat(derniere_execution['date-ended']['date'].replace('Z','')).strftime("%d/%m/%Y, %H:%M:%S")
            else:
               exec_end='null'
            myexec={
                'id': exec_id,
                'statut': exec_statut,
                'start': exec_start,
                'end': exec_end
            }
            allexec.append(myexec)
            return allexec
        else:
            return("Aucune exécution trouvée pour ce job.")
    else:
        return(f"Erreur: {response.status_code}")





# All projects
allproj=lister_projects()   

# Transforme la liste allproj en dictionnaire
dicallproj=dict(enumerate(allproj))



# Pour chacun des projets, recuperation de la liste des jobs et de l'état de la derniere execution
dicallprojjobs={}
for i in sorted(dicallproj.keys()):
    dicallprojjobs[i]=lister_jobs(dicallproj[i]['project_name'])



# Affichage du dictionnaire imbriqué
print("ALL PROJETS - ALL LAST JOBS:")
print("EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION")
for i in sorted(dicallprojjobs.items()):
    for x in i[1]:
        print("{export_date};{projet};{job_id};{job_name};{last_result}".format(export_date=x['export_date'],projet=x['projet'],job_id=x['job_id'], job_name=x['job_name'], last_result=x['last_result']))
    


# Export vers rundeck_jobs.csv du dictionnaire dicallprojjobs 
with open('rundeck_jobs.csv', 'w') as f:
    with redirect_stdout(f):
    
      print("EXPORT_DATE;PROJET;JOB_ID;JOB_NAME;DERNIERE_EXECUTION")
      for i in sorted(dicallprojjobs.items()):
        for x in i[1]:
          print("{export_date};{projet};{job_id};{job_name};{last_result}".format(export_date=x['export_date'],projet=x['projet'],job_id=x['job_id'], job_name=x['job_name'], last_result=x['last_result']))
 

print("\nresultat exporté dans rundeck_jobs.csv")

[Powershell] – Les dates au format Iso 8601

J’ai rĂ©cemment eu besoin dans un script Powershell de construire une requĂŞte HTTP pour Microsoft API Graph, cela m’a permis de d’utiliser la norme Iso 8601 pour rĂ©aliser mes filtres.

Le format Iso 8601

La norme Iso 8601 spĂ©cifie la reprĂ©sentation numĂ©rique de la date et de l’heure, respectivement basĂ©es sur le calendrier grĂ©gorien et le système horaire sur 24 heures.

Voici la mise en forme de cette norme.

AAAA-MM-JJTHH:MM:SS,ss-/+FF:ff

AAAA: ReprĂ©sente l’annĂ©e sur 4 chiffres
MM: représente le mois sur 2 chiffres (de 01 à 12)
JJ: représente le jour sur 2 chiffres (de 01 à 31)
T: Indique le temps (Time)
HH: reprĂ©sente l’heure sur 2 chiffres (de 00 Ă  24)
MM: représente les minutes sur 2 chiffres (de 00 à 59)
SS: représente les secondes sur 2 chiffres (de 00 à 59)
ss: représente la fraction de secondes sur 1 ou 2 chiffres
+/-: reprĂ©sente le fuseau horaire, oĂą ‘+’ indique une avance sur UTC et ‘-‘ un retard sur UTC
FF: reprĂ©sente le nombre d’heure d’avance ou de retard sur UTC
ff: reprĂ©sente le nombre de minute d’avance ou de retard sur UTC
Z: indique qu’il s’agit de l’heure au format UTC (pour mĂ©ridien ZĂ©ro)

Comment l’utiliser avec Powershell

Donc pour formater la date et l’heure Ă  la norme Iso 8601, il suffit d’utiliser la commande suivante :

(Get-Date).ToString("yyyy-MM-ddTHH:mm:ssZ")

Comme vous pourrez le constater, dans mon cas j’utilise la lettre « Z » pour obtenir directement le format UTC.

Python – Script pour lister les proprietes des blobs Azure

Le script python suivant se connecte a un compte de stockage Azure avec une cle de stockage et liste les proprietes des blobs d’un container, avec la possibilitĂ© d’exporter le resultat en csv.

Le parametre MaxResult est important et est directement liĂ© au fait que la requete vers le container (voir fonction « list_all_blobs_in_container_by_lot ») s’effectue en generant un iterator (generator) qui recupere par lot (MaxResult) la liste des blobs.

Le parametre export_all_blobs_list est a False par defaut pour eviter de generer un fichier csv si le nombre de blobs est tres important (1 ligne/blob)

### azure_list_blobs_properties.py ###
##### Script Python pour lister les proprietes de blobs dans un container Azure Blob Storage #####

# PARAMETERES D'ENTREE DU SCRIPT :
# --storage_account_name : Nom du compte de stockage Azure (default: mystorageaccount)
# --Cledestockage : Clé de stockage Azure
# --nomcontainer : Nom du container de stockage Azure (default: mycontainer)
# --MaxResults : Nombre maximum de blobs à récupérer par lot (default: 10000)
# --export_all_blobs_list : Exporter la liste de tous les blobs dans un fichier CSV (True/False) (default: False)
# --exportfolder : Chemin du dossier pour les fichiers CSV (default: .)
# --fichierrapport : Chemin du fichier de rapport (default: myreport.txt)
# --help : Afficher l'aide du script

# USAGE :
# WARNING: Ne pas pas positionner a True le parametre export_all_blobs_list si le container cible contiens beaucoup de blobs. Le fichier csv generé sera très volumineux (1 ligne / blob).
# python.exe .\azure_list_blobs_properties.py --storage_account_name <nom_du_compte_de_stockage> --Cledestockage <clé_de_stockage> --nomcontainer <nom_du_container> --export_all_blobs_list True/False



import os
import argparse
from datetime import datetime, timezone
from azure.storage.blob import BlobServiceClient
import csv


if __name__ == '__main__':

        ###########################################
        # Arguments du script
        ###########################################
        
        parser = argparse.ArgumentParser()

        parser.add_argument(
        '--storage_account_name',
        type=str,
        default='mystorageaccount',
        help='Nom du compte de stockage Azure'
        )


        parser.add_argument(
        '--Cledestockage',
        type=str,
        help='Clé de stockage Azure'
        )

        parser.add_argument(
        '--nomcontainer',
        type=str,
        default='mycontainer',
        help='nom du container de stockage Azure'
        )

        parser.add_argument(
        '--MaxResults',
        type=int,
        default=10000,
        help='Nombre maximum de blobs à récupérer par lot (par défaut: 10000)'
        )

        parser.add_argument(
        '--export_all_blobs_list',
        choices=['True', 'False'],
        default='False',
        help='Exporter la liste de tous les blobs dans un fichier CSV'
        )

        # export folder
        parser.add_argument(
        '--exportfolder',
        type=str,
        default='.',
        help='Chemin du dossier pour les fichiers CSV'
        )

        # fichier de rapport
        parser.add_argument(
        '--fichierrapport',
        type=str,
        default='myreport.txt',
        help='Nom du fichier de rapport'
        )

        # On parse les arguments
        args = parser.parse_args()

        # AUTRES PARAMETRES
        # URL du compte de stockage Azure
        account_url = f"https://{args.storage_account_name}.blob.core.windows.net"

        # On signifie a Python que l'on utilise PAS de proxy pour les connexions vers Azure
        os.environ['NO_PROXY'] = '.blob.core.windows.net,login.microsoftonline.com,management.azure.com'

        
        

        ### FONCTION POUR ECRIRE UN MESSAGE DANS UN FICHIER DE RAPPORT
        def write_to_report_file(message, filename):
            """
            Écrit un message dans un fichier de rapport.

            :param message: Message à écrire dans le fichier.
            :param
            filename: Nom du fichier de rapport.
            """
            try:
                with open(args.fichierrapport, 'a', encoding='utf-8') as fichier:
                    fichier.write(message + '\n')
                print(f"Message ecrit dans le fichier de rapport '{args.fichierrapport}'.")
            except Exception as e:
                print(f"Erreur lors de l'ecriture dans le fichier {args.fichierrapport}: {e}")
        # Fin de la fonction write_to_report_file



         ## FONCTION POUR TESTER L'ACCES AU CONTAINER DE STOCKAGE AZURE
        def test_access_to_azure_storage_container(account_url, credential, container_name):
            """
            Teste l'accès à un container de stockage Azure Blob.
            
            :param account_url: URL du compte de stockage Azure.
            :param credential: Clé d'accès ou SAS token pour le compte de stockage.
            :param container_name: Nom du container Ă  tester.
            """
            try:
                blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
                container_client = blob_service_client.get_container_client(container_name)
                
                # Vérification de la connexion
                container_properties = container_client.get_container_properties()
                print(f"Connexion reussie au container '{container_name}'.")
                
                return True
            except Exception as e:
                print(f"Erreur lors de l'accès au container '{container_name}' : {e}")
                return False
        # Fin de la fonction test_access_to_azure_storage_container
        


        ## FONCTION POUR LISTER TOUT LES BLOBS DANS UN CONTAINER

        def list_all_blobs_in_container_by_lot(container_client, max_results):
            """
            Liste les blobs dans un conteneur Azure Blob Storage par lot avec une limite.

            :param container_name: Nom du conteneur.
            :param max_results: Nombre maximum de blobs à récupérer par lot.
            :return: Un générateur qui retourne les blobs par lot.
            """
            try:
                

                # Liste les blobs par lot
                blob_iterator = container_client.list_blobs(results_per_page=max_results).by_page()
                for blob_page in blob_iterator:
                    yield list(blob_page)  # Retourne un lot de blobs sous forme de liste

            except Exception as e:
                print(f"Erreur lors de la récupération des blobs : {e}")





        # FONCTION POUR ECRIRE UN CSV
        def write_csv(data, filename):
            """
            Écrit les données dans un fichier CSV.

            :param data: Liste de dictionnaires contenant les données à écrire.
                        Chaque dictionnaire représente une ligne, avec les clés comme en-têtes.
            :param filename: Nom du fichier CSV à créer.
            """
            try:
                # Vérifier si la liste de données n'est pas vide
                if not data:
                    print("Aucune donnee a ecrire dans le fichier CSV.")
                    return

                # Ouvrir le fichier en mode écriture
                with open(filename, mode='w', newline='', encoding='utf-8') as csvfile:
                    # Créer un objet writer
                    writer = csv.DictWriter(csvfile, fieldnames=data[0].keys())

                    # Écrire l'en-tête
                    writer.writeheader()

                    # Écrire les lignes
                    writer.writerows(data)

                print(f"Les donnees ont ete ecrites avec succes dans le fichier '{filename}'.")
            except Exception as e:
                print(f"Erreur lors de l'ecriture dans le fichier CSV : {e}")

    

        #Debut du traitement
        print(f"Debut du traitement pour le container {args.nomcontainer}.")



        # TEST DE L'ACCES AU CONTAINER DE STOCKAGE AZURE
        print("Test de l'acces au container de stockage Azure...")
        test_access_to_azure_storage_container(
            account_url=account_url,
            credential=args.Cledestockage,
            container_name=args.nomcontainer
        )
        # Fin du test de l'accès au container de stockage Azure
        # #################################################
  
        # On recupere la date d'aujourd'hui
        today = datetime.now().replace(tzinfo=timezone.utc)

        # Instanciation du client de service Blob
        container = BlobServiceClient(
            account_url=account_url,
            credential=args.Cledestockage
        ).get_container_client(args.nomcontainer)



        all_blobs_generator = list_all_blobs_in_container_by_lot(
            container_client=container,
            max_results=args.MaxResults # Nombre maximum de blobs à récupérer par lot
        )

        # On verifie que le generator n'est pas vide
        if not all_blobs_generator:
            print(f"Aucun blob trouve dans le container '{args.nomcontainer}'.")
            exit(1)  # Sortie du script si aucun blob n'est trouvé
        # Fin de la récupération des blobs dans le container      



        # Initialisation de la liste pour stocker les infos de tout les blobs (utilisé uniquement si l'option export_all_blobs_list est a True)
        all_blobs_info = []
        
         

        # On parcourt tous les blobs du container
        print("\n"*2)
        print("#"* 80)
        print(f"Analyse et traitement des blobs dans le container {args.nomcontainer}...")
        print("#"* 80)
       
        # initialisation de l'index pour afficher le numero du lot et le nombre total de lot a la fin du traitement
        lotcount = 0
        # initialisation de l'index pour incrementer le nombre de blobs
        blobcount = 0

        try:
            
                for blob_lot in all_blobs_generator:
                    lotcount += 1
                    print(f"Nombre de blobs dans le lot {lotcount}: {len(blob_lot)}")
                    for blob in blob_lot:
                            # On incremente le nombre de blobs
                            blobcount += 1
                            # On recupere le client du blob
                            blob_client = container.get_blob_client(blob.name)                        
                            # Récupération des propriétés du blob
                            blob_properties = blob_client.get_blob_properties()

                            
                            # On alimente all_blobs_info
                            all_blobs_info.append({
                                    'Container': blob.container,
                                    'Blob_Name': blob.name,
                                    'Blob_Size': blob.size,
                                    'is_deleted': blob.deleted,
                                    'remaining_retention_days': blob.remaining_retention_days,
                                    'is_current_version': blob.is_current_version,
                                    'last_modified': blob.last_modified,
                                    'expiration': blob_properties.immutability_policy.expiry_time if blob_properties.immutability_policy else None
                                })
            

        except StopIteration:
                print("Fin de l'iterateur, tous les lots de blobs ont ete analyses et traites.")
        except Exception as e:
                print(f"Erreur lors de la recuperation des blobs : {e}")
            # Fin de la récupération des blobs dans le container


        # affichage du nombre lots analyses
        print("\n")
        print("#"* 80)
        print(f"Nombre total de lots de blobs analyses: {lotcount}")
        print("#"* 80)



        # STATISTIQUES SUR LES BLOBS
        print("\n")
        print("#"* 80)
        print(f"Nombre total de blobs dans le container {args.nomcontainer}: {blobcount}")
        print("#"* 80)


        # On affiche certaines proprietes de all_blobs_info
        print("\n")
        print("#"* 80)
        print(f"Contenu de la liste de tous les blobs du container {args.nomcontainer}:")
        for blob_info in all_blobs_info:
            print(f"Blob Name: {blob_info['Blob_Name']}, Size: {blob_info['Blob_Size']} octets, Expiration: {blob_info['expiration']}, Last Modified: {blob_info['last_modified']}")
        print("#"* 80)


        # ECRIRE LE CONTENU DE all_blobs_info DANS UN CSV (Si l'option export_all_blobs_list est a True)
        if args.export_all_blobs_list == 'True':
            print("\n")
            print("#"* 80)
            print(f"Ecriture de la liste de tout les blobs du container dans le fichier CSV avec la date d'expiration...")
            csv_filepath = f"{args.exportfolder}\\{args.nomcontainer}_all_blobs_info.csv"
            write_csv(data=all_blobs_info, filename=csv_filepath)
            print("#"* 80)

        


        # On ajoute un message de fin dans le fichier de rapport avec toutes les statistiques
        print("\n")
        print("#"* 80)
        msg = f"Fin du traitement pour le container {args.nomcontainer}\nLe traitement s'est opere en {lotcount} lots\nNombre total de blobs: {blobcount}\nFichier CSV des blobs: {csv_filepath if args.export_all_blobs_list == 'True' else 'Non exporté'}"
        print(msg)

        # Ecriture dans le fichier de rapport
        write_to_report_file(
        message = f"\n### {today.isoformat()} ###\n{msg}\n#########################################",
        filename=f"{args.exportfolder}\\{args.fichierrapport}"
        )
        

        print("#"* 80)
        print("\n")
        # Fin du script
        print("#"* 40 + " FIN DU SCRIPT " + "#"*40)