SE5 ECEAI/eceai 2023/2024/Brenier-Nguyen

De wiki-se.plil.fr
Aller à la navigation Aller à la recherche
  • Séance 1 :
    • Création et configuration de la VM sur Chassiron
    • Installation de NanoEdge
    • Installation de STM32cube
    • Installation de l'OS pour la Raspberry dans la carte micro SD
    • Prise en main de Nanoedge, de nucléo et de STM32cube
    • Début de réflexion sur l'application (métrologie)
  • Séance 2:
    • Mise en place du serveur python
    • Mise en place du client python
    • Test réussi de communication entre client et serveur via http
    • Programmation de la carte nucléo pour transfert de valeur via liaison série
    • Suite réflexion sur l'application (reconnaissance alphabet langue des signes, mouvement)
    • Choix de l'application : reconnaissance d'objets appliqué aux fournitures scolaires
    • Entraînement des modèles pour les différentes applications cités précédemment
    • Finalement : entraînement à la reconnaissance d'objets de fournitures scolaires
  • Séance 3:
    • Entraînement d'un nouveau modèle.
    • Écriture d'un programme permettant de récupérer les données du capteur et de les écrire dans un fichier.
  • Séance 4 :
    • Essai d'utilisation de la librairie créée grâce à Nanoedge.
    • Amélioration du programme python permettant de récupérer les données du capteur.
      • Création d'un fichier .txt afin de stocker les données du capteur toutes les minutes.
      • Destruction automatique des fichiers de données ayant été crée il y a plus de 10 minutes.
    • L’émulateur de la librairie ne fonctionne pas s'il est exécuté dans la raspberry... Nous l'exécutons désormais directement dans notre VM.
    • Pour cela nous avons adapter notre programme python afin de récupérer les données à partir d'un json pour traiter ces données dans la VM.
      • Création d'un fichier toutes les minutes (contenant au format json la date et les données au format csv)


Code du serveur Python :

from http.server import HTTPServer, BaseHTTPRequestHandler

import json

import socket

# Stocker les requ  tes POST re  ues

post_requests = []

class CustomHTTPHandler(BaseHTTPRequestHandler):

   def do_GET(self):

       if self.path == '/':

           self.path = '/index.html'

       if self.path == '/data':

           self.send_response(200)

           self.send_header('Content-type', 'application/json')

           self.end_headers()

           self.wfile.write(bytes(json.dumps(post_requests), 'utf-8'))

           return

if self.path == '/index.html':

           self.send_response(200)

           self.send_header('Content-type', 'text/html')

           self.end_headers()

           # Cr  er le contenu HTML avec les donn  es POST

           content = '<html><body><h1>Donn  es POST:</h1><ul>'

           for item in post_requests:

               content += f'<li>{item}</li>'

           content += '</ul></body></html>'

           self.wfile.write(content.encode('utf-8'))

           return

       try:

file_to_open = open(self.path[1:]).read()

           self.send_response(200)

           self.send_header('Content-type', 'text/html')

           self.end_headers()

           self.wfile.write(bytes(file_to_open, 'utf-8'))

       except FileNotFoundError:

           self.send_error(404, 'File Not Found: %s' % self.path)

   def do_POST(self):

       content_length = int(self.headers['Content-Length'])

       post_data = self.rfile.read(content_length).decode('utf-8')

       # Ajouter les donn  es POST    la liste

       post_requests.append(post_data)

       # Envoyer une rponse HTTP

       self.send_response(200)

       self.end_headers()

self.wfile.write(b"POST request processed")

class HTTPServerV6(HTTPServer):

   address_family = socket.AF_INET6

if __name__ == '__main__':

   server_address = ('::', 8888)  #  ^icoute sur toutes les interfaces IPv6, p>

   httpd = HTTPServerV6(server_address, CustomHTTPHandler)

   print("Serveur actif sur le port", 8888)

   httpd.serve_forever()

Code du client Python :

import argparse                                                                                                                            
import requests                                                                                                                            
import json                                                                                                                                
                                                                                                                                           
def send_json_to_server(url, json_file):                                                                                                   
    # Charger les données CSV depuis le fichier spécifié                                                                                   
    with open(json_file, 'r') as file:                                                                                                     
        data = json.load(file)                                                                                                             
                                                                                                                                           
    # En-têtes spécifiant que vous envoyez du CSV                                                                                          
    headers = {'Content-Type': 'application/json'}                                                                                         
                                                                                                                                           
    # Envoie de la requête HTTP POST avec les données                                                                                      
    response = requests.post(url, data=json.dumps(data), headers=headers)                                                                  
                                                                                                                                           
    # Vérification de la réponse                                                                                                           
    if response.status_code == 200:                                                                                                        
        print('Requête envoyée avec succès!')                                                                                              
    else:                                                                                                                                  
        print(f'Erreur {response.status_code}: {response.text}')                                                                           
                                                                                                                                           
