SE5 ECEAI/eceai 2023/2024/Brenier-Nguyen

De wiki-se.plil.fr
Aller à la navigation Aller à la recherche
  • Séance 1 :
    • Création et configuration de la VM sur Chassiron
    • Installation de NanoEdge
    • Installation de STM32cube
    • Installation de l'OS pour la Raspberry dans la carte micro SD
    • Prise en main de Nanoedge, de nucléo et de STM32cube
    • Début de réflexion sur l'application (métrologie)
  • Séance 2:
    • Mise en place du serveur python
    • Mise en place du client python
    • Test réussi de communication entre client et serveur via http
    • Programmation de la carte nucléo pour transfert de valeur via liaison série
    • Suite réflexion sur l'application (reconnaissance alphabet langue des signes, mouvement)
    • Choix de l'application : reconnaissance d'objets appliqué aux fournitures scolaires
    • Entraînement des modèles pour les différentes applications cités précédemment
    • Finalement : entraînement à la reconnaissance d'objets de fournitures scolaires
  • Séance 3:
    • Entraînement d'un nouveau modèle.
    • Écriture d'un programme permettant de récupérer les données du capteur et de les écrire dans un fichier.
  • Séance 4 :
    • Essai d'utilisation de la librairie créée grâce à Nanoedge.
    • Amélioration du programme python permettant de récupérer les données du capteur.
      • Création d'un fichier .txt afin de stocker les données du capteur toutes les minutes.
      • Destruction automatique des fichiers de données ayant été crée il y a plus de 10 minutes.
    • L’émulateur de la librairie ne fonctionne pas s'il est exécuté dans la raspberry... Nous l'exécutons désormais directement dans notre VM.
    • Pour cela nous avons adapter notre programme python afin de récupérer les données à partir d'un json pour traiter ces données dans la VM.
      • Création d'un fichier toutes les minutes (contenant au format json la date et les données au format csv)


Code du serveur Python :

from http.server import HTTPServer, BaseHTTPRequestHandler

import json

import socket

# Stocker les requ  tes POST re  ues

post_requests = []

class CustomHTTPHandler(BaseHTTPRequestHandler):

   def do_GET(self):

       if self.path == '/':

           self.path = '/index.html'

       if self.path == '/data':

           self.send_response(200)

           self.send_header('Content-type', 'application/json')

           self.end_headers()

           self.wfile.write(bytes(json.dumps(post_requests), 'utf-8'))

           return

if self.path == '/index.html':

           self.send_response(200)

           self.send_header('Content-type', 'text/html')

           self.end_headers()

           # Cr  er le contenu HTML avec les donn  es POST

           content = '<html><body><h1>Donn  es POST:</h1><ul>'

           for item in post_requests:

               content += f'<li>{item}</li>'

           content += '</ul></body></html>'

           self.wfile.write(content.encode('utf-8'))

           return

       try:

file_to_open = open(self.path[1:]).read()

           self.send_response(200)

           self.send_header('Content-type', 'text/html')

           self.end_headers()

           self.wfile.write(bytes(file_to_open, 'utf-8'))

       except FileNotFoundError:

           self.send_error(404, 'File Not Found: %s' % self.path)

   def do_POST(self):

       content_length = int(self.headers['Content-Length'])

       post_data = self.rfile.read(content_length).decode('utf-8')

       # Ajouter les donn  es POST    la liste

       post_requests.append(post_data)

       # Envoyer une rponse HTTP

       self.send_response(200)

       self.end_headers()

self.wfile.write(b"POST request processed")

class HTTPServerV6(HTTPServer):

   address_family = socket.AF_INET6

if __name__ == '__main__':

   server_address = ('::', 8888)  #  ^icoute sur toutes les interfaces IPv6, p>

   httpd = HTTPServerV6(server_address, CustomHTTPHandler)

   print("Serveur actif sur le port", 8888)

   httpd.serve_forever()

Code du client Python :

import argparse                                                                                                                            
import requests                                                                                                                            
import json                                                                                                                                
                                                                                                                                           
