Différences entre versions de « Mydin-advanced-usage »

De MCHobby - Wiki
Sauter à la navigation Sauter à la recherche
 
Ligne 184 : Ligne 184 :
 
'''Now, glue the snipset into the final code:'''
 
'''Now, glue the snipset into the final code:'''
  
The content here below can be found in
+
The content here below can be found in the [https://github.com/mchobby/micropython-mydin/blob/main/examples/din_controlers/common/async-scheduler.py async-scheduler.py] example:
 +
 
 +
<syntaxhighlight lang="python">
 +
from mydin import configure
 +
from mydin.pico import Pico3Mod
 +
from mydin.backplane.relays import TwoRelay3Mod
 +
import time, sys, asyncio
 +
 
 +
import schedule
 +
 
 +
din = configure( Pico3Mod, TwoRelay3Mod )
 +
 
 +
# === Define & schedule Jobs ==================
 +
async def job_relay2():
 +
global din
 +
din.rel2.on()
 +
din.leds[1].on()
 +
await asyncio.sleep(30) # Wait 30 Sec
 +
din.leds[1].off()
 +
 
 +
 
 +
async def job_toggle_led( led_idx ): # Job with parameter
 +
global din
 +
din.leds[led_idx].toggle()
 +
 
 +
schedule.every(1).minutes.do( job_relay2 )
 +
schedule.every(3).seconds.do( job_toggle_led, 2 ) # Toggle led 2 of 0..3
 +
schedule.every(7).seconds.do( job_toggle_led, 3 ) # Toggle led 3 of 0..3
 +
# --- Other examples of scheduling ---
 +
# schedule.every(10).seconds.do(job_with_argument, name="MicroPython")
 +
# for h in range( 8, 17 ):
 +
# schedule.every().day.at("%02i:00" % h).do( upd_auth_file_job, drawer )
 +
# schedule.every().day.at("20:00").do( upl_log_file_job, drawer )
 +
 
 +
# === Main DIN tasks ===================
 +
counter = 0
 +
async def loop( din ):
 +
""" called again and again (like Arduino) """
 +
global counter
 +
counter += 1
 +
# Place your loop code here
 +
print( "loop iteration %3i " % (counter ) )
 +
await asyncio.sleep( 5 )
 +
 
 +
 
 +
# === Running Din Project ==============
 +
def create_tasks( async_evloop ):
 +
scheduler_task = async_evloop.create_task( schedule.run_forever() )
 +
 
 +
din.setup( setup=None, loop=loop, on_tasks_create=create_tasks )
 +
din.run()
 +
</syntaxhighlight>
 +
 
 +
As the tasks and jobs displays {{fname|print()}} statements, we can view the activity in the REPL session:
 +
 
 +
<nowiki>mcu localtime: 1/1/2000 2:22:27
 +
Pico3Mod.run() entering...
 +
USER Loop Setup...
 +
USER Tasks Setup...
 +
running event_loop...
 +
loop iteration  1
 +
DEBUG: Running job <Job object at 20023b40>
 +
DEBUG: Jobs to await: 1
 +
loop iteration  2
 +
DEBUG: Running job <Job object at 20023b40>
 +
DEBUG: Jobs to await: 1
 +
DEBUG: Running job <Job object at 20026530>
 +
DEBUG: Jobs to await: 1
 +
DEBUG: Running job <Job object at 20023b40>
 +
DEBUG: Jobs to await: 1
 +
loop iteration  3
 +
DEBUG: Running job <Job object at 20023b40>
 +
DEBUG: Jobs to await: 1
 +
DEBUG: Running job <Job object at 20026530>
 +
DEBUG: Jobs to await: 1
 +
loop iteration  4
 +
[EXIT] run app exit!
 +
event_loop ends!</nowiki>

Version actuelle datée du 29 avril 2025 à 21:27

Abstract

Before going into the details of the various controlers and backplanes, it may be opportune to learn some advanced behaviors of the MyDin asyncio implementation.

RUN_APP

RUN_APP pin is used to stop the MyDin software stack (or script execution after a reboot).

It is a convenient way to recover the control on the module when script goes wrong. It would also offer the full access to REPL.

When setting the RUN_APP pin to the "Stop" position (so tied to ground) while the MyDin software is already running, the code may takes up to 3 seconds to terminates.

It is also a good practice to check the RUN_APP pin in the boot.py startup file. This may allows debugging operations when Network operation goes wrong!

from machine import Pin
# Use the RUN_APP pin as defined on your controler board!
RUN_APP = Pin.board.GP3  # High=Run, Low=Stop

if Pin( RUN_APP, Pin.IN, Pin.PULL_UP ).value()==False:
  print( '[BOOT] skip execution (RUN_APP is false)' )
else:
  print( '[BOOT] entering...' )
  # >>> PLACE BOOT CODE HERE <<<
  print( '[BOOT] exit' )

Exception handling

An exception occurring in the userloop task will be captured and will ends the code execution.

Exception occurring in other tasks will be only be reported into REPL.

When the code execution ends due to an exception then the loop_exception contains a reference to the error.

When the code execution ends for other reason (eg: RUN_APP=Stop) then the loop_exception value is None.

The async-exception.py test script -visible here below- do raise an exception when the user press the second button (index=1).

from mydin import configure
from mydin.pico import Pico3Mod
from mydin.backplane.relays import TwoRelay3Mod 
import time, sys

class KaboomError( Exception ):
	pass

# Which Controler + Backplane to use
din = configure( Pico3Mod, TwoRelay3Mod )

async def loop( din ):
  for i in range( 4 ): # 0..3 = 4 buttons
    if din.was_pressed( i ):
      din.leds[i].toggle()
      # raise en exception is button 2 is pressed
      if i==1: 
        raise KaboomError( "This exception will exits the code")


din.setup( setup=None, loop=loop )
din.run()
print( "="*40 )
print( "din.run() did exit!!!")

if din.loop_exception != None:
  print( "An error %s occured!" % (din.loop_exception.__class__.__name__ ) )
  print( "with message: %s" % din.loop_exception )

Using setup()

The setup parameter in the DinControler.setup( self, setup=None, loop=None, on_tasks_create=None ) is used to define and initialise additional hardware/sensor/bus connected to myDin.

The parameter must be the reference to a regular function accepting a "din" object as parameter.

Good practice: instead of defining global variable, it is recommended to add a new attribute on the "din" object. By doing so, the userloop can easily access the new object.

The following async-qwiic-bme280.py example show how to create such reference and how to retreive it in the userloop. The sensor is a BME280 sensor wired on the I2C bus.

from mydin import configure
from mydin.pico import Pico3Mod
from mydin.backplane.relays import TwoRelay3Mod 
import time, sys

from bme280 import *
din = configure( Pico3Mod, TwoRelay3Mod )

def setup( din ):
  # Add a new attribute "sensor" to "din" instance
  din.sensor = BME280( i2c=din.i2c, address=BMP280_I2CADDR )

async def loop( din ):
  # temperature (celcius), atmospheric_pressure (hPascal), Relative_humidity (percent)
  temp, hpa, rh = din.sensor.raw_values
  print( "temperature",temp, "pressure", hpa, "humidity", rh )

din.setup( setup=setup, loop=loop )
din.run()

Remarks:

  • Notice the def setup(): function definition and its reference in din.setup( setup=setup, loop=loop ) call.
  • The instance of BME280 is stored under the din.sensor attribute.
  • The def setup(): function IS NOT AN ASYNC function.
  • The async def loop( din ) userloop can retreive the BME280 instance from the din.sensor attribute.

Activating watchdog

Using a watchdog is a way to protect your solution bugs like infinite loop or unexpected end of execution.

Without notification of the user code signaling its current --and continuing-- code execution, the watchdog will restart the entire system (aka Reset the MCU).

Please note that once started, a watchdog cannot be disabled!

Most of microcontroler do have an hardware watchdog. Once the watchdog started (set its timeout), you will have to ping it (feed it) before it reach the timeout... otherwise it will reset the microcontroler.

The example async-watchdog.py, visible here below, shows how to activate the watchdog. No need to call the din.feed_watchdog() because it is already called at every execution cycle of the userloop .

it is possible to lock the userloop by pressing and holding down the button 1. Keep it down long enough and the watchdog will reset the microcontroler!
from mydin import configure
from mydin.pico import Pico3Mod
from mydin.backplane.relays import TwoRelay3Mod 

din = configure( Pico3Mod, TwoRelay3Mod )

async def loop( din ):
  din.leds[0].off()
  # direct access to button state
  while din.buttons[0].value()==0: # while pressed
    din.leds[0].on()


din.setup( setup=None, loop=loop )
din.setup_watchdog( 2000 ) # Watchdog resets if loop() stay locked for 2 sec 
din.run()

Add scheduler task

The on_tasks_create parameter of the call din.setup( setup=None, loop=loop, on_tasks_create=create_tasks ) allow the user_code to add one or more asynchronous tasks.

One of the most interesting task to create is a scheduler!

A scheduler like micropython-aioschedule can do much more than executing a job every minute or so. A scheduler can execute a task every day at a given time, or every Tuesday, every Saturday, a given date, etc.

The following code snippet shows the schedule of two job... jobs defined as async functions because aioschedule is asynchronous implementation.

import schedule

# === Define & schedule Jobs ==================
async def job_relay2():
	global din
	din.rel2.on()
	din.leds[1].on()
	await asyncio.sleep(30) # Wait 30 Sec
	din.leds[1].off()
	din.rel2.off()


async def job_toggle_led( led_idx ): # Job with parameter
	global din
	din.leds[led_idx].toggle()

schedule.every(1).minutes.do( job_relay2 )
schedule.every(3).seconds.do( job_toggle_led, 2 ) # Toggle led 2 of 0..3
schedule.every(7).seconds.do( job_toggle_led, 3 ) # Toggle led 3 of 0..3

Now, let's use the on_tasks_create parameter to create a new asyncio task that runs the scheduler.

def create_tasks( async_evloop ):
	# new task to register
	scheduler_task = async_evloop.create_task( schedule.run_forever() )

din.setup( setup=None, loop=loop, on_tasks_create=create_tasks )

Now, glue the snipset into the final code:

The content here below can be found in the async-scheduler.py example:

from mydin import configure
from mydin.pico import Pico3Mod
from mydin.backplane.relays import TwoRelay3Mod 
import time, sys, asyncio

import schedule

din = configure( Pico3Mod, TwoRelay3Mod )

# === Define & schedule Jobs ==================
async def job_relay2():
	global din
	din.rel2.on()
	din.leds[1].on()
	await asyncio.sleep(30) # Wait 30 Sec
	din.leds[1].off()


async def job_toggle_led( led_idx ): # Job with parameter
	global din
	din.leds[led_idx].toggle()

schedule.every(1).minutes.do( job_relay2 )
schedule.every(3).seconds.do( job_toggle_led, 2 ) # Toggle led 2 of 0..3
schedule.every(7).seconds.do( job_toggle_led, 3 ) # Toggle led 3 of 0..3
# --- Other examples of scheduling ---
# schedule.every(10).seconds.do(job_with_argument, name="MicroPython")
# for h in range( 8, 17 ):
# 	schedule.every().day.at("%02i:00" % h).do( upd_auth_file_job, drawer )
# schedule.every().day.at("20:00").do( upl_log_file_job, drawer )

# === Main DIN tasks ===================
counter = 0
async def loop( din ):
	""" called again and again (like Arduino) """
	global counter
	counter += 1
	# Place your loop code here
	print( "loop iteration %3i " % (counter ) )
	await asyncio.sleep( 5 )


# === Running Din Project ==============
def create_tasks( async_evloop ):
	scheduler_task = async_evloop.create_task( schedule.run_forever() )

din.setup( setup=None, loop=loop, on_tasks_create=create_tasks )
din.run()

As the tasks and jobs displays print() statements, we can view the activity in the REPL session:

mcu localtime: 1/1/2000 2:22:27
Pico3Mod.run() entering...
USER Loop Setup...
USER Tasks Setup...
running event_loop...
loop iteration   1
DEBUG: Running job <Job object at 20023b40>
DEBUG: Jobs to await: 1
loop iteration   2
DEBUG: Running job <Job object at 20023b40>
DEBUG: Jobs to await: 1
DEBUG: Running job <Job object at 20026530>
DEBUG: Jobs to await: 1
DEBUG: Running job <Job object at 20023b40>
DEBUG: Jobs to await: 1
loop iteration   3
DEBUG: Running job <Job object at 20023b40>
DEBUG: Jobs to await: 1
DEBUG: Running job <Job object at 20026530>
DEBUG: Jobs to await: 1
loop iteration   4
[EXIT] run app exit!
event_loop ends!