if __name__ == "__main__":                                                                                                                 
    # Configurer les arguments en ligne de commande                                                                                        
    parser = argparse.ArgumentParser(description='Client Python pour envoyer des JSON via HTTP.')                                          
    parser.add_argument('url', type=str, help='URL du serveur')                                                                            
    parser.add_argument('json_file', type=str, help='Chemin vers le fichier JSON à envoyer')                                               
                                                                                                                                           
    # Analyser les arguments                                                                                                               
    args = parser.parse_args()                                                                                                             
                                                                                                                                           
    # Appeler la fonction pour envoyer le JSON au serveur                                                                                  
    send_json_to_server(args.url, args.json_file)

Index permettant d'afficher le json de test que nous avons envoyé au serveur :

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Page d'Accueil</title>
</head>
<body>
    <h1>Bienvenue sur Mon Serveur</h1>
    <h2>Données POST reçues:</h2>
    <ul id="data">
        <!-- Les données POST seront listées ici -->
    </ul>
</body>
</html>

Capture montrant l'affichage d'un fichier json (sans aucun sens) envoyé au serveur depuis le client :


Flash du capteur sur le logiciel STM32CubeIDE

Le code ci-dessous est la fonction qui est appelé pour le traitement des capteurs. Nous avions décidé de définir le délimiteur de donnée avec un espace.

Démonstration du protocole HTTP

Ci-dessous, on retrouve de la ligne 8 à 12 la configuration du capteur que l'on a défini sur un quadrillage de 8 x 8.

static void MX_53L5A1_SimpleRanging_Process(void)
{
  uint32_t Id;

  VL53L5A1_RANGING_SENSOR_ReadID(VL53L5A1_DEV_CENTER, &Id);
  VL53L5A1_RANGING_SENSOR_GetCapabilities(VL53L5A1_DEV_CENTER, &Cap);

  Profile.RangingProfile = RS_PROFILE_8x8_CONTINUOUS;
  Profile.TimingBudget = TIMING_BUDGET; /* 5 ms < TimingBudget < 100 ms */
  Profile.Frequency = RANGING_FREQUENCY; /* Ranging frequency Hz (shall be consistent with TimingBudget value) */
  Profile.EnableAmbient = 0; /* Enable: 1, Disable: 0 */
  Profile.EnableSignal = 0; /* Enable: 1, Disable: 0 */

  /* set the profile if different from default one */
  VL53L5A1_RANGING_SENSOR_ConfigProfile(VL53L5A1_DEV_CENTER, &Profile);

  status = VL53L5A1_RANGING_SENSOR_Start(VL53L5A1_DEV_CENTER, RS_MODE_BLOCKING_CONTINUOUS);

  if (status != BSP_ERROR_NONE)
  {
    printf("VL53L5A1_RANGING_SENSOR_Start failed\n");
    while (1);
  }

  while (1)
  {
    /* polling mode */
    status = VL53L5A1_RANGING_SENSOR_GetDistance(VL53L5A1_DEV_CENTER, &Result);

    if (status == BSP_ERROR_NONE)
    {
      //print_result(&Result);
      print_result_IE(&Result);
    }

    if (com_has_data())
    {
      handle_cmd(get_key());
    }

    HAL_Delay(POLLING_PERIOD);
  }
}

Le code ci-dessous est la fonction qui nous permet d'afficher les données des capteurs via la liaison série.

On observe l'envoie des données avec un séparateur 'espace', puis lorsque les valeurs semblent incohérentes du point de vue du capteur, on envoie arbitrairement un -1 en sortie.

On obtient un jeu de donnée suivant le format : value espace value esapce .... value espace retour chariot (pour chaque lignes/quadrillage sur une ligne)

static void print_result_IE(RANGING_SENSOR_Result_t *Result)
{
  int8_t j;
  int8_t k;
  int8_t l;
  uint8_t zones_per_line;

  zones_per_line = ((Profile.RangingProfile == RS_PROFILE_8x8_AUTONOMOUS) ||
		 (Profile.RangingProfile == RS_PROFILE_8x8_CONTINUOUS)) ? 8 : 4;

  for (j = 0; j < Result->NumberOfZones; j += zones_per_line)
  {
	for (l = 0; l < RANGING_SENSOR_NB_TARGET_PER_ZONE; l++)
	{
	  /* Print distance and status */
	  for (k = (zones_per_line - 1); k >= 0; k--)
	  {
		if (Result->ZoneResult[j+k].NumberOfTargets > 0)
		  printf("%ld ",(long)Result->ZoneResult[j+k].Distance[l]);
		else
		  printf("%ld ", -1);
	  }
	}
  }
  printf("\n");
}

Code de récupération de donnée (sur la raspberry)

