SE5 ECEAI/eceai 2023/2024/Brenier-Nguyen
- Séance 1 :
- Création et configuration de la VM sur Chassiron
- Installation de NanoEdge
- Installation de STM32cube
- Installation de l'OS pour la Raspberry dans la carte micro SD
- Prise en main de Nanoedge, de nucléo et de STM32cube
- Début de réflexion sur l'application (métrologie)
- Séance 2:
- Mise en place du serveur python
- Mise en place du client python
- Test réussi de communication entre client et serveur via http
- Programmation de la carte nucléo pour transfert de valeur via liaison série
- Suite réflexion sur l'application (reconnaissance alphabet langue des signes, mouvement)
- Choix de l'application : reconnaissance d'objets appliqué aux fournitures scolaires
- Entraînement des modèles pour les différentes applications cités précédemment
- Finalement : entraînement à la reconnaissance d'objets de fournitures scolaires
- Séance 3:
- Entraînement d'un nouveau modèle.
- Écriture d'un programme permettant de récupérer les données du capteur et de les écrire dans un fichier.
- Séance 4 :
- Essai d'utilisation de la librairie créée grâce à Nanoedge.
- Amélioration du programme python permettant de récupérer les données du capteur.
- Création d'un fichier .txt afin de stocker les données du capteur toutes les minutes.
- Destruction automatique des fichiers de données ayant été crée il y a plus de 10 minutes.
- L’émulateur de la librairie ne fonctionne pas s'il est exécuté dans la raspberry... Nous l'exécutons désormais directement dans notre VM.
- Pour cela nous avons adapter notre programme python afin de récupérer les données à partir d'un json pour traiter ces données dans la VM.
- Création d'un fichier toutes les minutes (contenant au format json la date et les données au format csv)
Code du serveur Python :
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import socket
# Stocker les requ tes POST re ues
post_requests = []
class CustomHTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.path = '/index.html'
if self.path == '/data':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(post_requests), 'utf-8'))
return
if self.path == '/index.html':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
# Cr er le contenu HTML avec les donn es POST
content = '<html><body><h1>Donn es POST:</h1><ul>'
for item in post_requests:
content += f'<li>{item}</li>'
content += '</ul></body></html>'
self.wfile.write(content.encode('utf-8'))
return
try:
file_to_open = open(self.path[1:]).read()
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8'))
except FileNotFoundError:
self.send_error(404, 'File Not Found: %s' % self.path)
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length).decode('utf-8')
# Ajouter les donn es POST la liste
post_requests.append(post_data)
# Envoyer une rponse HTTP
self.send_response(200)
self.end_headers()
self.wfile.write(b"POST request processed")
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
if __name__ == '__main__':
server_address = ('::', 8888) # ^icoute sur toutes les interfaces IPv6, p>
httpd = HTTPServerV6(server_address, CustomHTTPHandler)
print("Serveur actif sur le port", 8888)
httpd.serve_forever()
Code du client Python :
import argparse
import requests
import json
def send_json_to_server(url, json_file):
# Charger les données CSV depuis le fichier spécifié
with open(json_file, 'r') as file:
data = json.load(file)
# En-têtes spécifiant que vous envoyez du CSV
headers = {'Content-Type': 'application/json'}
# Envoie de la requête HTTP POST avec les données
response = requests.post(url, data=json.dumps(data), headers=headers)
# Vérification de la réponse
if response.status_code == 200:
print('Requête envoyée avec succès!')
else:
print(f'Erreur {response.status_code}: {response.text}')
if __name__ == "__main__":
# Configurer les arguments en ligne de commande
parser = argparse.ArgumentParser(description='Client Python pour envoyer des JSON via HTTP.')
parser.add_argument('url', type=str, help='URL du serveur')
parser.add_argument('json_file', type=str, help='Chemin vers le fichier JSON à envoyer')
# Analyser les arguments
args = parser.parse_args()
# Appeler la fonction pour envoyer le JSON au serveur
send_json_to_server(args.url, args.json_file)
Index permettant d'afficher le json de test que nous avons envoyé au serveur :
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Page d'Accueil</title>
</head>
<body>
<h1>Bienvenue sur Mon Serveur</h1>
<h2>Données POST reçues:</h2>
<ul id="data">
<!-- Les données POST seront listées ici -->
</ul>
</body>
</html>
Capture montrant l'affichage d'un fichier json (sans aucun sens) envoyé au serveur depuis le client :
Flash du capteur sur le logiciel STM32CubeIDE
Le code ci-dessous est la fonction qui est appelé pour le traitement des capteurs. Nous avions décidé de définir le délimiteur de donnée avec un espace.
Ci-dessous, on retrouve de la ligne 8 à 12 la configuration du capteur que l'on a défini sur un quadrillage de 8 x 8.
static void MX_53L5A1_SimpleRanging_Process(void)
{
uint32_t Id;
VL53L5A1_RANGING_SENSOR_ReadID(VL53L5A1_DEV_CENTER, &Id);
VL53L5A1_RANGING_SENSOR_GetCapabilities(VL53L5A1_DEV_CENTER, &Cap);
Profile.RangingProfile = RS_PROFILE_8x8_CONTINUOUS;
Profile.TimingBudget = TIMING_BUDGET; /* 5 ms < TimingBudget < 100 ms */
Profile.Frequency = RANGING_FREQUENCY; /* Ranging frequency Hz (shall be consistent with TimingBudget value) */
Profile.EnableAmbient = 0; /* Enable: 1, Disable: 0 */
Profile.EnableSignal = 0; /* Enable: 1, Disable: 0 */
/* set the profile if different from default one */
VL53L5A1_RANGING_SENSOR_ConfigProfile(VL53L5A1_DEV_CENTER, &Profile);
status = VL53L5A1_RANGING_SENSOR_Start(VL53L5A1_DEV_CENTER, RS_MODE_BLOCKING_CONTINUOUS);
if (status != BSP_ERROR_NONE)
{
printf("VL53L5A1_RANGING_SENSOR_Start failed\n");
while (1);
}
while (1)
{
/* polling mode */
status = VL53L5A1_RANGING_SENSOR_GetDistance(VL53L5A1_DEV_CENTER, &Result);
if (status == BSP_ERROR_NONE)
{
//print_result(&Result);
print_result_IE(&Result);
}
if (com_has_data())
{
handle_cmd(get_key());
}
HAL_Delay(POLLING_PERIOD);
}
}
Le code ci-dessous est la fonction qui nous permet d'afficher les données des capteurs via la liaison série.
On observe l'envoie des données avec un séparateur 'espace', puis lorsque les valeurs semblent incohérentes du point de vue du capteur, on envoie arbitrairement un -1 en sortie.
On obtient un jeu de donnée suivant le format : value espace value esapce .... value espace retour chariot (pour chaque lignes/quadrillage sur une ligne)
static void print_result_IE(RANGING_SENSOR_Result_t *Result)
{
int8_t j;
int8_t k;
int8_t l;
uint8_t zones_per_line;
zones_per_line = ((Profile.RangingProfile == RS_PROFILE_8x8_AUTONOMOUS) ||
(Profile.RangingProfile == RS_PROFILE_8x8_CONTINUOUS)) ? 8 : 4;
for (j = 0; j < Result->NumberOfZones; j += zones_per_line)
{
for (l = 0; l < RANGING_SENSOR_NB_TARGET_PER_ZONE; l++)
{
/* Print distance and status */
for (k = (zones_per_line - 1); k >= 0; k--)
{
if (Result->ZoneResult[j+k].NumberOfTargets > 0)
printf("%ld ",(long)Result->ZoneResult[j+k].Distance[l]);
else
printf("%ld ", -1);
}
}
}
printf("\n");
}
Code de récupération de donnée (sur la raspberry)
Nous avons le scénario suivant : le capteur qui envoie en continue les données de distance à la raspberry via la liaison série.
Nous avons donc écrit un script python afin de récupérer ces données, les formatter dans un fichier format json dans le but de les envoyer sur la VM qui pourra traiter ces données avec l'émulateur obtenue via l'application NanoEdgeAI Studio.
Nous allons vous expliquez le script que nous avons réalisés :
import serial
from threading import Thread, Event
from time import sleep
import os, sys, signal
import datetime
import glob
interruptAllEvent = Event()
def serial_thread(ser: serial) -> None:
fileName_old = os.path.dirname(__file__) + "/data/sensor_data_start"
while (1):
if (interruptAllEvent.is_set()):
break
fileName = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__%H_%M_00") + ".json"
if (fileName_old != fileName):
with open(fileName_old, "a") as fichier:
fichier.write("\"}")
string = str(ser.readline(), encoding='utf-8')
with open(fileName, "a") as fichier:
if (fileName_old != fileName):
fileName_old = fileName
fichier.write("{\"date\":\""+datetime.datetime.now().strftime("%d/%m/%Y-%H:%M:00")+"\",\"csv_file\":\"")
fichier.write(string)
def signal_handler(sig, frame):
interruptAllEvent.set()
return
def main() -> None:
ser = serial.Serial('/dev/ttyACM0', 460800)
thread = Thread(target=serial_thread, args=(ser,))
thread.start()
signal.signal(signal.SIGINT, signal_handler)
sleep(5)
os.remove(os.path.dirname(__file__) + "/data/sensor_data_start")
while (1):
if interruptAllEvent.is_set():
thread.join()
break
sleep(5)
minute = datetime.datetime.now().minute - 1
hours = datetime.datetime.now().hour
if (minute < 0):
minute = minute + 60
hours = hours - 1
try:
with open(os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json", "r+") as fichier:
line = fichier.read()
str(line).replace('\n','\\\n')
fichier.write(line)
except FileNotFoundError:
sleep(60)
file_to_keep = []
for i in range(0,10):
minute = datetime.datetime.now().minute - i
hours = datetime.datetime.now().hour
if (minute < 0):
minute = minute + 60
hours = hours - 1
path_string = os.path.dirname(__file__) + "/data/sensor_data_" + datetime.datetime.now().strftime("%d_%m_%Y__") + str(hours) + "_" + str(minute) + "_00.json"
file_to_keep.append(path_string)
files = glob.glob(os.path.dirname(__file__) + "/data/*")
files_to_remove = []
for file_name in files:
beRemoved = True
for f_keep in file_to_keep:
if file_name == f_keep:
beRemoved = False
break
if beRemoved:
files_to_remove.append(file_name)
for file_name in files_to_remove:
os.remove(file_name)
if __name__ == '__main__':
main()