def send_json_to_server(url, json_file):                                                                                                   
    # Charger les données CSV depuis le fichier spécifié                                                                                   
    with open(json_file, 'r') as file:                                                                                                     
        data = json.load(file)                                                                                                             
                                                                                                                                           
    # En-têtes spécifiant que vous envoyez du CSV                                                                                          
    headers = {'Content-Type': 'application/json'}                                                                                         
                                                                                                                                           
    # Envoie de la requête HTTP POST avec les données                                                                                      
    response = requests.post(url, data=json.dumps(data), headers=headers)                                                                  
                                                                                                                                           
    # Vérification de la réponse                                                                                                           
    if response.status_code == 200:                                                                                                        
        print('Requête envoyée avec succès!')                                                                                              
    else:                                                                                                                                  
        print(f'Erreur {response.status_code}: {response.text}')                                                                           
                                                                                                                                           
if __name__ == "__main__":                                                                                                                 
    # Configurer les arguments en ligne de commande                                                                                        
    parser = argparse.ArgumentParser(description='Client Python pour envoyer des JSON via HTTP.')                                          
    parser.add_argument('url', type=str, help='URL du serveur')                                                                            
    parser.add_argument('json_file', type=str, help='Chemin vers le fichier JSON à envoyer')                                               
                                                                                                                                           
    # Analyser les arguments                                                                                                               
    args = parser.parse_args()                                                                                                             
                                                                                                                                           
    # Appeler la fonction pour envoyer le JSON au serveur                                                                                  
    send_json_to_server(args.url, args.json_file)

Index permettant d'afficher le json de test que nous avons envoyé au serveur :

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Page d'Accueil</title>
</head>
<body>
    <h1>Bienvenue sur Mon Serveur</h1>
    <h2>Données POST reçues:</h2>
    <ul id="data">
        <!-- Les données POST seront listées ici -->
    </ul>
</body>
</html>

Capture montrant l'affichage d'un fichier json (sans aucun sens) envoyé au serveur depuis le client :


Flash du capteur sur le logiciel STM32CubeIDE

Le code ci-dessous est la fonction qui est appelé pour le traitement des capteurs. Nous avions décidé de définir le délimiteur de donnée avec un espace.

Démonstration du protocole HTTP

Ci-dessous, on retrouve de la ligne 8 à 12 la configuration du capteur que l'on a défini sur un quadrillage de 8 x 8.

static void MX_53L5A1_SimpleRanging_Process(void)
{
  uint32_t Id;

  VL53L5A1_RANGING_SENSOR_ReadID(VL53L5A1_DEV_CENTER, &Id);
  VL53L5A1_RANGING_SENSOR_GetCapabilities(VL53L5A1_DEV_CENTER, &Cap);

  Profile.RangingProfile = RS_PROFILE_8x8_CONTINUOUS;
  Profile.TimingBudget = TIMING_BUDGET; /* 5 ms < TimingBudget < 100 ms */
  Profile.Frequency = RANGING_FREQUENCY; /* Ranging frequency Hz (shall be consistent with TimingBudget value) */
  Profile.EnableAmbient = 0; /* Enable: 1, Disable: 0 */
  Profile.EnableSignal = 0; /* Enable: 1, Disable: 0 */

  /* set the profile if different from default one */
  VL53L5A1_RANGING_SENSOR_ConfigProfile(VL53L5A1_DEV_CENTER, &Profile);

  status = VL53L5A1_RANGING_SENSOR_Start(VL53L5A1_DEV_CENTER, RS_MODE_BLOCKING_CONTINUOUS);

  if (status != BSP_ERROR_NONE)
  {
    printf("VL53L5A1_RANGING_SENSOR_Start failed\n");
    while (1);
  }

  while (1)
  {
    /* polling mode */
    status = VL53L5A1_RANGING_SENSOR_GetDistance(VL53L5A1_DEV_CENTER, &Result);

    if (status == BSP_ERROR_NONE)
    {
      //print_result(&Result);
      print_result_IE(&Result);
    }

    if (com_has_data())
    {
      handle_cmd(get_key());
    }

    HAL_Delay(POLLING_PERIOD);
  }
}

Le code ci-dessous est la fonction qui nous permet d'afficher les données des capteurs via la liaison série.