Nous avons le scénario suivant : le capteur qui envoie en continue les données de distance à la raspberry via la liaison série.

Nous avons donc écrit un script python afin de récupérer ces données, les formater dans un fichier format json dans le but de les envoyer sur la VM qui pourra traiter ces données avec l'émulateur obtenue via l'application NanoEdgeAI Studio.

Nous allons vous expliquez le script que nous avons réalisés :

import serial
from threading import Thread, Event
from time import sleep
import os, sys, signal
import datetime
import glob

# Événement pour signaler l'arrêt des threads
interruptAllEvent = Event()

# Fonction de thread pour lire les données et les sauvegarder dans des fichiers
def serial_thread(ser: serial) -> None:
    # Chemin du fichier pour les données initiales du capteur
    # Ces données sont vouées à ne pas être utilisées
    fileName_old = os.path.dirname(__file__) + "/data/sensor_data_start"

    while (1):
        if (interruptAllEvent.is_set()):
            break

        # Créer un nouveau fichier pour chaque minute avec l'horodatage actuel
        fileName = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__%H_%M_00") + ".json"

        # Fermer le fichier de la minute précédente en ajoutant la parenthèse fermante
        if (fileName_old != fileName):
            with open(fileName_old, "a") as fichier:
                fichier.write("\"}")

        # Lire les données du port série
        string = str(ser.readline(), encoding='utf-8')

        # Ouvrir le fichier de la minute actuelle et écrire les données
        with open(fileName, "a") as fichier:
            # S'il s'agit d'une nouvelle minute, commencer un nouvel objet JSON
            if (fileName_old != fileName):
                fileName_old = fileName
                fichier.write("{\"date\":\""+datetime.datetime.now().strftime("%d/%m/%Y-%H:%M:00")+"\",\"csv_file\":\"")

            # Écrire les données dans le fichier
            fichier.write(string)

# Gestionnaire de signal pour définir l'événement d'interruption lors d'une interruption clavier (Ctrl+C)
def signal_handler(sig, frame):
    interruptAllEvent.set()
    return

# Fonction principale
def main() -> None:
    # Créer un objet pour la communication série
    ser = serial.Serial('/dev/ttyACM0', 460800)

    # Créer un thread pour la communication série
    thread = Thread(target=serial_thread, args=(ser,))

    # Démarrer le thread de communication série
    thread.start()

    # Mettre en place le gestionnaire de signal pour les événements d'interruption (Ctrl+C)
    signal.signal(signal.SIGINT, signal_handler)

    # Attendre pendant 5 secondes et supprimer le fichier de données initial du capteur
    sleep(5)
    os.remove(os.path.dirname(__file__) + "/data/sensor_data_start")

    while (1):
        # Vérifier si l'événement d'interruption est défini
        if interruptAllEvent.is_set():
            # Attendre que le thread série se termine, puis sortir de la boucle
            thread.join()
            break

        sleep(5)

        # Ajuster les valeurs de minute et d'heure pour l'heure précédente
        minute = datetime.datetime.now().minute - 1
        hours = datetime.datetime.now().hour
        if (minute < 0):
            minute = minute + 60
            hours = hours - 1

        # On modifie tout les \n du fichier json afin que son transfert et sa réception vers le serveur
        # soit du bon format. On protège donc tout les \n du fichier avant son envoie.
        try:
            # Lire et modifier le contenu du fichier de la minute précédente
            with open(os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json", "r+") as fichier:
                line = fichier.read()
                str(line).replace('\n','\\\n')
                fichier.write(line)
        except FileNotFoundError:
            sleep(60)

        # Créer une liste de chemins de fichiers pour les 10 dernières minutes
        # Tout les autres sont amenés à être supprimés. Dans le but de préserver de la mémoire de stockage
        file_to_keep = []
        for i in range(0,10):
            minute = datetime.datetime.now().minute - i
            hours = datetime.datetime.now().hour
            if (minute < 0):
                minute = minute + 60
                hours = hours - 1
            path_string = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json"
            file_to_keep.append(path_string)

        # Obtenir une liste de tous les fichiers dans le répertoire data
        files = glob.glob(os.path.dirname(__file__) + "/data/*")

        # Créer une liste de fichiers à supprimer
        files_to_remove = []

        # Vérifier chaque fichier et l'ajouter à la liste de suppression s'il n'est pas dans la liste des fichiers à conserver
        for file_name in files:
            beRemoved = True
            for f_keep in file_to_keep:
                if file_name == f_keep:
                    beRemoved = False
                    break
            if beRemoved:
                files_to_remove.append(file_name)

        # Supprimer les fichiers de la liste de suppression
        for file_name in files_to_remove:
            os.remove(file_name)

# Point d'entrée du script
if __name__ == '__main__':
    main()

Résultats renvoyés par le programme de reconnaissance

Une colle à reconnaitre
Aucun objet à reconnaitre