Environmental Monitoring – No Screen
Saves a CSV file. Use Thonny to retrieve data.
import machine
import utime
import time
from machine import Pin, ADC, UART
from pimoroni import RGBLED, Button
from breakout_bme68x import BreakoutBME68X, STATUS_HEATER_STABLE
from pimoroni_i2c import PimoroniI2C
from breakout_ltr559 import BreakoutLTR559
# from pms5003 import PMS5003
"""
This basic example shows how to read from all the sensors on Enviro+.
Prints results to the REPL and saves data as a CSV
"""
# Set time
rtc=machine.RTC()
# Setup buttons
button_a = Button(12, invert=True)
button_b = Button(13, invert=True)
button_x = Button(14, invert=True)
button_y = Button(15, invert=True)
# change this to adjust temperature compensation
TEMPERATURE_OFFSET = 3
# set up the LED
led = RGBLED(6, 7, 10, invert=True)
led.set_rgb(0, 0, 255)
# set up the Pico's I2C
PINS_BREAKOUT_GARDEN = {"sda": 4, "scl": 5}
i2c = PimoroniI2C(**PINS_BREAKOUT_GARDEN)
# set up BME688 and LTR559 sensors
bme = BreakoutBME68X(i2c, address=0x77)
ltr = BreakoutLTR559(i2c)
# setup analog channel for microphone
MIC_PIN = 26
mic = ADC(Pin(26))
# Button a state
a = False
while True:
if button_a.is_pressed:
a = True
led.set_rgb(0, 255, 0)
print("Button A pressed!")
# Open file
timestamp=rtc.datetime()
fname ="%04d%02d%02d-%02d%02d%02d.csv"%(timestamp[0:3] + timestamp[4:7])
file=open(fname,"w")
file.write("Time,Temperature (°C),Humidity (%), Pressure (hPa), Gas, Lux, Mic")
# Run program
while not button_b.is_pressed:
# read BME688
temperature, pressure, humidity, gas, status, _, _ = bme.read()
heater = "Stable" if status & STATUS_HEATER_STABLE else "Unstable"
# correct temperature and humidity using an offset
corrected_temperature = temperature - TEMPERATURE_OFFSET
dewpoint = temperature - ((100 - humidity) / 5)
corrected_humidity = 100 - (5 * (corrected_temperature - dewpoint))
# read LTR559
ltr_reading = ltr.get_reading()
lux = ltr_reading[BreakoutLTR559.LUX]
prox = ltr_reading[BreakoutLTR559.PROXIMITY]
# read mic
mic_reading = mic.read_u16()
if heater == "Stable" and ltr_reading is not None:
timestamp=rtc.datetime()
print(timestamp)
timestring="%04d-%02d-%02d %02d:%02d:%02d"%(timestamp[0:3] + timestamp[4:7])
led.set_rgb(0, 255, 0)
print(f"""\n
Time = {timestring}
Temperature = {corrected_temperature} °C
Humidity = {corrected_humidity} %
Pressure = {pressure/100} hPa
Gas = {gas}
Lux = {lux}
Mic = {mic_reading}
""")
csvline = f"""\n{timestring},{corrected_temperature},{corrected_humidity},{pressure/100},{gas},{lux},{mic_reading}"""
file.write(csvline)
else:
# light up the LED red if there's a problem with the BME688 or LTR559 sensor readings
led.set_rgb(255, 0, 0)
time.sleep(1.0)
# Button B has been pressed
print("Button B pressed")
led.set_rgb(0, 0, 255)
# Close file
if a:
file.close()
a = False
Code language: Python (python)
With Screen, Network and Time Server
- Picographics library
- ntptime
- Requires
network_manager.py
andWIFI_CONFIG.py
from the micropython/examples/common directory.
Save the above on the pico.
ntptime.py
import time
try:
import usocket as socket
except:
import socket
try:
import ustruct as struct
except:
import struct
# The NTP host can be configured at runtime by doing: ntptime.host = 'myhost.org'
host = "pool.ntp.org"
# The NTP socket timeout can be configured at runtime by doing: ntptime.timeout = 2
timeout = 1
def ttime():
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1B
addr = socket.getaddrinfo(host, 123)[0][-1]
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(timeout)
res = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
finally:
s.close()
val = struct.unpack("!I", msg[40:44])[0]
EPOCH_YEAR = time.gmtime(0)[0]
if EPOCH_YEAR == 2000:
# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 3155673600
elif EPOCH_YEAR == 1970:
# (date(1970, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 2208988800
else:
raise Exception("Unsupported epoch: {}".format(EPOCH_YEAR))
return val - NTP_DELTA
# There's currently no timezone support in MicroPython, and the RTC is set in UTC time.
# Add 11 for AEDT
def settime():
t = ttime()
import machine
tm = time.gmtime(t)
machine.RTC().datetime((tm[0], tm[1], tm[2], tm[6] + 1, tm[3], tm[4], tm[5], 0))
Code language: Python (python)
network_manager.py
import rp2
import network
import machine
import uasyncio
class NetworkManager:
_ifname = ("Client", "Access Point")
def __init__(self, country="GB", client_timeout=60, access_point_timeout=5, status_handler=None, error_handler=None):
rp2.country(country)
self._ap_if = network.WLAN(network.AP_IF)
self._sta_if = network.WLAN(network.STA_IF)
self._mode = network.STA_IF
self._client_timeout = client_timeout
self._access_point_timeout = access_point_timeout
self._status_handler = status_handler
self._error_handler = error_handler
self.UID = ("{:02X}" * 8).format(*machine.unique_id())
def isconnected(self):
return self._sta_if.isconnected() or self._ap_if.isconnected()
def config(self, var):
if self._sta_if.active():
return self._sta_if.config(var)
else:
if var == "password":
return self.UID
return self._ap_if.config(var)
def mode(self):
if self._sta_if.isconnected():
return self._ifname[0]
if self._ap_if.isconnected():
return self._ifname[1]
return None
def ifaddress(self):
if self._sta_if.isconnected():
return self._sta_if.ifconfig()[0]
if self._ap_if.isconnected():
return self._ap_if.ifconfig()[0]
return '0.0.0.0'
def disconnect(self):
if self._sta_if.isconnected():
self._sta_if.disconnect()
if self._ap_if.isconnected():
self._ap_if.disconnect()
async def wait(self, mode):
while not self.isconnected():
self._handle_status(mode, None)
await uasyncio.sleep_ms(1000)
def _handle_status(self, mode, status):
if callable(self._status_handler):
self._status_handler(self._ifname[mode], status, self.ifaddress())
def _handle_error(self, mode, msg):
if callable(self._error_handler):
if self._error_handler(self._ifname[mode], msg):
return
raise RuntimeError(msg)
async def client(self, ssid, psk):
if self._sta_if.isconnected():
self._handle_status(network.STA_IF, True)
return
self._ap_if.disconnect()
self._ap_if.active(False)
self._sta_if.active(True)
self._sta_if.config(pm=0xa11140)
self._sta_if.connect(ssid, psk)
try:
await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
self._handle_status(network.STA_IF, True)
except uasyncio.TimeoutError:
self._sta_if.active(False)
self._handle_status(network.STA_IF, False)
self._handle_error(network.STA_IF, "WIFI Client Failed")
async def access_point(self):
if self._ap_if.isconnected():
self._handle_status(network.AP_IF, True)
return
self._sta_if.disconnect()
self._sta_if.active(False)
self._ap_if.ifconfig(("10.10.1.1", "255.255.255.0", "10.10.1.1", "10.10.1.1"))
self._ap_if.config(password=self.UID)
self._ap_if.active(True)
try:
await uasyncio.wait_for(self.wait(network.AP_IF), self._access_point_timeout)
self._handle_status(network.AP_IF, True)
except uasyncio.TimeoutError:
self._sta_if.active(False)
self._handle_status(network.AP_IF, False)
self._handle_error(network.AP_IF, "WIFI Client Failed")
Code language: Python (python)
WIFI_CONFIG.py
Replace the values below with the appropriate values.
SSID = "WIFI SSID"
PSK = "WIFI PASSWORD "
COUNTRY = "COUNTRY CODE"
Code language: Python (python)
main.py
17/3/2024: Updated to only write average data (microphone maximum) once a minute rather than every second.
import machine
import time
import ntptime
import WIFI_CONFIG
import random
from machine import Pin, ADC, UART
from picographics import PicoGraphics, DISPLAY_ENVIRO_PLUS
from pimoroni import RGBLED, Button
from breakout_bme68x import BreakoutBME68X, STATUS_HEATER_STABLE
from pimoroni_i2c import PimoroniI2C
from breakout_ltr559 import BreakoutLTR559
from network_manager import NetworkManager
import uasyncio
# from pms5003 import PMS5003
"""
This basic example shows how to read from all the sensors on Enviro+.
Prints results to the REPL and saves data as a CSV
"""
# set up wifi
def status_handler(mode, status, ip):
display.set_font("bitmap8")
display.set_pen(BLACK)
display.clear()
display.set_pen(WHITE)
display.text("Network: {}".format(WIFI_CONFIG.SSID), 10, 10, scale=2)
print("Network: {}".format(WIFI_CONFIG.SSID))
status_text = "Connecting..."
if status is not None:
if status:
status_text = "Connection successful!"
else:
status_text = "Connection failed!"
display.text(status_text, 10, 40, scale=2)
print(status_text)
print("IP: {}".format(ip))
display.text("IP: {}".format(ip), 10, 60, scale=2)
display.update()
display.set_font("sans")
# Time on display
def display_time():
colours = [RED, GREEN, CYAN, MAGENTA, YELLOW, BLUE, WHITE]
timestamp=rtc.datetime()
timestring="%04d-%02d-%02d %02d:%02d:%02d"%(timestamp[0:3] + timestamp[4:7])
textheight = random.randint(25, 180)
textwidth = random.randint(0, 30)
colour_choice = random.choice(colours)
display.set_font("bitmap8")
display.set_pen(BLACK)
display.clear()
display.set_pen(colour_choice)
display.text("Press A to start", textwidth, textheight, WIDTH, scale=2)
display.text(f"{timestring}", textwidth, textheight + 20, WIDTH, scale=2)
display.update()
time.sleep(1)
# Set time
rtc=machine.RTC()
# Setup buttons
button_a = Button(12, invert=True)
button_b = Button(13, invert=True)
button_x = Button(14, invert=True)
button_y = Button(15, invert=True)
# Setup screen
# set up the display
display = PicoGraphics(display=DISPLAY_ENVIRO_PLUS)
BRIGHTNESS = 0.8
display.set_backlight(BRIGHTNESS)
# some constants we'll use for drawing
WHITE = display.create_pen(255, 255, 255)
BLACK = display.create_pen(0, 0, 0)
RED = display.create_pen(255, 0, 0)
GREEN = display.create_pen(0, 255, 0)
CYAN = display.create_pen(0, 255, 255)
MAGENTA = display.create_pen(200, 0, 200)
YELLOW = display.create_pen(200, 200, 0)
BLUE = display.create_pen(0, 0, 200)
FFT_COLOUR = display.create_pen(255, 0, 255)
GREY = display.create_pen(75, 75, 75)
WIDTH, HEIGHT = display.get_bounds()
display.set_font("sans")
# change this to adjust temperature compensation - doesn't seem to require this.
TEMPERATURE_OFFSET = 5
# set up the LED
led = RGBLED(6, 7, 10, invert=True)
led.set_rgb(0, 0, 255)
# set up the Pico's I2C
PINS_BREAKOUT_GARDEN = {"sda": 4, "scl": 5}
i2c = PimoroniI2C(**PINS_BREAKOUT_GARDEN)
# set up BME688 and LTR559 sensors
bme = BreakoutBME68X(i2c, address=0x77)
ltr = BreakoutLTR559(i2c)
# setup analog channel for microphone
MIC_PIN = 26
mic = ADC(Pin(26))
network_manager = NetworkManager(WIFI_CONFIG.COUNTRY, status_handler=status_handler)
# connect to wifi
uasyncio.get_event_loop().run_until_complete(network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK))
# Set time
ntptime.settime()
# Disconnect network
network_manager.disconnect()
# Button a state
a = False
# Second counter
secs = 0
while True:
display_time()
if button_a.is_pressed:
a = True
led.set_rgb(0, 255, 0)
display.set_backlight(BRIGHTNESS)
display.set_font("sans")
display.set_pen(RED)
display.text("waiting for sensors", 0, 20, WIDTH, scale=1)
display.update()
# Setup measures
temperature_sum = 0
humidity_sum = 0
lux_sum = 0
pressure_sum = 0
gas_sum = 0
mic_max = 0
# Open file
timestamp=rtc.datetime()
fname ="%04d%02d%02d-%02d%02d%02d.csv"%(timestamp[0:3] + timestamp[4:7])
file=open(fname,"w")
file.write("Time,Temperature (°C),Humidity (%), Pressure (hPa), Gas, Lux, Mic")
# Run program
while not button_b.is_pressed:
if button_y.is_pressed:
# Switch backlight off
display.set_backlight(0)
elif button_x.is_pressed:
display.set_backlight(BRIGHTNESS)
# read BME688
temperature, pressure, humidity, gas, status, _, _ = bme.read()
heater = "Stable" if status & STATUS_HEATER_STABLE else "Unstable"
# correct temperature and humidity using an offset
corrected_temperature = temperature - TEMPERATURE_OFFSET
dewpoint = temperature - ((100 - humidity) / 5)
corrected_humidity = 100 - (5 * (corrected_temperature - dewpoint))
temperature_sum = temperature_sum + corrected_temperature
pressure_sum = pressure_sum + pressure
humidity_sum = humidity_sum + corrected_humidity
gas_sum = gas_sum + gas
# read LTR559
ltr_reading = ltr.get_reading()
lux = ltr_reading[BreakoutLTR559.LUX]
prox = ltr_reading[BreakoutLTR559.PROXIMITY]
lux_sum = lux_sum + lux
# read mic
mic_reading = mic.read_u16()
if mic_reading > mic_max:
mic_max = mic_reading
if heater == "Stable" and ltr_reading is not None:
timestamp=rtc.datetime()
print(timestamp)
timestring="%04d-%02d-%02d %02d:%02d:%02d"%(timestamp[0:3] + timestamp[4:7])
led.set_rgb(0, 255, 0)
print(f"""\n
Time = {timestring}
Temperature = {corrected_temperature} °C
Humidity = {corrected_humidity} %
Pressure = {pressure/100} hPa
Gas = {gas}
Lux = {lux}
Mic = {mic_reading}
""")
if secs > 59:
avg_temp = temperature_sum / secs
avg_humidity = humidity_sum / secs
avg_pressure = pressure_sum / secs
avg_lux = lux_sum / secs
avg_gas = gas_sum / secs
# Write to csv file
csvline = f"""\n{timestring},{avg_temp},{avg_humidity},{avg_pressure/100},{avg_gas},{avg_lux},{mic_max}"""
file.write(csvline)
secs = 0
mic_max = 0
temperature_sum = 0
humidity_sum = 0
lux_sum = 0
pressure_sum = 0
gas_sum = 0
mic_max = 0
# Display on screen
display.set_pen(BLACK)
display.clear()
display.set_pen(WHITE)
display.text(f"Temp: {corrected_temperature:.0f} °C", 0, 25, WIDTH, scale=1)
display.text(f"Humd: {corrected_humidity:.0f}%", 0, 55, WIDTH, scale=1)
display.text(f"Prs: {pressure/100:.0f}hPa", 0, 85, WIDTH, scale=1)
display.text(f"Gas: {gas}", 0, 115, WIDTH, scale=1)
display.text(f"Lux: {lux:.0f} lux", 0, 145, WIDTH, scale=1)
display.text(f"Mic: {mic_reading - 32000}", 0, 175, WIDTH, scale=1)
display.update()
else:
# light up the LED red if there's a problem with the BME688 or LTR559 sensor readings
led.set_rgb(255, 0, 0)
display.set_pen(BLACK)
display.clear()
display.set_backlight(BRIGHTNESS)
# Increment seconds
secs = secs + 1
# Measure every second.
time.sleep(1.0)
# Button B has been pressed
print("Button B pressed")
led.set_rgb(0, 0, 255)
display_time()
# Close file
if a:
file.close()
a = False
secs = 0
Code language: Python (python)
Usage
Press A to start collecting data and B to stop. X and Y switch on and off the screen backlight.
Retrieve the data files off the Pico using Thonny and delete off the Pico to save storage space.