# [paste residue: line-number gutter 1-119 from the original web page; not part of the script]
import glob
import urllib.request
from lxml import etree
from urllib.error import URLError, HTTPError
import base64
import csv
import sys
import os
import time
from datetime import datetime
from pathlib import Path
import RPi.GPIO as GPIO
import smbus as smbus
#import Adafruit_DHT
#from Adafruit_BMP085 import BMP085
import Adafruit_BMP085 as BMP085
import simpletest_BMP
import AdafruitDHT
#from AdafruitDHT import temperature
from AdafruitDHT import humidity
from simpletest_BMP import Pressure_Ext
from simpletest_BMP import Altitude_Ext
#sensor = BMP085.BMP085()
# Select BCM channel numbering for every GPIO call made by this script.
GPIO.setmode(GPIO.BCM)
# Polling loop header.
# NOTE(review): the indentation of this copy of the script was lost, so the
# extent of the loop body cannot be read from the file. Presumably everything
# down to the final time.sleep(10) belongs to it - confirm when re-indenting.
while True :
# GPIO channels wired to the anemometer and to the rain gauge.
port_gpio_anemometre = 13
port_gpio_pluvio = 19

# Outdoor temperature probe: its id under the w1 device directory and the
# full path of the file the reading is taken from.
sonde_exterieure = '/28-3c01f095d48b'
base_dir = '/sys/bus/w1/devices'
sonde_ext = base_dir + sonde_exterieure + '/w1_slave'

# Month label "<French month name><year>", e.g. "mars2024".  The names come
# from this table rather than strftime("%B") so the result does not depend on
# the system locale.
# NOTE(review): 'aout' is kept without its circumflex because the label ends
# up in directory and file names - confirm that spelling is intentional.
current_datetime = datetime.now()
Mois = ['janvier', 'février', 'mars', 'avril', 'mai', 'juin', 'juillet',
        'aout', 'septembre', 'octobre', 'novembre', 'décembre']
mois = current_datetime.date().month
Q = Mois[mois - 1]
current_date = Q + current_datetime.strftime("%Y")

# One directory per month, holding one main CSV file named after the month.
Record_dir = '/home/pi/Domotique/Donnees_Meteo/' + current_date
Record_file = Record_dir + f"/{current_date}.txt"
def extraire_temperature(sonde) -> float:
    """Return the reading stored in a ``w1_slave``-style probe file.

    The value is taken from the text following ``t=`` on the second line of
    the file and divided by 1000.  (Presumably the file holds millidegrees
    Celsius, so the result is in degrees Celsius - confirm for the probe in
    use.)

    Args:
        sonde: Path of the probe file to read.

    Returns:
        The number after ``t=`` divided by 1000, as a float.

    Raises:
        OSError: If the file cannot be opened.
        IndexError: If the file has content but no second line.
        ValueError: If the second line has no ``t=<number>`` field.
    """
    while True:
        # Reopen on every attempt: a handle that has already reached
        # end-of-file returns nothing more, so retrying on the same handle
        # could never succeed.  The context manager also closes the file.
        with open(sonde, 'r') as f_sonde:
            lines_sonde = f_sonde.readlines()
        if lines_sonde:
            break
        time.sleep(0.5)

    # Locate the "t=" marker instead of counting space-separated fields, so a
    # change in the number of bytes printed before it does not break parsing.
    _, marqueur, brut = lines_sonde[1].rpartition("t=")
    if not marqueur:
        raise ValueError(f"no 't=' field in {sonde!r}: {lines_sonde[1]!r}")
    return float(brut) / 1000
# Read the outdoor temperature from the probe file at sonde_ext.
Temp_exterieure = extraire_temperature(sonde_ext)
# print(Temp_exterieure)
# Rain-gauge pulse detection on GPIO 19.
def my_callback_Pluviometre(port_gpio_pluvio):
    """Append one ``date,time`` row for a rain-gauge pulse to the day's file.

    Registered as the edge-detection callback for the rain-gauge channel.
    The row goes to ``Pluviometre_<dd-mm-YYYY>.txt`` inside ``Record_dir``,
    which is created first if it does not exist yet.

    Args:
        port_gpio_pluvio: Value supplied by the caller (presumably the
            channel that triggered); it is not used.
    """
    horodatage = datetime.now()
    jour = horodatage.strftime("%d-%m-%Y")
    heure = horodatage.strftime("%H:%M:%S")

    # Without this, the first pulse of a new month fails with
    # FileNotFoundError inside the callback and the pulse is lost.
    os.makedirs(Record_dir, exist_ok=True)

    chemin = Record_dir + f"/Pluviometre_{jour}.txt"
    with open(chemin, "a", newline="", encoding="utf-8") as f:
        # No header is written: every line is simply "date,time".
        csv.writer(f).writerow([jour, heure])
