Saturday, March 13, 2021

RPi Pico + ESP32-S remote control ESP32-DevKitC via WiFi TCP, using MicroPython.

This exercise programs a Raspberry Pi Pico (MicroPython) with an ESP32-S (ESP-AT) as a WiFi TCP client that remotely controls the onboard LED of an ESP32-DevKitC V4 (MicroPython) acting as a WiFi TCP server.

In Client side:

ESP32-S is a wireless module based on the ESP32. It is flashed with the AT-command firmware ESP-AT and acts as a WiFi co-processor. The Raspberry Pi Pico (MicroPython) sends AT commands to the ESP32-S via UART. Please note that on an ESP32 flashed with ESP-AT, UART1 (IO16/IO17) is used to receive AT commands and send AT responses; it is connected to GP0/GP1 of the Pico.

Pico GP15 is connected to the ESP32-S EN pin, to reset it at power-up.

Pico GP16 is used to control the remote LED.

In Server side:

ESP32-DevKitC V4 (with ESP32-WROVER-E module)/MicroPython is programmed as a WiFi server; it receives commands from the client and turns the LED ON/OFF accordingly.

Connection:


MicroPython code:

Client side, mpyPico_ESP32S_remoteCli_.py runs on the Raspberry Pi Pico.
import uos
import machine
import utime
"""
Raspberry Pi Pico/MicroPython + ESP32-S exercise

ESP32-S with AT-command firmware (ESP-AT):
---------------------------------------------------
AT version:2.1.0.0(883f7f2 - Jul 24 2020 11:50:07)
SDK version:v4.0.1-193-ge7ac221
compile time(0ad6331):Jul 28 2020 02:47:21
Bin version:2.1.0WROM-3)
---------------------------------------------------

Pico send AT command to ESP32-S via UART,
Send command to server (ESP32/MicroPython)
to turn ON/OFF LED on server.
---------------------------------------------------
Connection:
powered by separated power supply
Pico                  ESP32-S
GND                   GND
GP0 (TX) (pin 1)      IO16 (RXD)
GP1 (RX) (pin 2)      IO17 (TXD)
GP15 (pin 20)         EN

GP16 (pin 21) button
---------------------------------------------------
"""
# Server IP and port are hard-coded;
# they have to match the server-side settings.
server_ip="192.168.4.1"
server_port=8000

# Credentials of the WiFi network the ESP32-S joins before each command.
network_ssid = "ESP32-ssid"
network_password = "password"

# GP15 is wired to the ESP32-S EN pin; GP16 is the (active-low, pulled-up)
# input that selects the remote LED state.
ESP_EN = 15
PIN_ESP_EN = machine.Pin(ESP_EN, machine.Pin.IN, machine.Pin.PULL_UP)
DIn = machine.Pin(16, machine.Pin.IN, machine.Pin.PULL_UP)


print()
print("Machine: \t" + uos.uname()[4])
print("MicroPython: \t" + uos.uname()[3])
# Indicate visually that the program has started:
# onboard LED off 0.5 s, on 1 s, then off again.
led_onboard = machine.Pin(25, machine.Pin.OUT)
led_onboard.value(0)     # toggle onboard LED
utime.sleep(0.5)         # to indicate program start
led_onboard.value(1)
utime.sleep(1)
led_onboard.value(0)

# Reset the ESP: drive EN high, pulse it low for 0.5 s, drive it high again,
# then release the pin back to an input with pull-up.
PIN_ESP_EN = machine.Pin(ESP_EN, machine.Pin.OUT)
PIN_ESP_EN.value(1)
utime.sleep(0.5)
PIN_ESP_EN.value(0)
utime.sleep(0.5)
PIN_ESP_EN.value(1)
PIN_ESP_EN = machine.Pin(ESP_EN, machine.Pin.IN, machine.Pin.PULL_UP)

# UART0 at 115200 baud carries the AT commands/responses
# (presumably on the default GP0/GP1 pins described in the wiring notes).
uart = machine.UART(0, baudrate=115200)
print(uart)

# Byte sequence that terminates a successful AT response, and its length.
RES_OK = b'OK\r\n'
len_OK = len(RES_OK)