On observe l'envoie des données avec un séparateur 'espace', puis lorsque les valeurs semblent incohérentes du point de vue du capteur, on envoie arbitrairement un -1 en sortie.

On obtient un jeu de donnée suivant le format : value espace value esapce .... value espace retour chariot (pour chaque lignes/quadrillage sur une ligne)

static void print_result_IE(RANGING_SENSOR_Result_t *Result)
{
  int8_t j;
  int8_t k;
  int8_t l;
  uint8_t zones_per_line;

  zones_per_line = ((Profile.RangingProfile == RS_PROFILE_8x8_AUTONOMOUS) ||
		 (Profile.RangingProfile == RS_PROFILE_8x8_CONTINUOUS)) ? 8 : 4;

  for (j = 0; j < Result->NumberOfZones; j += zones_per_line)
  {
	for (l = 0; l < RANGING_SENSOR_NB_TARGET_PER_ZONE; l++)
	{
	  /* Print distance and status */
	  for (k = (zones_per_line - 1); k >= 0; k--)
	  {
		if (Result->ZoneResult[j+k].NumberOfTargets > 0)
		  printf("%ld ",(long)Result->ZoneResult[j+k].Distance[l]);
		else
		  printf("%ld ", -1);
	  }
	}
  }
  printf("\n");
}

Code de récupération de donnée (sur la raspberry)

Nous avons le scénario suivant : le capteur qui envoie en continue les données de distance à la raspberry via la liaison série.

Nous avons donc écrit un script python afin de récupérer ces données, les formater dans un fichier format json dans le but de les envoyer sur la VM qui pourra traiter ces données avec l'émulateur obtenue via l'application NanoEdgeAI Studio.

Nous allons vous expliquez le script que nous avons réalisés :

import serial
from threading import Thread, Event
from time import sleep
import os, sys, signal
import datetime
import glob

interruptAllEvent = Event()

def serial_thread(ser: serial) -> None:
    fileName_old = os.path.dirname(__file__) + "/data/sensor_data_start"
    while (1):
        if (interruptAllEvent.is_set()):
            break
        fileName = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__%H_%M_00") + ".json"

        if (fileName_old != fileName):
            with open(fileName_old, "a") as fichier:
                fichier.write("\"}")

        string = str(ser.readline(), encoding='utf-8')

        with open(fileName, "a") as fichier:
            if (fileName_old != fileName):
                fileName_old = fileName
                fichier.write("{\"date\":\""+datetime.datetime.now().strftime("%d/%m/%Y-%H:%M:00")+"\",\"csv_file\":\"")

            fichier.write(string)        
        
def signal_handler(sig, frame):
    interruptAllEvent.set()
    return

def main() -> None:
    ser = serial.Serial('/dev/ttyACM0', 460800)
    thread = Thread(target=serial_thread, args=(ser,))

    thread.start()

    signal.signal(signal.SIGINT, signal_handler)   

    sleep(5)
    os.remove(os.path.dirname(__file__) + "/data/sensor_data_start")

    while (1):
        if interruptAllEvent.is_set():
            thread.join()
            break

        sleep(5)

        minute = datetime.datetime.now().minute - 1
        hours = datetime.datetime.now().hour
        if (minute < 0):
            minute = minute + 60
            hours = hours - 1
        try:
            with open(os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json", "r+") as fichier:
                line = fichier.read()
                str(line).replace('\n','\\\n')
                fichier.write(line)
        except FileNotFoundError:
            sleep(60)

        file_to_keep = []
        for i in range(0,10):
            minute = datetime.datetime.now().minute - i
            hours = datetime.datetime.now().hour
            if (minute < 0):
                minute = minute + 60
                hours = hours - 1
            path_string = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json"
            file_to_keep.append(path_string)

        files = glob.glob(os.path.dirname(__file__) + "/data/*")

        files_to_remove = []

        for file_name in files:
            beRemoved = True
            for f_keep in file_to_keep:
                if file_name == f_keep:
                    beRemoved = False
                    break
            if beRemoved:
                files_to_remove.append(file_name)

        for file_name in files_to_remove:
            os.remove(file_name)

        

if __name__ == '__main__':
        main()

Résultats renvoyés par le programme de reconnaissance