# Configure the rain-gauge channel as an input with the internal pull-up, then
# have my_callback_Pluviometre called on each rising edge, with a bouncetime
# of 5000 (milliseconds in RPi.GPIO) to suppress repeated triggers.
# NOTE(review): if this statement really sits inside the polling loop, the
# second pass registers edge detection on a channel that already has it;
# RPi.GPIO is expected to reject that with RuntimeError - confirm, and run
# this once before the loop.
GPIO.setup(port_gpio_pluvio, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(port_gpio_pluvio, GPIO.RISING, callback=my_callback_Pluviometre, bouncetime=5000)
def my_callback_Anemometre(port_gpio_anemometre):
    """Append one ``date,time`` row for an anemometer pulse to the day's file.

    Registered as the edge-detection callback for the anemometer channel.
    The row goes to ``Anemometre_<dd-mm-YYYY>.txt`` inside ``Record_dir``,
    which is created first if it does not exist yet.

    Args:
        port_gpio_anemometre: Value supplied by the caller (presumably the
            channel that triggered); it is not used.
    """
    horodatage = datetime.now()
    jour = horodatage.strftime("%d-%m-%Y")
    heure = horodatage.strftime("%H:%M:%S")

    # Without this, the first pulse of a new month fails with
    # FileNotFoundError inside the callback and the pulse is lost.
    os.makedirs(Record_dir, exist_ok=True)

    chemin = Record_dir + f"/Anemometre_{jour}.txt"
    with open(chemin, "a", newline="", encoding="utf-8") as f:
        # No header is written: every line is simply "date,time".
        csv.writer(f).writerow([jour, heure])
# Configure the anemometer channel as an input with the internal pull-down,
# then have my_callback_Anemometre called on each falling edge, with a
# bouncetime of 1000 (milliseconds in RPi.GPIO) to suppress repeated triggers.
# NOTE(review): a 1000 ms bouncetime presumably limits logging to about one
# pulse per second - confirm that is acceptable for wind-speed measurement.
# NOTE(review): if this statement really sits inside the polling loop, the
# second pass registers edge detection on a channel that already has it;
# RPi.GPIO is expected to reject that with RuntimeError - confirm, and run
# this once before the loop.
GPIO.setup(port_gpio_anemometre, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(port_gpio_anemometre, GPIO.FALLING, callback=my_callback_Anemometre, bouncetime=1000)
# Append one timestamped row of readings to the monthly CSV file, then wait
# ten seconds before the next sample.
# NOTE(review): Pressure_Ext, Altitude_Ext and humidity are names bound by
# "from ... import" at the top of the file, so they keep the value they had
# when the module was imported; repeating this step does not take a new
# reading from those sensors - confirm whether that is intended.
try:
    root = {
        "Temp_Ext": Temp_exterieure,
        "Pression_Ext": Pressure_Ext / 100,
        "Alt_Ext": round(Altitude_Ext, 2),
        "Humidite": round(humidity, 2) / 100,
    }
    current_datetime = datetime.now()
    current_time = current_datetime.strftime("%H:%M:%S")
    # The day stamp gets its own name: current_date holds the month label
    # used for the file name and must not be overwritten with a day, or a
    # later pass would write to a differently named file.
    jour = current_datetime.strftime("%d-%m-%Y")
    data_to_write = {"Date": jour, "Heure": current_time}
    data_to_write.update(root)

    Record_file = Record_dir + f"/{current_date}.txt"
    # Create the month directory on first use instead of failing in open().
    os.makedirs(Record_dir, exist_ok=True)
    file_exists = os.path.isfile(Record_file)
    with open(Record_file, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=data_to_write.keys())
        # Write the header only when the file is new.
        if not file_exists:
            writer.writeheader()
        # Write the data row; the "with" block closes the file afterwards.
        writer.writerow(data_to_write)
# The three handlers below are kept from the original; HTTPError and URLError
# are subclasses of OSError, so they must stay ahead of the OSError handler.
except HTTPError as e:
    print(f"Erreur HTTP : {e.code} {e.reason}")
except URLError as e:
    print(f"Erreur d'URL : {e.reason}")
except etree.XMLSyntaxError as e:
    print(f"Erreur de syntaxe XML : {e}")
except OSError as e:
    # Directory or file could not be created or written: report it and keep
    # the logger running rather than dying on a transient storage error.
    print(f"Erreur d'acces au fichier : {e}")
time.sleep(10)
# [paste residue: "Partager" (share) button label from the original web page; not part of the script]