# Return codes of sendCMD_waitResult().
RESULT_OK      = '0'
RESULT_TIMEOUT = '1'
def sendCMD_waitResult(cmd, timeout=2000):
    """Write *cmd* to the UART and wait for a reply terminated by RES_OK.

    Bytes are read one at a time so that reading stops exactly at the end
    of the terminator and anything that follows stays in the UART buffer.

    Args:
        cmd: the AT command (or raw payload) to write, as a str.
        timeout: maximum time to wait for the terminator, in milliseconds.

    Returns:
        RESULT_OK if the received data ended with RES_OK before the
        timeout expired, otherwise RESULT_TIMEOUT.
    """
    print("CMD: " + cmd)
    uart.write(cmd)

    start = utime.ticks_ms()
    result = RESULT_TIMEOUT
    resp = b""

    # ticks_ms() wraps around, so elapsed time must be computed with
    # ticks_diff() rather than plain subtraction.
    while utime.ticks_diff(utime.ticks_ms(), start) < timeout:
        if uart.any():
            chunk = uart.read(1)
            if not chunk:
                # read() may return None if nothing arrived after all
                continue
            resp += chunk
            if resp[-len_OK:] == RES_OK:
                # print the two parts separately instead of mixing
                # bytes and str in one concatenation
                print(RES_OK, "found!")
                result = RESULT_OK
                break

    print("resp:")
    try:
        print(resp.decode())
    except UnicodeError:
        # not valid text: fall back to the raw bytes representation
        print(resp)

    return result

# To keep detection simple, the remote commands (RMCMD) and the remote
# status strings sent by the server (RMSTA) all have the same length.
# Remote commands, sent from client to server:
RMCMD_len = 6
RMCMD_ON  = "LEDONN"   # turn LED ON
RMCMD_OFF = "LEDOFF"   # turn LED OFF
# Remote status, sent from server to client:
RMSTA_len = 6
# Local-only result used when no/unknown reply was received; it is the
# initial return value of sendRemoteCmd(), so it does not need to be
# RMSTA_len characters long.
RMSTA_timeout = "timeout"
RMSTA_LEDON  = "LedOnn"
RMSTA_LEDOFF = "LedOff"
"""
#Expected flow to send command to wifi is:
Pico (client) to ESP32-S    response from ESP32-S to Pico
AT+CIPSEND=<cmd len>\r\n
                            AT+CIPSEND=<cmd len>\r\n
                            OK\r\n
                            >\r\n
<cmd>
                            Recv x bytes\r\n
                            SEND OK\r\n           ---> ESP (server)
                                                  <--- ESP (server) end with OK\r\n
                            +IPD,10:<RMSTA>OK\r\n
                            +IPD,2:\r\n
"""
def sendRemoteCmd(rmcmd, timeout=2000):
    """Send a remote command over the open TCP link and return the status.

    Issues AT+CIPSEND for len(rmcmd) bytes, waits for the '>' prompt,
    writes the command, then collects the server reply until it ends with
    RES_OK. The RMSTA_len bytes immediately before the terminator are
    taken as the status.

    Args:
        rmcmd: command string to send (e.g. RMCMD_ON / RMCMD_OFF).
        timeout: time limit in milliseconds, applied separately to the
            AT+CIPSEND acknowledgement, the '>' prompt and the server reply.

    Returns:
        RMSTA_LEDON or RMSTA_LEDOFF when the server answered with one of
        them, otherwise RMSTA_timeout.
    """
    result = RMSTA_timeout

    if sendCMD_waitResult('AT+CIPSEND=' + str(len(rmcmd)) + '\r\n',
                          timeout) != RESULT_OK:
        return result

    # Consume everything up to and including the '>' prompt. The wait is
    # bounded so a missing prompt cannot hang the program forever.
    deadline = utime.ticks_add(utime.ticks_ms(), timeout)
    prompt = b""
    while prompt[-1:] != b'>':
        if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
            print("no '>' prompt received")
            print(prompt)
            return result
        if uart.any():
            prompt += uart.read(1) or b""
    print(prompt)

    print("Remote CMD: " + rmcmd)
    if sendCMD_waitResult(rmcmd) != RESULT_OK:
        return result

    # ticks_ms() wraps around: build the deadline with ticks_add() and
    # compare with ticks_diff() instead of '+' and '<'.
    deadline = utime.ticks_add(utime.ticks_ms(), timeout)
    resp = b""

    while utime.ticks_diff(deadline, utime.ticks_ms()) > 0:
        if uart.any():
            resp += uart.read(1) or b""
            if resp[-len_OK:] == RES_OK:
                print(RES_OK, "found!")
                # status is the RMSTA_len bytes just before the terminator
                rmSta = resp[-(len_OK + RMSTA_len):-len_OK]
                try:
                    strRmSta = rmSta.decode()   # convert bytes to string
                except UnicodeError:
                    strRmSta = ""               # garbage: treat as unknown
                print(strRmSta)
                if strRmSta in (RMSTA_LEDON, RMSTA_LEDOFF):
                    result = strRmSta
                break

    print("resp:")
    try:
        print(resp.decode())
    except UnicodeError:
        print(resp)

    return result

def sendCMD_waitResp(cmd, timeout=2000):
    """Write *cmd* to the UART, then dump whatever arrives within *timeout* ms.

    Unlike sendCMD_waitResult() this does not look for a terminator; it
    always waits for the full timeout (via waitResp) and returns nothing.
    """
    banner = "CMD: " + cmd
    print(banner)
    uart.write(cmd)
    waitResp(timeout)
    # blank line to separate this exchange from the next one in the log
    print()
    
def waitResp(timeout=2000):
    """Collect UART data for *timeout* milliseconds and print it.

    The function always runs for the full timeout; the collected bytes are
    printed decoded as text when possible, otherwise as raw bytes.

    Args:
        timeout: how long to keep reading, in milliseconds.
    """
    start = utime.ticks_ms()
    resp = b""
    # ticks_ms() wraps around, so use ticks_diff() for the elapsed time
    while utime.ticks_diff(utime.ticks_ms(), start) < timeout:
        if uart.any():
            # read() may return None if nothing arrived after all
            resp += uart.read(1) or b""
    print("resp:")
    try:
        print(resp.decode())
    except UnicodeError:
        print(resp)
        
"""
everytimes send command to server:
- join ESP32 network
- connect to ESP32 socket
- send command to server and receive status
"""
def connectRemoteSendCmd(cmdsend):
    """Join the server's WiFi network, send one command, then disconnect.

    Every call performs the full cycle: join the access point, open a TCP
    connection to server_ip:server_port, send *cmdsend* and print the
    returned status, and finally leave the access point.

    Args:
        cmdsend: the remote command to send (RMCMD_ON or RMCMD_OFF).
    """
    clearRxBuf()
    # log the SSID that is actually used in the join command below
    print("join wifi network: " + network_ssid)
    join_cmd = ('AT+CWJAP="' + network_ssid + '","'
                + network_password + '"\r\n')
    # keep retrying until the ESP reports OK (blocks while the AP is absent)
    while sendCMD_waitResult(join_cmd) != RESULT_OK:
        pass

    sendCMD_waitResp('AT+CIFSR\r\n')    # obtain the local IP address
    sendCMD_waitResult('AT+CIPSTATUS\r\n')

    print("wifi network joint")
    print("connect socket")

    connect_cmd = ('AT+CIPSTART="TCP","' + server_ip + '",'
                   + str(server_port) + '\r\n')
    if sendCMD_waitResult(connect_cmd, timeout=5000) == RESULT_OK:
        sendCMD_waitResult('AT+CIPSTATUS\r\n')
        print("RMST: " + sendRemoteCmd(rmcmd=cmdsend))

    # discard any leftover reply bytes, then leave the access point
    clearRxBuf()
    sendCMD_waitResult('AT+CIPSTATUS\r\n')
    sendCMD_waitResult('AT+CWQAP\r\n')

            
def clearRxBuf():
    """Drain the UART receive buffer and print the discarded bytes."""
    print("--- clear Rx buffer ---")
    drained = b""
    while True:
        if not uart.any():
            break
        # pull pending data out one byte at a time
        drained = drained + uart.read(1)
    print(drained)
    print("-----------------------")
    
# Start-up sequence: LED off while the ESP is being configured.
led_onboard.value(0)
clearRxBuf()
sendCMD_waitResult('AT\r\n')          # test AT startup
sendCMD_waitResult('AT+CWMODE=1\r\n') # set the Wi-Fi mode, 1 = Station mode
sendCMD_waitResult('AT+CIPMUX=0\r\n') # single connection

# LED on while the self-test runs: switch the remote LED on, wait one
# second, then switch it off again.
led_onboard.value(1)
connectRemoteSendCmd(cmdsend=RMCMD_ON)
utime.sleep(1)
connectRemoteSendCmd(cmdsend=RMCMD_OFF)

# Blink the LED quickly 5 times to indicate start-up has finished;
# each pass ends with the LED off.
for i in range(5):
    led_onboard.value(0)
    utime.sleep(0.2)
    led_onboard.value(1)
    utime.sleep(0.2)
    led_onboard.value(0)

print("Started")
print("waiting for button")
# Sample the digital input every DIN_POLL_MS milliseconds. A level counts as
# debounced once two consecutive samples agree; the remote command is sent
# once per debounced level (input high -> LED off, input low -> LED on).
DIN_POLL_MS = 30
# ticks_ms() wraps around, so deadlines are built with ticks_add() and
# checked with ticks_diff() instead of '+' and '>'.
dinMills = utime.ticks_add(utime.ticks_ms(), DIN_POLL_MS)
prvDin = 1
debounced = False
while True:
    if utime.ticks_diff(utime.ticks_ms(), dinMills) > 0:
        dinMills = utime.ticks_add(utime.ticks_ms(), DIN_POLL_MS)
        curDin = DIn.value()
        if curDin != prvDin:
            # input changed: restart the debounce for the new level
            prvDin = curDin
            debounced = False
        elif not debounced:
            # level has been stable for one full poll interval
            debounced = True
            if curDin:
                connectRemoteSendCmd(cmdsend=RMCMD_OFF)
            else:
                connectRemoteSendCmd(cmdsend=RMCMD_ON)
                    
                
    

Server side, upyESP32_AP_RemoteSvr_.py runs on the ESP32-DevKitC V4.
import utime
import uos
import network
import usocket
from machine import Pin

"""
ESP32/MicroPython exercise:
ESP32 act as Access Point,
and setup a simple TCP server

receive command from client and turn ON/OFF LED,
and send back status.
"""

# Credentials of the access point created by this board; the client must
# use the same values.
ssid= "ESP32-ssid"
password="password"

# LED switched by the remote commands.
led=Pin(13,Pin.OUT)

print("----- MicroPython -----")
for u in uos.uname():
    print(u)
print("-----------------------")

# Blink the LED three times (0.5 s on / 0.5 s off) to show the program started.
for i in range(3):
    led.on()
    utime.sleep(0.5)
    led.off()
    utime.sleep(0.5)


ap = network.WLAN(network.AP_IF) # Access Point interface
ap.config(essid=ssid,
          password=password,
          authmode=network.AUTH_WPA_WPA2_PSK)
ap.config(max_clients=1)  # maximum number of clients
ap.active(True)           # activate the access point

# Show the AP network settings and the methods available on the interface.
print(ap.ifconfig())
print(dir(ap))

# Create the listening TCP socket; SO_REUSEADDR lets the port be bound
# again right after a restart.
mysocket = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
mysocket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)

# Bind on all interfaces using the same 'port' value that is reported
# below, so the two cannot drift apart. It must match the client's
# server_port setting.
port = 8000
mysocket.bind(('', port))
print("bind: " + str(port))
mysocket.listen(1)   # backlog of one pending connection

# To keep detection simple, the remote commands (RMCMD) and the remote
# status strings (RMSTA) sent over the link all have the same length.
# Remote commands, sent from client to server:
RMCMD_len = 6
RMCMD_ON  = "LEDONN"    # turn LED ON
RMCMD_OFF = "LEDOFF"   # turn LED OFF
# Remote status, sent from server to client:
RMSTA_len = 6
RMSTA_timeout = "timeout" # timed out without reply / unknown reply
RMSTA_LEDON  = "LedOnn"
RMSTA_LEDOFF = "LedOff"

# Main server loop: handle one client connection at a time. Each connection
# carries a single command; the reply is the status followed by 'OK\r\n'
# (known command) or the upper-cased request (unknown command), then a
# final '\r\n'.
while True:
    conn, addr = mysocket.accept()
    try:
        print('Connected from: %s' % str(addr))
        print()
        request = conn.recv(1024)
        print('request: %s' % str(request))
        print()

        try:
            strRqs = request.decode()
        except UnicodeError:
            # not valid text: cannot match any command
            strRqs = ""
        print("strRqs: " + strRqs)
        if strRqs == RMCMD_ON:
            conn.send((RMSTA_LEDON + 'OK\r\n').encode())
            led.on()
        elif strRqs == RMCMD_OFF:
            conn.send((RMSTA_LEDOFF + 'OK\r\n').encode())
            led.off()
        else:
            # unknown command: echo it back upper-cased, without 'OK'
            conn.send(request.upper())
        conn.send(b'\r\n')
    except OSError as err:
        # a dropped/reset connection must not stop the server
        print("connection error:", err)
    finally:
        # always release the socket, even when an error occurred
        conn.close()


No comments: